summaryrefslogtreecommitdiffstats
path: root/vendor/gix-features/src
diff options
context:
space:
mode:
author Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-04 12:41:41 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-04 12:41:41 +0000
commit 10ee2acdd26a7f1298c6f6d6b7af9b469fe29b87 (patch)
tree bdffd5d80c26cf4a7a518281a204be1ace85b4c1 /vendor/gix-features/src
parent Releasing progress-linux version 1.70.0+dfsg1-9~progress7.99u1. (diff)
download rustc-10ee2acdd26a7f1298c6f6d6b7af9b469fe29b87.tar.xz
rustc-10ee2acdd26a7f1298c6f6d6b7af9b469fe29b87.zip
Merging upstream version 1.70.0+dfsg2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-features/src')
-rw-r--r-- vendor/gix-features/src/cache.rs 76
-rw-r--r-- vendor/gix-features/src/decode.rs 38
-rw-r--r-- vendor/gix-features/src/fs.rs 246
-rw-r--r-- vendor/gix-features/src/hash.rs 190
-rw-r--r-- vendor/gix-features/src/interrupt.rs 125
-rw-r--r-- vendor/gix-features/src/io.rs 94
-rw-r--r-- vendor/gix-features/src/lib.rs 64
-rw-r--r-- vendor/gix-features/src/parallel/eager_iter.rs 124
-rw-r--r-- vendor/gix-features/src/parallel/in_order.rs 83
-rw-r--r-- vendor/gix-features/src/parallel/in_parallel.rs 211
-rw-r--r-- vendor/gix-features/src/parallel/mod.rs 179
-rw-r--r-- vendor/gix-features/src/parallel/reduce.rs 279
-rw-r--r-- vendor/gix-features/src/parallel/serial.rs 135
-rw-r--r-- vendor/gix-features/src/progress.rs 139
-rw-r--r-- vendor/gix-features/src/threading.rs 100
-rw-r--r-- vendor/gix-features/src/zlib/mod.rs 47
-rw-r--r-- vendor/gix-features/src/zlib/stream/deflate/mod.rs 96
-rw-r--r-- vendor/gix-features/src/zlib/stream/deflate/tests.rs 101
-rw-r--r-- vendor/gix-features/src/zlib/stream/inflate.rs 57
-rw-r--r-- vendor/gix-features/src/zlib/stream/mod.rs 4
20 files changed, 2388 insertions, 0 deletions
diff --git a/vendor/gix-features/src/cache.rs b/vendor/gix-features/src/cache.rs
new file mode 100644
index 000000000..f7a2cf005
--- /dev/null
+++ b/vendor/gix-features/src/cache.rs
@@ -0,0 +1,76 @@
+#[cfg(feature = "cache-efficiency-debug")]
+mod impl_ {
+ /// A helper to collect useful information about cache efficiency.
+ pub struct Debug {
+ owner: String,
+ hits: usize,
+ puts: usize,
+ misses: usize,
+ }
+
+ impl Debug {
+ /// Create a new instance
+ #[inline]
+ pub fn new(owner: impl Into<String>) -> Self {
+ Debug {
+ owner: owner.into(),
+ hits: 0,
+ puts: 0,
+ misses: 0,
+ }
+ }
+ /// Count cache insertions
+ #[inline]
+ pub fn put(&mut self) {
+ self.puts += 1;
+ }
+ /// Count hits
+ #[inline]
+ pub fn hit(&mut self) {
+ self.hits += 1;
+ }
+ /// Count misses
+ #[inline]
+ pub fn miss(&mut self) {
+ self.misses += 1;
+ }
+ }
+
+ impl Drop for Debug {
+ fn drop(&mut self) {
+ let hits = self.hits;
+ let misses = self.misses;
+ let ratio = hits as f32 / misses as f32;
+ eprintln!(
+ "{}[{:0x}]: {} / {} (hits/misses) = {:.02}%, puts = {}",
+ self.owner,
+ self as *const _ as usize,
+ hits,
+ misses,
+ ratio * 100.0,
+ self.puts
+ );
+ }
+ }
+}
+#[cfg(not(feature = "cache-efficiency-debug"))]
+mod impl_ {
+ /// The disabled, zero size do-nothing equivalent
+ pub struct Debug;
+
+ impl Debug {
+ /// Create a new instance
+ #[inline]
+ pub fn new(_owner: impl Into<String>) -> Self {
+ Debug
+ }
+ /// noop
+ pub fn put(&mut self) {}
+ /// noop
+ pub fn hit(&mut self) {}
+ /// noop
+ pub fn miss(&mut self) {}
+ }
+}
+
+pub use impl_::Debug;
diff --git a/vendor/gix-features/src/decode.rs b/vendor/gix-features/src/decode.rs
new file mode 100644
index 000000000..0df38710d
--- /dev/null
+++ b/vendor/gix-features/src/decode.rs
@@ -0,0 +1,38 @@
+use std::io::Read;
+
+/// Decode variable int numbers from a `Read` implementation.
+///
+/// Note: currently overflow checks are only done in debug mode.
+#[inline]
+pub fn leb64_from_read(mut r: impl Read) -> Result<(u64, usize), std::io::Error> {
+ let mut b = [0u8; 1];
+ let mut i = 0;
+ r.read_exact(&mut b)?;
+ i += 1;
+ let mut value = b[0] as u64 & 0x7f;
+ while b[0] & 0x80 != 0 {
+ r.read_exact(&mut b)?;
+ i += 1;
+ debug_assert!(i <= 10, "Would overflow value at 11th iteration");
+ value += 1;
+ value = (value << 7) + (b[0] as u64 & 0x7f)
+ }
+ Ok((value, i))
+}
+
+/// Decode variable int numbers.
+#[inline]
+pub fn leb64(d: &[u8]) -> (u64, usize) {
+ let mut i = 0;
+ let mut c = d[i];
+ i += 1;
+ let mut value = c as u64 & 0x7f;
+ while c & 0x80 != 0 {
+ c = d[i];
+ i += 1;
+ debug_assert!(i <= 10, "Would overflow value at 11th iteration");
+ value += 1;
+ value = (value << 7) + (c as u64 & 0x7f)
+ }
+ (value, i)
+}
diff --git a/vendor/gix-features/src/fs.rs b/vendor/gix-features/src/fs.rs
new file mode 100644
index 000000000..f65779b92
--- /dev/null
+++ b/vendor/gix-features/src/fs.rs
@@ -0,0 +1,246 @@
+//! Filesystem utilities
+//!
+//! These will be parallel if the `parallel` feature is enabled, at the expense of compiling additional dependencies
+//! along with runtime costs for maintaining a global [`rayon`](https://docs.rs/rayon) thread pool.
+//!
+//! For information on how to use the [`WalkDir`] type, have a look at
+//! * [`jwalk::WalkDir`](https://docs.rs/jwalk/0.5.1/jwalk/type.WalkDir.html) if `parallel` feature is enabled
+//! * [walkdir::WalkDir](https://docs.rs/walkdir/2.3.1/walkdir/struct.WalkDir.html) otherwise
+
+#[cfg(any(feature = "walkdir", feature = "fs-walkdir-parallel"))]
+mod shared {
+ /// The desired level of parallelism.
+ pub enum Parallelism {
+ /// Do not parallelize at all by making a serial traversal on the current thread.
+ Serial,
+ /// Create a new thread pool for each traversal with up to 16 threads or the amount of logical cores of the machine.
+ ThreadPoolPerTraversal {
+ /// The base name of the threads we create as part of the thread-pool.
+ thread_name: &'static str,
+ },
+ }
+}
+
+///
+#[cfg(feature = "fs-walkdir-parallel")]
+pub mod walkdir {
+ use std::path::Path;
+
+ pub use jwalk::{DirEntry as DirEntryGeneric, DirEntryIter as DirEntryIterGeneric, Error, WalkDir};
+
+ pub use super::shared::Parallelism;
+
+ /// An alias for an uncustomized directory entry to match the one of the non-parallel version offered by `walkdir`.
+ pub type DirEntry = DirEntryGeneric<((), ())>;
+
+ impl From<Parallelism> for jwalk::Parallelism {
+ fn from(v: Parallelism) -> Self {
+ match v {
+ Parallelism::Serial => jwalk::Parallelism::Serial,
+ Parallelism::ThreadPoolPerTraversal { thread_name } => std::thread::available_parallelism()
+ .map(|threads| {
+ let pool = jwalk::rayon::ThreadPoolBuilder::new()
+ .num_threads(threads.get().min(16))
+ .stack_size(128 * 1024)
+ .thread_name(move |idx| format!("{thread_name} {idx}"))
+ .build()
+ .expect("we only set options that can't cause a build failure");
+ jwalk::Parallelism::RayonExistingPool {
+ pool: pool.into(),
+ busy_timeout: None,
+ }
+ })
+ .unwrap_or_else(|_| Parallelism::Serial.into()),
+ }
+ }
+ }
+
+ /// Instantiate a new directory iterator which will not skip hidden files, with the given level of `parallelism`.
+ pub fn walkdir_new(root: impl AsRef<Path>, parallelism: Parallelism) -> WalkDir {
+ WalkDir::new(root).skip_hidden(false).parallelism(parallelism.into())
+ }
+
+ /// Instantiate a new directory iterator which will not skip hidden files and is sorted
+ pub fn walkdir_sorted_new(root: impl AsRef<Path>, parallelism: Parallelism) -> WalkDir {
+ WalkDir::new(root)
+ .skip_hidden(false)
+ .sort(true)
+ .parallelism(parallelism.into())
+ }
+
+ /// The Iterator yielding directory items
+ pub type DirEntryIter = DirEntryIterGeneric<((), ())>;
+}
+
+#[cfg(all(feature = "walkdir", not(feature = "fs-walkdir-parallel")))]
+///
+pub mod walkdir {
+ use std::path::Path;
+
+ pub use walkdir::{DirEntry, Error, WalkDir};
+
+ pub use super::shared::Parallelism;
+
+ /// Instantiate a new directory iterator which will not skip hidden files. The `parallelism` argument is accepted for API compatibility with the parallel variant but is ignored here.
+ pub fn walkdir_new(root: impl AsRef<Path>, _: Parallelism) -> WalkDir {
+ WalkDir::new(root)
+ }
+
+ /// Instantiate a new directory iterator which will not skip hidden files and is sorted by file name. The `parallelism` argument is accepted for API compatibility with the parallel variant but is ignored here.
+ pub fn walkdir_sorted_new(root: impl AsRef<Path>, _: Parallelism) -> WalkDir {
+ WalkDir::new(root).sort_by_file_name()
+ }
+
+ /// The Iterator yielding directory items
+ pub type DirEntryIter = walkdir::IntoIter;
+}
+
+#[cfg(any(feature = "walkdir", feature = "fs-walkdir-parallel"))]
+pub use self::walkdir::{walkdir_new, walkdir_sorted_new, WalkDir};
+
+/// Prepare open options which won't follow symlinks when the file is opened.
+///
+/// Note: only effective on unix currently.
+pub fn open_options_no_follow() -> std::fs::OpenOptions {
+ #[cfg_attr(not(unix), allow(unused_mut))]
+ let mut options = std::fs::OpenOptions::new();
+ #[cfg(unix)]
+ {
+ /// Make sure that it's impossible to follow through to the target of symlinks.
+ /// Note that this will still follow symlinks in the path, which is what we assume
+ /// has been checked separately.
+ use std::os::unix::fs::OpenOptionsExt;
+ options.custom_flags(libc::O_NOFOLLOW);
+ }
+ options
+}
+
+mod snapshot {
+ use std::ops::Deref;
+
+ use crate::threading::{get_mut, get_ref, MutableOnDemand, OwnShared};
+
+ /// A structure holding enough information to reload a value if its on-disk representation changes as determined by its modified time.
+ #[derive(Debug)]
+ pub struct Snapshot<T: std::fmt::Debug> {
+ value: T,
+ modified: std::time::SystemTime,
+ }
+
+ impl<T: Clone + std::fmt::Debug> Clone for Snapshot<T> {
+ fn clone(&self) -> Self {
+ Self {
+ value: self.value.clone(),
+ modified: self.modified,
+ }
+ }
+ }
+
+ /// A snapshot of a resource which is up-to-date in the moment it is retrieved.
+ pub type SharedSnapshot<T> = OwnShared<Snapshot<T>>;
+
+ /// Use this type for fields in structs that are to store the [`Snapshot`], typically behind an [`OwnShared`].
+ ///
+ /// Note that the resource itself is behind another [`OwnShared`] to allow it to be used without holding any kind of lock, hence
+ /// without blocking updates while it is used.
+ #[derive(Debug, Default)]
+ pub struct MutableSnapshot<T: std::fmt::Debug>(pub MutableOnDemand<Option<SharedSnapshot<T>>>);
+
+ impl<T: std::fmt::Debug> Deref for Snapshot<T> {
+ type Target = T;
+
+ fn deref(&self) -> &Self::Target {
+ &self.value
+ }
+ }
+
+ impl<T: std::fmt::Debug> Deref for MutableSnapshot<T> {
+ type Target = MutableOnDemand<Option<SharedSnapshot<T>>>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+ }
+
+ impl<T: std::fmt::Debug> MutableSnapshot<T> {
+ /// Create a new instance of this type.
+ ///
+ /// Useful in case `Default::default()` isn't working for some reason.
+ pub fn new() -> Self {
+ MutableSnapshot(MutableOnDemand::new(None))
+ }
+
+ /// Refresh `state` forcefully by re-`open`ing the resource. Note that `open()` returns `None` if the resource isn't
+ /// present on disk, and that it's critical that the modified time is obtained _before_ opening the resource.
+ pub fn force_refresh<E>(
+ &self,
+ open: impl FnOnce() -> Result<Option<(std::time::SystemTime, T)>, E>,
+ ) -> Result<(), E> {
+ let mut state = get_mut(&self.0);
+ *state = open()?.map(|(modified, value)| OwnShared::new(Snapshot { value, modified }));
+ Ok(())
+ }
+
+ /// Assure that the resource in `state` is up-to-date by comparing the `current_modification_time` with the one we know in `state`
+ /// and by acting accordingly.
+ /// Returns the potentially updated/reloaded resource if it is still present on disk, which then represents a snapshot that is up-to-date
+ /// in that very moment, or `None` if the underlying file doesn't exist.
+ ///
+ /// Note that even though this is racy, each time a request is made there is a chance to see the actual state.
+ pub fn recent_snapshot<E>(
+ &self,
+ mut current_modification_time: impl FnMut() -> Option<std::time::SystemTime>,
+ open: impl FnOnce() -> Result<Option<T>, E>,
+ ) -> Result<Option<SharedSnapshot<T>>, E> {
+ let state = get_ref(self);
+ let recent_modification = current_modification_time();
+ let buffer = match (&*state, recent_modification) {
+ (None, None) => (*state).clone(),
+ (Some(_), None) => {
+ drop(state);
+ let mut state = get_mut(self);
+ *state = None;
+ (*state).clone()
+ }
+ (Some(snapshot), Some(modified_time)) => {
+ if snapshot.modified < modified_time {
+ drop(state);
+ let mut state = get_mut(self);
+
+ if let (Some(_snapshot), Some(modified_time)) = (&*state, current_modification_time()) {
+ *state = open()?.map(|value| {
+ OwnShared::new(Snapshot {
+ value,
+ modified: modified_time,
+ })
+ });
+ }
+
+ (*state).clone()
+ } else {
+ // Note that this relies on sub-second precision or else is a race when the packed file was just changed.
+ // It's nothing we can know though, so… up to the caller unfortunately.
+ Some(snapshot.clone())
+ }
+ }
+ (None, Some(_modified_time)) => {
+ drop(state);
+ let mut state = get_mut(self);
+ // Still in the same situation? If so, load the buffer. This compensates for the thundering herd
+ // during lazy-loading at the expense of another mtime check.
+ if let (None, Some(modified_time)) = (&*state, current_modification_time()) {
+ *state = open()?.map(|value| {
+ OwnShared::new(Snapshot {
+ value,
+ modified: modified_time,
+ })
+ });
+ }
+ (*state).clone()
+ }
+ };
+ Ok(buffer)
+ }
+ }
+}
+pub use snapshot::{MutableSnapshot, SharedSnapshot, Snapshot};
diff --git a/vendor/gix-features/src/hash.rs b/vendor/gix-features/src/hash.rs
new file mode 100644
index 000000000..fe064139a
--- /dev/null
+++ b/vendor/gix-features/src/hash.rs
@@ -0,0 +1,190 @@
+//! Hash functions and hash utilities
+//!
+//! With the `fast-sha1` feature, the `Sha1` hash type will use a more elaborate implementation utilizing hardware support
+//! in case it is available. Otherwise the `rustsha1` feature should be set. `fast-sha1` will take precedence.
+//! Otherwise, a minimal yet performant implementation is used instead for a decent trade-off between compile times and run-time performance.
+#[cfg(all(feature = "rustsha1", not(feature = "fast-sha1")))]
+mod _impl {
+ use super::Sha1Digest;
+
+ /// An implementation of the Sha1 hash, which can be used once.
+ #[derive(Default, Clone)]
+ pub struct Sha1(sha1_smol::Sha1);
+
+ impl Sha1 {
+ /// Digest the given `bytes`.
+ pub fn update(&mut self, bytes: &[u8]) {
+ self.0.update(bytes)
+ }
+ /// Finalize the hash and produce a digest.
+ pub fn digest(self) -> Sha1Digest {
+ self.0.digest().bytes()
+ }
+ }
+}
+
+/// A 20 bytes digest produced by a [`Sha1`] hash implementation.
+#[cfg(any(feature = "fast-sha1", feature = "rustsha1"))]
+pub type Sha1Digest = [u8; 20];
+
+#[cfg(feature = "fast-sha1")]
+mod _impl {
+ use sha1::Digest;
+
+ use super::Sha1Digest;
+
+ /// An implementation of the Sha1 hash, which can be used once.
+ #[derive(Default, Clone)]
+ pub struct Sha1(sha1::Sha1);
+
+ impl Sha1 {
+ /// Digest the given `bytes`.
+ pub fn update(&mut self, bytes: &[u8]) {
+ self.0.update(bytes)
+ }
+ /// Finalize the hash and produce a digest.
+ pub fn digest(self) -> Sha1Digest {
+ self.0.finalize().into()
+ }
+ }
+}
+
+#[cfg(any(feature = "rustsha1", feature = "fast-sha1"))]
+pub use _impl::Sha1;
+
+/// Compute a CRC32 hash from the given `bytes`, returning the CRC32 hash.
+///
+/// When calling this function for the first time, `previous_value` should be `0`. Otherwise it
+/// should be the previous return value of this function to provide a hash of multiple sequential
+/// chunks of `bytes`.
+#[cfg(feature = "crc32")]
+pub fn crc32_update(previous_value: u32, bytes: &[u8]) -> u32 {
+ let mut h = crc32fast::Hasher::new_with_initial(previous_value);
+ h.update(bytes);
+ h.finalize()
+}
+
+/// Compute a CRC32 value of the given input `bytes`.
+///
+/// In case multiple chunks of `bytes` are present, one should use [`crc32_update()`] instead.
+#[cfg(feature = "crc32")]
+pub fn crc32(bytes: &[u8]) -> u32 {
+ let mut h = crc32fast::Hasher::new();
+ h.update(bytes);
+ h.finalize()
+}
+
+/// Produce a hasher suitable for the given kind of hash.
+#[cfg(any(feature = "rustsha1", feature = "fast-sha1"))]
+pub fn hasher(kind: gix_hash::Kind) -> Sha1 {
+ match kind {
+ gix_hash::Kind::Sha1 => Sha1::default(),
+ }
+}
+
+/// Compute the hash of `kind` for the bytes in the file at `path`, hashing only the first `num_bytes_from_start`
+/// while initializing and calling `progress`.
+///
+/// `num_bytes_from_start` is useful to avoid reading trailing hashes, which are never part of the hash itself,
+/// denoting the amount of bytes to hash starting from the beginning of the file.
+///
+/// # Note
+///
+/// * Only available with the `progress` feature and either the `rustsha1` or `fast-sha1` feature enabled. It takes a [`gix_hash::Kind`] and returns a
+/// [`gix_hash::ObjectId`].
+/// * [Interrupts][crate::interrupt] are supported.
+#[cfg(all(feature = "progress", any(feature = "rustsha1", feature = "fast-sha1")))]
+pub fn bytes_of_file(
+ path: impl AsRef<std::path::Path>,
+ num_bytes_from_start: usize,
+ kind: gix_hash::Kind,
+ progress: &mut impl crate::progress::Progress,
+ should_interrupt: &std::sync::atomic::AtomicBool,
+) -> std::io::Result<gix_hash::ObjectId> {
+ bytes(
+ std::fs::File::open(path)?,
+ num_bytes_from_start,
+ kind,
+ progress,
+ should_interrupt,
+ )
+}
+
+/// Similar to [`bytes_of_file`], but operates on an already open file.
+#[cfg(all(feature = "progress", any(feature = "rustsha1", feature = "fast-sha1")))]
+pub fn bytes(
+ mut read: impl std::io::Read,
+ num_bytes_from_start: usize,
+ kind: gix_hash::Kind,
+ progress: &mut impl crate::progress::Progress,
+ should_interrupt: &std::sync::atomic::AtomicBool,
+) -> std::io::Result<gix_hash::ObjectId> {
+ let mut hasher = hasher(kind);
+ let start = std::time::Instant::now();
+ // init progress before the possibility for failure, as convenience in case people want to recover
+ progress.init(Some(num_bytes_from_start), crate::progress::bytes());
+
+ const BUF_SIZE: usize = u16::MAX as usize;
+ let mut buf = [0u8; BUF_SIZE];
+ let mut bytes_left = num_bytes_from_start;
+
+ while bytes_left > 0 {
+ let out = &mut buf[..BUF_SIZE.min(bytes_left)];
+ read.read_exact(out)?;
+ bytes_left -= out.len();
+ progress.inc_by(out.len());
+ hasher.update(out);
+ if should_interrupt.load(std::sync::atomic::Ordering::SeqCst) {
+ return Err(std::io::Error::new(std::io::ErrorKind::Other, "Interrupted"));
+ }
+ }
+
+ let id = gix_hash::ObjectId::from(hasher.digest());
+ progress.show_throughput(start);
+ Ok(id)
+}
+
+#[cfg(any(feature = "rustsha1", feature = "fast-sha1"))]
+mod write {
+ use crate::hash::Sha1;
+
+ /// A utility to automatically generate a hash while writing into an inner writer.
+ pub struct Write<T> {
+ /// The hash implementation.
+ pub hash: Sha1,
+ /// The inner writer.
+ pub inner: T,
+ }
+
+ impl<T> std::io::Write for Write<T>
+ where
+ T: std::io::Write,
+ {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ let written = self.inner.write(buf)?;
+ self.hash.update(&buf[..written]);
+ Ok(written)
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ self.inner.flush()
+ }
+ }
+
+ impl<T> Write<T>
+ where
+ T: std::io::Write,
+ {
+ /// Create a new hash writer which hashes all bytes written to `inner` with a hash of `kind`.
+ pub fn new(inner: T, object_hash: gix_hash::Kind) -> Self {
+ match object_hash {
+ gix_hash::Kind::Sha1 => Write {
+ inner,
+ hash: Sha1::default(),
+ },
+ }
+ }
+ }
+}
+#[cfg(any(feature = "rustsha1", feature = "fast-sha1"))]
+pub use write::Write;
diff --git a/vendor/gix-features/src/interrupt.rs b/vendor/gix-features/src/interrupt.rs
new file mode 100644
index 000000000..1f78e613a
--- /dev/null
+++ b/vendor/gix-features/src/interrupt.rs
@@ -0,0 +1,125 @@
+//! Utilities to cause interruptions in common traits, like Read/Write and Iterator.
+use std::{
+ io,
+ sync::atomic::{AtomicBool, Ordering},
+};
+
+/// A wrapper for an inner iterator which will check for interruptions on each iteration, stopping the iteration when
+/// that is requested.
+pub struct Iter<'a, I> {
+ /// The actual iterator to yield elements from.
+ pub inner: I,
+ should_interrupt: &'a AtomicBool,
+}
+
+impl<'a, I> Iter<'a, I>
+where
+ I: Iterator,
+{
+ /// Create a new iterator over `inner` which checks for interruptions on each iteration on `should_interrupt`.
+ ///
+ /// Note that this means the consumer of the iterator data should also be able to access `should_interrupt` and
+ /// consider it when producing the final result to avoid claiming success even though the operation is only partially
+ /// complete.
+ pub fn new(inner: I, should_interrupt: &'a AtomicBool) -> Self {
+ Iter {
+ inner,
+ should_interrupt,
+ }
+ }
+}
+
+impl<'a, I> Iterator for Iter<'a, I>
+where
+ I: Iterator,
+{
+ type Item = I::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.should_interrupt.load(Ordering::Relaxed) {
+ return None;
+ }
+ self.inner.next()
+ }
+}
+
+/// A wrapper for an inner iterator which will check for interruptions on each iteration.
+pub struct IterWithErr<'a, I, EFN> {
+ /// The actual iterator to yield elements from.
+ pub inner: I,
+ make_err: Option<EFN>,
+ should_interrupt: &'a AtomicBool,
+}
+
+impl<'a, I, EFN, E> IterWithErr<'a, I, EFN>
+where
+ I: Iterator,
+ EFN: FnOnce() -> E,
+{
+ /// Create a new iterator over `inner` which checks for interruptions on each iteration and calls `make_err()` to
+ /// signal an interruption happened, causing no further items to be iterated from that point on.
+ pub fn new(inner: I, make_err: EFN, should_interrupt: &'a AtomicBool) -> Self {
+ IterWithErr {
+ inner,
+ make_err: Some(make_err),
+ should_interrupt,
+ }
+ }
+}
+
+impl<'a, I, EFN, E> Iterator for IterWithErr<'a, I, EFN>
+where
+ I: Iterator,
+ EFN: FnOnce() -> E,
+{
+ type Item = Result<I::Item, E>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.make_err.as_ref()?;
+ if self.should_interrupt.load(Ordering::Relaxed) {
+ return self.make_err.take().map(|f| Err(f()));
+ }
+ match self.inner.next() {
+ Some(next) => Some(Ok(next)),
+ None => {
+ self.make_err = None;
+ None
+ }
+ }
+ }
+}
+
+/// A wrapper for implementors of [`std::io::Read`] or [`std::io::BufRead`] with interrupt support.
+///
+/// It fails a [read][`std::io::Read::read`] while an interrupt was requested.
+pub struct Read<'a, R> {
+ /// The actual implementor of [`std::io::Read`] to which interrupt support will be added.
+ pub inner: R,
+ /// The flag to trigger interruption
+ pub should_interrupt: &'a AtomicBool,
+}
+
+impl<'a, R> io::Read for Read<'a, R>
+where
+ R: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ if self.should_interrupt.load(Ordering::Relaxed) {
+ return Err(std::io::Error::new(std::io::ErrorKind::Other, "Interrupted"));
+ }
+ self.inner.read(buf)
+ }
+}
+
+impl<'a, R> io::BufRead for Read<'a, R>
+where
+ R: io::BufRead,
+{
+ fn fill_buf(&mut self) -> io::Result<&[u8]> {
+ self.inner.fill_buf()
+ }
+
+ fn consume(&mut self, amt: usize) {
+ self.inner.consume(amt)
+ }
+}
diff --git a/vendor/gix-features/src/io.rs b/vendor/gix-features/src/io.rs
new file mode 100644
index 000000000..405960c0b
--- /dev/null
+++ b/vendor/gix-features/src/io.rs
@@ -0,0 +1,94 @@
+//! A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
+
+/// A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
+#[cfg(feature = "io-pipe")]
+pub mod pipe {
+ use std::io;
+
+ use bytes::{Buf, BufMut, BytesMut};
+
+ /// The write-end of the pipe, receiving items to become available in the [`Reader`].
+ ///
+ /// It's commonly used with the [`std::io::Write`] trait it implements.
+ pub struct Writer {
+ /// The channel through which bytes are transferred. Useful for sending [`std::io::Error`]s instead.
+ pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>,
+ buf: BytesMut,
+ }
+
+ /// The read-end of the pipe, implementing the [`std::io::Read`] trait.
+ pub struct Reader {
+ channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>,
+ buf: BytesMut,
+ }
+
+ impl io::BufRead for Reader {
+ fn fill_buf(&mut self) -> io::Result<&[u8]> {
+ if self.buf.is_empty() {
+ match self.channel.recv() {
+ Ok(Ok(buf)) => self.buf = buf,
+ Ok(Err(err)) => return Err(err),
+ Err(_) => {}
+ }
+ };
+ Ok(&self.buf)
+ }
+
+ fn consume(&mut self, amt: usize) {
+ self.buf.advance(amt.min(self.buf.len()));
+ }
+ }
+
+ impl io::Read for Reader {
+ fn read(&mut self, mut out: &mut [u8]) -> io::Result<usize> {
+ let mut written = 0;
+ while !out.is_empty() {
+ if self.buf.is_empty() {
+ match self.channel.recv() {
+ Ok(Ok(buf)) => self.buf = buf,
+ Ok(Err(err)) => return Err(err),
+ Err(_) => break,
+ }
+ }
+ let bytes_to_write = self.buf.len().min(out.len());
+ let (to_write, rest) = out.split_at_mut(bytes_to_write);
+ self.buf.split_to(bytes_to_write).copy_to_slice(to_write);
+ out = rest;
+ written += bytes_to_write;
+ }
+ Ok(written)
+ }
+ }
+
+ impl io::Write for Writer {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.buf.put_slice(buf);
+ self.channel
+ .send(Ok(self.buf.split()))
+ .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
+ Ok(buf.len())
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+ }
+
+ /// Returns the _([`write`][Writer], [`read`][Reader])_ ends of a pipe for transferring bytes, analogous to a unix pipe.
+ ///
+ /// * `in_flight_writes` defines the amount of chunks of bytes to keep in memory until the `write` end will block when writing.
+ /// If `None` or `0`, the `write` end will always block until the `read` end consumes the transferred bytes.
+ pub fn unidirectional(in_flight_writes: impl Into<Option<usize>>) -> (Writer, Reader) {
+ let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes.into().unwrap_or(0));
+ (
+ Writer {
+ channel: tx,
+ buf: BytesMut::with_capacity(4096),
+ },
+ Reader {
+ channel: rx,
+ buf: BytesMut::new(),
+ },
+ )
+ }
+}
diff --git a/vendor/gix-features/src/lib.rs b/vendor/gix-features/src/lib.rs
new file mode 100644
index 000000000..643320c0f
--- /dev/null
+++ b/vendor/gix-features/src/lib.rs
@@ -0,0 +1,64 @@
+//! A crate providing foundational capabilities to other `gix-*` crates with trade-offs between compile time, binary size or speed
+//! selectable using cargo feature toggles.
+//!
+//! It's designed to allow the application level crate to configure feature toggles, affecting all other `gix-*` crates using
+//! this one.
+//!
+//! Thus all features provided here commonly have a 'cheap' base implementation, with the option to pull in
+//! counterparts with higher performance.
+//! ## Feature Flags
+#![cfg_attr(
+ feature = "document-features",
+ cfg_attr(doc, doc = ::document_features::document_features!())
+)]
+#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
+#![deny(missing_docs, rust_2018_idioms, unsafe_code)]
+
+///
+pub mod cache;
+///
+pub mod decode;
+pub mod fs;
+pub mod hash;
+pub mod interrupt;
+#[cfg(feature = "io-pipe")]
+pub mod io;
+pub mod parallel;
+#[cfg(feature = "progress")]
+pub mod progress;
+pub mod threading;
+///
+#[cfg(feature = "zlib")]
+pub mod zlib;
+
+///
+pub mod iter {
+ /// An iterator over chunks of input, producing `Vec<Item>` with a size of `size`, with the last chunk being the remainder and thus
+ /// potentially smaller than `size`.
+ pub struct Chunks<I> {
+ /// The inner iterator to ask for items.
+ pub inner: I,
+ /// The size of chunks to produce
+ pub size: usize,
+ }
+
+ impl<I, Item> Iterator for Chunks<I>
+ where
+ I: Iterator<Item = Item>,
+ {
+ type Item = Vec<Item>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let mut res = Vec::with_capacity(self.size);
+ let mut items_left = self.size;
+ for item in &mut self.inner {
+ res.push(item);
+ items_left -= 1;
+ if items_left == 0 {
+ break;
+ }
+ }
+ (!res.is_empty()).then_some(res)
+ }
+ }
+}
diff --git a/vendor/gix-features/src/parallel/eager_iter.rs b/vendor/gix-features/src/parallel/eager_iter.rs
new file mode 100644
index 000000000..60123f54c
--- /dev/null
+++ b/vendor/gix-features/src/parallel/eager_iter.rs
@@ -0,0 +1,124 @@
+/// Evaluate any iterator in its own thread.
+///
+/// This is particularly useful if the wrapped iterator performs IO and/or heavy computations.
+/// Use [`EagerIter::new()`] for instantiation.
+pub struct EagerIter<I: Iterator> {
+ receiver: std::sync::mpsc::Receiver<Vec<I::Item>>,
+ chunk: Option<std::vec::IntoIter<I::Item>>,
+ size_hint: (usize, Option<usize>),
+}
+
+impl<I> EagerIter<I>
+where
+ I: Iterator + Send + 'static,
+ <I as Iterator>::Item: Send,
+{
+ /// Return a new `EagerIter` which evaluates `iter` in its own thread,
+ /// with a given `chunk_size` allowing a maximum `chunks_in_flight`.
+ ///
+ /// * `chunk_size` describes how many items returned by `iter` will be a single item of this `EagerIter`.
+ /// This helps to reduce the overhead imposed by transferring many small items.
+ /// If this number is 1, each item will become a single chunk. 0 is invalid.
+ /// * `chunks_in_flight` describes how many chunks can be kept in memory in case the consumer of the `EagerIter`s items
+ /// isn't consuming them fast enough. Setting this number to 0 effectively turns off any caching, but blocks `EagerIter`
+ /// if its items aren't consumed fast enough.
+ pub fn new(iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self {
+ let (sender, receiver) = std::sync::mpsc::sync_channel(chunks_in_flight);
+ let size_hint = iter.size_hint();
+ assert!(chunk_size > 0, "non-zero chunk size is needed");
+
+ std::thread::spawn(move || {
+ let mut out = Vec::with_capacity(chunk_size);
+ for item in iter {
+ out.push(item);
+ if out.len() == chunk_size {
+ if sender.send(out).is_err() {
+ return;
+ }
+ out = Vec::with_capacity(chunk_size);
+ }
+ }
+ if !out.is_empty() {
+ sender.send(out).ok();
+ }
+ });
+ EagerIter {
+ receiver,
+ chunk: None,
+ size_hint,
+ }
+ }
+
+ fn fill_buf_and_pop(&mut self) -> Option<I::Item> {
+ self.chunk = self.receiver.recv().ok().map(|v| {
+ assert!(!v.is_empty());
+ v.into_iter()
+ });
+ self.chunk.as_mut().and_then(|c| c.next())
+ }
+}
+
+impl<I> Iterator for EagerIter<I>
+where
+ I: Iterator + Send + 'static,
+ <I as Iterator>::Item: Send,
+{
+ type Item = I::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ match self.chunk.as_mut() {
+ Some(chunk) => chunk.next().or_else(|| self.fill_buf_and_pop()),
+ None => self.fill_buf_and_pop(),
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.size_hint
+ }
+}
+
+/// A conditional `EagerIter`, which may become a just-in-time iterator running in the main thread depending on a condition.
+pub enum EagerIterIf<I: Iterator> {
+ /// A separate thread will eagerly evaluate iterator `I`.
+ Eager(EagerIter<I>),
+ /// The current thread evaluates `I`.
+ OnDemand(I),
+}
+
+impl<I> EagerIterIf<I>
+where
+ I: Iterator + Send + 'static,
+ <I as Iterator>::Item: Send,
+{
+ /// Return a new `EagerIterIf` which evaluates `iter` eagerly in a separate thread if `condition()` returns true, or on demand in the current thread otherwise.
+ ///
+ /// For all other parameters, please see [`EagerIter::new()`].
+ pub fn new(condition: impl FnOnce() -> bool, iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self {
+ if condition() {
+ EagerIterIf::Eager(EagerIter::new(iter, chunk_size, chunks_in_flight))
+ } else {
+ EagerIterIf::OnDemand(iter)
+ }
+ }
+}
+impl<I> Iterator for EagerIterIf<I>
+where
+ I: Iterator + Send + 'static,
+ <I as Iterator>::Item: Send,
+{
+ type Item = I::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ match self {
+ EagerIterIf::OnDemand(i) => i.next(),
+ EagerIterIf::Eager(i) => i.next(),
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ match self {
+ EagerIterIf::OnDemand(i) => i.size_hint(),
+ EagerIterIf::Eager(i) => i.size_hint(),
+ }
+ }
+}
diff --git a/vendor/gix-features/src/parallel/in_order.rs b/vendor/gix-features/src/parallel/in_order.rs
new file mode 100644
index 000000000..7928ac692
--- /dev/null
+++ b/vendor/gix-features/src/parallel/in_order.rs
@@ -0,0 +1,83 @@
+use std::{cmp::Ordering, collections::BTreeMap};
+
+/// A counter for items that are in sequence, to be able to put them back into original order later.
+pub type SequenceId = usize;
+
+ /// An iterator which holds iterated items with a **sequential** ID starting at 0 long enough to dispense them in order.
+pub struct InOrderIter<T, I> {
+ /// The iterator yielding the out-of-order elements we are to yield in order.
+ pub inner: I,
+ store: BTreeMap<SequenceId, T>,
+ next_chunk: SequenceId,
+ is_done: bool,
+}
+
+impl<T, E, I> From<I> for InOrderIter<T, I>
+where
+ I: Iterator<Item = Result<(SequenceId, T), E>>,
+{
+ fn from(iter: I) -> Self {
+ InOrderIter {
+ inner: iter,
+ store: Default::default(),
+ next_chunk: 0,
+ is_done: false,
+ }
+ }
+}
+
+impl<T, E, I> Iterator for InOrderIter<T, I>
+where
+ I: Iterator<Item = Result<(SequenceId, T), E>>,
+{
+ type Item = Result<T, E>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.is_done {
+ return None;
+ }
+ 'find_next_in_sequence: loop {
+ match self.inner.next() {
+ Some(Ok((c, v))) => match c.cmp(&self.next_chunk) {
+ Ordering::Equal => {
+ self.next_chunk += 1;
+ return Some(Ok(v));
+ }
+ Ordering::Less => {
+ unreachable!("in a correctly ordered sequence we can never see keys again, got {}", c)
+ }
+ Ordering::Greater => {
+ let previous = self.store.insert(c, v);
+ assert!(
+ previous.is_none(),
+ "Chunks are returned only once, input is an invalid sequence"
+ );
+ if let Some(v) = self.store.remove(&self.next_chunk) {
+ self.next_chunk += 1;
+ return Some(Ok(v));
+ }
+ continue 'find_next_in_sequence;
+ }
+ },
+ Some(Err(e)) => {
+ self.is_done = true;
+ self.store.clear();
+ return Some(Err(e));
+ }
+ None => match self.store.remove(&self.next_chunk) {
+ Some(v) => {
+ self.next_chunk += 1;
+ return Some(Ok(v));
+ }
+ None => {
+ debug_assert!(
+ self.store.is_empty(),
+ "When iteration is done we should not have stored items left"
+ );
+ return None;
+ }
+ },
+ }
+ }
+ }
+}
diff --git a/vendor/gix-features/src/parallel/in_parallel.rs b/vendor/gix-features/src/parallel/in_parallel.rs
new file mode 100644
index 000000000..e1e2cc3e3
--- /dev/null
+++ b/vendor/gix-features/src/parallel/in_parallel.rs
@@ -0,0 +1,211 @@
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+use crate::parallel::{num_threads, Reduce};
+
+/// Runs `left` and `right` in parallel, returning their output when both are done.
+pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) {
+ std::thread::scope(|s| {
+ let left = std::thread::Builder::new()
+ .name("gitoxide.join.left".into())
+ .spawn_scoped(s, left)
+ .expect("valid name");
+ let right = std::thread::Builder::new()
+ .name("gitoxide.join.right".into())
+ .spawn_scoped(s, right)
+ .expect("valid name");
+ (left.join().unwrap(), right.join().unwrap())
+ })
+}
+
+/// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
+/// That way it's possible to handle threads without needing the 'static lifetime for data they interact with.
+///
+/// Note that the threads should not rely on actual parallelism as threading might be turned off entirely, hence should not
+ /// be connected to each other with channels as deadlock would occur in single-threaded mode.
+pub fn threads<'env, F, R>(f: F) -> R
+where
+ F: for<'scope> FnOnce(&'scope std::thread::Scope<'scope, 'env>) -> R,
+{
+ std::thread::scope(f)
+}
+
+/// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
+pub fn build_thread() -> std::thread::Builder {
+ std::thread::Builder::new()
+}
+
+/// Read items from `input` and `consume` them in multiple threads,
+ /// whose output is collected by a `reducer`. Its task is to
+/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.
+///
+/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
+ /// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
+/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
+/// created by `new_thread_state(…)`.
+/// * For `reducer`, see the [`Reduce`] trait
+pub fn in_parallel<I, S, O, R>(
+ input: impl Iterator<Item = I> + Send,
+ thread_limit: Option<usize>,
+ new_thread_state: impl Fn(usize) -> S + Send + Clone,
+ consume: impl Fn(I, &mut S) -> O + Send + Clone,
+ mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+ I: Send,
+ O: Send,
+{
+ let num_threads = num_threads(thread_limit);
+ std::thread::scope(move |s| {
+ let receive_result = {
+ let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
+ let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
+ for thread_id in 0..num_threads {
+ std::thread::Builder::new()
+ .name(format!("gitoxide.in_parallel.produce.{thread_id}"))
+ .spawn_scoped(s, {
+ let send_result = send_result.clone();
+ let receive_input = receive_input.clone();
+ let new_thread_state = new_thread_state.clone();
+ let consume = consume.clone();
+ move || {
+ let mut state = new_thread_state(thread_id);
+ for item in receive_input {
+ if send_result.send(consume(item, &mut state)).is_err() {
+ break;
+ }
+ }
+ }
+ })
+ .expect("valid name");
+ }
+ std::thread::Builder::new()
+ .name("gitoxide.in_parallel.feed".into())
+ .spawn_scoped(s, move || {
+ for item in input {
+ if send_input.send(item).is_err() {
+ break;
+ }
+ }
+ })
+ .expect("valid name");
+ receive_result
+ };
+
+ for item in receive_result {
+ drop(reducer.feed(item)?);
+ }
+ reducer.finalize()
+ })
+}
+
+/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
+/// This is only good for operations where near-random access isn't detrimental, so it's not usually great
+/// for file-io as it won't make use of sorted inputs well.
+/// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast.
+// TODO: better docs
+pub fn in_parallel_with_slice<I, S, R, E>(
+ input: &mut [I],
+ thread_limit: Option<usize>,
+ new_thread_state: impl FnMut(usize) -> S + Send + Clone,
+ consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Send + Clone,
+ mut periodic: impl FnMut() -> Option<std::time::Duration> + Send,
+ state_to_rval: impl FnOnce(S) -> R + Send + Clone,
+) -> Result<Vec<R>, E>
+where
+ I: Send,
+ E: Send,
+ R: Send,
+{
+ let num_threads = num_threads(thread_limit);
+ let mut results = Vec::with_capacity(num_threads);
+ let stop_everything = &AtomicBool::default();
+ let index = &AtomicUsize::default();
+
+ // Scoped threads (`std::thread::scope()`, Rust 1.63+) allow the workers to borrow `stop_everything` and `index` from this stack frame.
+ std::thread::scope({
+ move |s| {
+ std::thread::Builder::new()
+ .name("gitoxide.in_parallel_with_slice.watch-interrupts".into())
+ .spawn_scoped(s, {
+ move || loop {
+ if stop_everything.load(Ordering::Relaxed) {
+ break;
+ }
+
+ match periodic() {
+ Some(duration) => std::thread::sleep(duration),
+ None => {
+ stop_everything.store(true, Ordering::Relaxed);
+ break;
+ }
+ }
+ }
+ })
+ .expect("valid name");
+
+ let input_len = input.len();
+ struct Input<I>(*mut [I])
+ where
+ I: Send;
+
+ // SAFETY: `I` is `Send` and each index is handed out to at most one thread by the atomic counter, so the `*mut [I]` may be sent as well.
+ #[allow(unsafe_code)]
+ unsafe impl<I> Send for Input<I> where I: Send {}
+
+ let threads: Vec<_> = (0..num_threads)
+ .map(|thread_id| {
+ std::thread::Builder::new()
+ .name(format!("gitoxide.in_parallel_with_slice.produce.{thread_id}"))
+ .spawn_scoped(s, {
+ let mut new_thread_state = new_thread_state.clone();
+ let state_to_rval = state_to_rval.clone();
+ let mut consume = consume.clone();
+ let input = Input(input as *mut [I]);
+ move || {
+ let mut state = new_thread_state(thread_id);
+ while let Ok(input_index) =
+ index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
+ (x < input_len).then_some(x + 1)
+ })
+ {
+ if stop_everything.load(Ordering::Relaxed) {
+ break;
+ }
+ // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
+ // each item exactly once.
+ let item = {
+ #[allow(unsafe_code)]
+ unsafe {
+ &mut (&mut *input.0)[input_index]
+ }
+ };
+ if let Err(err) = consume(item, &mut state) {
+ stop_everything.store(true, Ordering::Relaxed);
+ return Err(err);
+ }
+ }
+ Ok(state_to_rval(state))
+ }
+ })
+ .expect("valid name")
+ })
+ .collect();
+ for thread in threads {
+ match thread.join() {
+ Ok(res) => {
+ results.push(res?);
+ }
+ Err(err) => {
+ // a panic happened, stop the world gracefully (even though we panic later)
+ stop_everything.store(true, Ordering::Relaxed);
+ std::panic::resume_unwind(err);
+ }
+ }
+ }
+
+ stop_everything.store(true, Ordering::Relaxed);
+ Ok(results)
+ }
+ })
+}
diff --git a/vendor/gix-features/src/parallel/mod.rs b/vendor/gix-features/src/parallel/mod.rs
new file mode 100644
index 000000000..c994cb3b8
--- /dev/null
+++ b/vendor/gix-features/src/parallel/mod.rs
@@ -0,0 +1,179 @@
+ //! Run computations in parallel, or not, based on the `parallel` feature toggle.
+//!
+//! ### in_parallel(…)
+//!
+//! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage
+//! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling
+//! thread to aggregate the results into a single output, which is returned by [`in_parallel()`].
+//!
+ //! Interruptions can be achieved by letting the reducer's [`feed(…)`][Reduce::feed()] method fail.
+//!
+//! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself
+//! or the data to work on.
+//!
+//! This mode of operation doesn't lend itself perfectly to being wrapped for `async` as it appears like a single long-running
+//! operation which runs as fast as possible, which is cancellable only by merit of stopping the input or stopping the output
+//! aggregation.
+//!
+//! ### `reduce::Stepwise`
+//!
+//! The [`Stepwise`][reduce::Stepwise] iterator works exactly as [`in_parallel()`] except that the processing of the output produced by
+//! `consume(I, &mut State) -> O` is made accessible by the `Iterator` trait's `next()` method. As produced work is not
+//! buffered, the owner of the iterator controls the progress made.
+//!
+//! Getting the final output of the [`Reduce`] is achieved through the consuming [`Stepwise::finalize()`][reduce::Stepwise::finalize()] method, which
+//! is functionally equivalent to calling [`in_parallel()`].
+//!
+//! In an `async` context this means that progress is only made each time `next()` is called on the iterator, while merely dropping
+//! the iterator will wind down the computation without any result.
+//!
+//! #### Maintaining Safety
+//!
+//! In order to assure that threads don't outlive the data they borrow because their handles are leaked, we enforce
+//! the `'static` lifetime for its inputs, making it less intuitive to use. It is, however, possible to produce
+//! suitable input iterators as long as they can hold something on the heap.
+#[cfg(feature = "parallel")]
+mod in_parallel;
+#[cfg(feature = "parallel")]
+pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
+
+mod serial;
+#[cfg(not(feature = "parallel"))]
+pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
+
+mod in_order;
+pub use in_order::{InOrderIter, SequenceId};
+
+mod eager_iter;
+pub use eager_iter::{EagerIter, EagerIterIf};
+
+ /// A no-op returning the input as _(`desired_chunk_size`, `thread_limit`, `num_threads(thread_limit)`)_, used
+/// when the `parallel` feature toggle is not set.
+#[cfg(not(feature = "parallel"))]
+pub fn optimize_chunk_size_and_thread_limit(
+ desired_chunk_size: usize,
+ _num_items: Option<usize>,
+ thread_limit: Option<usize>,
+ _available_threads: Option<usize>,
+) -> (usize, Option<usize>, usize) {
+ (desired_chunk_size, thread_limit, num_threads(thread_limit))
+}
+
+/// Return the 'optimal' _(`size of chunks`, `amount of threads as Option`, `amount of threads`)_ to use in [`in_parallel()`] for the given
+/// `desired_chunk_size`, `num_items`, `thread_limit` and `available_threads`.
+///
+/// * `desired_chunk_size` is the amount of items per chunk you think should be used.
+/// * `num_items` is the total amount of items in the iteration, if `Some`.
+/// Otherwise this knowledge will not affect the output of this function.
+/// * `thread_limit` is the amount of threads to use at most, if `Some`.
+/// Otherwise this knowledge will not affect the output of this function.
+/// * `available_threads` is the total amount of threads available, if `Some`.
+/// Otherwise the actual amount of available threads is determined by querying the system.
+///
+/// `Note` that this implementation is available only if the `parallel` feature toggle is set.
+#[cfg(feature = "parallel")]
+pub fn optimize_chunk_size_and_thread_limit(
+ desired_chunk_size: usize,
+ num_items: Option<usize>,
+ thread_limit: Option<usize>,
+ available_threads: Option<usize>,
+) -> (usize, Option<usize>, usize) {
+ let available_threads =
+ available_threads.unwrap_or_else(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1));
+ let available_threads = thread_limit
+ .map(|l| if l == 0 { available_threads } else { l })
+ .unwrap_or(available_threads);
+
+ let (lower, upper) = (50, 1000);
+ let (chunk_size, thread_limit) = num_items
+ .map(|num_items| {
+ let desired_chunks_per_thread_at_least = 2;
+ let items = num_items;
+ let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper);
+ let num_chunks = items / chunk_size;
+ let thread_limit = if num_chunks <= available_threads {
+ (num_chunks / desired_chunks_per_thread_at_least).max(1)
+ } else {
+ available_threads
+ };
+ (chunk_size, thread_limit)
+ })
+ .unwrap_or({
+ let chunk_size = if available_threads == 1 {
+ desired_chunk_size
+ } else if desired_chunk_size < lower {
+ lower
+ } else {
+ desired_chunk_size.min(upper)
+ };
+ (chunk_size, available_threads)
+ });
+ (chunk_size, Some(thread_limit), thread_limit)
+}
+
+/// Always returns 1, available when the `parallel` feature toggle is unset.
+#[cfg(not(feature = "parallel"))]
+pub fn num_threads(_thread_limit: Option<usize>) -> usize {
+ 1
+}
+
+ /// Returns `thread_limit` if it is `Some` and non-zero, or otherwise the amount of logical cores the system can effectively use.
+///
+/// Only available with the `parallel` feature toggle set.
+#[cfg(feature = "parallel")]
+pub fn num_threads(thread_limit: Option<usize>) -> usize {
+ let logical_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
+ thread_limit
+ .map(|l| if l == 0 { logical_cores } else { l })
+ .unwrap_or(logical_cores)
+}
+
+/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
+///
+/// For parameters, see the documentation of [`in_parallel()`]
+#[cfg(feature = "parallel")]
+pub fn in_parallel_if<I, S, O, R>(
+ condition: impl FnOnce() -> bool,
+ input: impl Iterator<Item = I> + Send,
+ thread_limit: Option<usize>,
+ new_thread_state: impl Fn(usize) -> S + Send + Clone,
+ consume: impl Fn(I, &mut S) -> O + Send + Clone,
+ reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+ I: Send,
+ O: Send,
+{
+ if num_threads(thread_limit) > 1 && condition() {
+ in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+ } else {
+ serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+ }
+}
+
+/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
+///
+/// For parameters, see the documentation of [`in_parallel()`]
+///
+/// Note that the non-parallel version is equivalent to [`in_parallel()`].
+#[cfg(not(feature = "parallel"))]
+pub fn in_parallel_if<I, S, O, R>(
+ _condition: impl FnOnce() -> bool,
+ input: impl Iterator<Item = I>,
+ thread_limit: Option<usize>,
+ new_thread_state: impl Fn(usize) -> S,
+ consume: impl Fn(I, &mut S) -> O,
+ reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+ I: Send,
+ O: Send,
+{
+ serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
+}
+
+///
+pub mod reduce;
+pub use reduce::Reduce;
diff --git a/vendor/gix-features/src/parallel/reduce.rs b/vendor/gix-features/src/parallel/reduce.rs
new file mode 100644
index 000000000..f9992cfd2
--- /dev/null
+++ b/vendor/gix-features/src/parallel/reduce.rs
@@ -0,0 +1,279 @@
+#[cfg(feature = "parallel")]
+mod stepped {
+ use crate::parallel::num_threads;
+
+ /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
+ /// for details.
+ pub struct Stepwise<Reduce: super::Reduce> {
+ /// This field is first to assure it's dropped first and cause threads that are dropped next to stop their loops
+ /// as sending results fails when the receiver is dropped.
+ receive_result: std::sync::mpsc::Receiver<Reduce::Input>,
+ /// `join()` will be called on these guards to assure every thread tries to send through a closed channel. When
+ /// that happens, they break out of their loops.
+ threads: Vec<std::thread::JoinHandle<()>>,
+ /// The reducer is called only in the thread using the iterator, dropping it has no side effects.
+ reducer: Option<Reduce>,
+ }
+
+ impl<Reduce: super::Reduce> Drop for Stepwise<Reduce> {
+ fn drop(&mut self) {
+ let (_, sink) = std::sync::mpsc::channel();
+ drop(std::mem::replace(&mut self.receive_result, sink));
+
+ let mut last_err = None;
+ for handle in std::mem::take(&mut self.threads) {
+ if let Err(err) = handle.join() {
+ last_err = Some(err);
+ };
+ }
+ if let Some(thread_err) = last_err {
+ std::panic::resume_unwind(thread_err);
+ }
+ }
+ }
+
+ impl<Reduce: super::Reduce> Stepwise<Reduce> {
+ /// Instantiate a new iterator and start working in threads.
+ /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
+ pub fn new<InputIter, ThreadStateFn, ConsumeFn, I, O, S>(
+ input: InputIter,
+ thread_limit: Option<usize>,
+ new_thread_state: ThreadStateFn,
+ consume: ConsumeFn,
+ reducer: Reduce,
+ ) -> Self
+ where
+ InputIter: Iterator<Item = I> + Send + 'static,
+ ThreadStateFn: Fn(usize) -> S + Send + Clone + 'static,
+ ConsumeFn: Fn(I, &mut S) -> O + Send + Clone + 'static,
+ Reduce: super::Reduce<Input = O> + 'static,
+ I: Send + 'static,
+ O: Send + 'static,
+ {
+ let num_threads = num_threads(thread_limit);
+ let mut threads = Vec::with_capacity(num_threads + 1);
+ let receive_result = {
+ let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
+ let (send_result, receive_result) = std::sync::mpsc::sync_channel::<O>(num_threads);
+ for thread_id in 0..num_threads {
+ let handle = std::thread::spawn({
+ let send_result = send_result.clone();
+ let receive_input = receive_input.clone();
+ let new_thread_state = new_thread_state.clone();
+ let consume = consume.clone();
+ move || {
+ let mut state = new_thread_state(thread_id);
+ for item in receive_input {
+ if send_result.send(consume(item, &mut state)).is_err() {
+ break;
+ }
+ }
+ }
+ });
+ threads.push(handle);
+ }
+ threads.push(std::thread::spawn(move || {
+ for item in input {
+ if send_input.send(item).is_err() {
+ break;
+ }
+ }
+ }));
+ receive_result
+ };
+ Stepwise {
+ threads,
+ receive_result,
+ reducer: Some(reducer),
+ }
+ }
+
+ /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
+ pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
+ for value in self.by_ref() {
+ drop(value?);
+ }
+ self.reducer
+ .take()
+ .expect("this is the last call before consumption")
+ .finalize()
+ }
+ }
+
+ impl<Reduce: super::Reduce> Iterator for Stepwise<Reduce> {
+ type Item = Result<Reduce::FeedProduce, Reduce::Error>;
+
+ fn next(&mut self) -> Option<<Self as Iterator>::Item> {
+ self.receive_result
+ .recv()
+ .ok()
+ .and_then(|input| self.reducer.as_mut().map(|r| r.feed(input)))
+ }
+ }
+
+ impl<R: super::Reduce> super::Finalize for Stepwise<R> {
+ type Reduce = R;
+
+ fn finalize(
+ self,
+ ) -> Result<
+ <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
+ <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
+ > {
+ Stepwise::finalize(self)
+ }
+ }
+}
+
+#[cfg(not(feature = "parallel"))]
+mod stepped {
+ /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
+ /// for details.
+ pub struct Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> {
+ input: InputIter,
+ consume: ConsumeFn,
+ thread_state: ThreadState,
+ reducer: Reduce,
+ }
+
+ impl<InputIter, ConsumeFn, Reduce, I, O, S> Stepwise<InputIter, ConsumeFn, S, Reduce>
+ where
+ InputIter: Iterator<Item = I>,
+ ConsumeFn: Fn(I, &mut S) -> O,
+ Reduce: super::Reduce<Input = O>,
+ {
+ /// Instantiate a new iterator.
+ /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
+ pub fn new<ThreadStateFn>(
+ input: InputIter,
+ _thread_limit: Option<usize>,
+ new_thread_state: ThreadStateFn,
+ consume: ConsumeFn,
+ reducer: Reduce,
+ ) -> Self
+ where
+ ThreadStateFn: Fn(usize) -> S,
+ {
+ Stepwise {
+ input,
+ consume,
+ thread_state: new_thread_state(0),
+ reducer,
+ }
+ }
+
+ /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
+ pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
+ for value in self.by_ref() {
+ drop(value?);
+ }
+ self.reducer.finalize()
+ }
+ }
+
+ impl<InputIter, ConsumeFn, ThreadState, Reduce, I, O> Iterator for Stepwise<InputIter, ConsumeFn, ThreadState, Reduce>
+ where
+ InputIter: Iterator<Item = I>,
+ ConsumeFn: Fn(I, &mut ThreadState) -> O,
+ Reduce: super::Reduce<Input = O>,
+ {
+ type Item = Result<Reduce::FeedProduce, Reduce::Error>;
+
+ fn next(&mut self) -> Option<<Self as Iterator>::Item> {
+ self.input
+ .next()
+ .map(|input| self.reducer.feed((self.consume)(input, &mut self.thread_state)))
+ }
+ }
+
+ impl<InputIter, ConsumeFn, R, I, O, S> super::Finalize for Stepwise<InputIter, ConsumeFn, S, R>
+ where
+ InputIter: Iterator<Item = I>,
+ ConsumeFn: Fn(I, &mut S) -> O,
+ R: super::Reduce<Input = O>,
+ {
+ type Reduce = R;
+
+ fn finalize(
+ self,
+ ) -> Result<
+ <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
+ <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
+ > {
+ Stepwise::finalize(self)
+ }
+ }
+}
+
+use std::marker::PhantomData;
+
+pub use stepped::Stepwise;
+
+ /// A trait for aggregating items commonly produced in threads into a single result, without itself
+/// needing to be thread safe.
+pub trait Reduce {
+ /// The type fed to the reducer in the [`feed()`][Reduce::feed()] method.
+ ///
+ /// It's produced by a function that may run on multiple threads.
+ type Input;
+ /// The type produced in Ok(…) by [`feed()`][Reduce::feed()].
+ /// Most reducers by nature use `()` here as the value is in the aggregation.
+ /// However, some may use it to collect statistics only and return their Input
+ /// in some form as a result here for [`Stepwise`] to be useful.
+ type FeedProduce;
+ /// The type produced once by the [`finalize()`][Reduce::finalize()] method.
+ ///
+ /// For traditional reducers, this is the value produced by the entire operation.
+ /// For those made for step-wise iteration this may be aggregated statistics.
+ type Output;
+ /// The error type to use for all methods of this trait.
+ type Error;
+ /// Called each time a new `item` was produced in order to aggregate it into the final result.
+ ///
+ /// If an `Error` is returned, the entire operation will be stopped.
+ fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error>;
+ /// Called once after all items were passed to `feed()`, producing the final `Output` of the operation or an `Error`.
+ fn finalize(self) -> Result<Self::Output, Self::Error>;
+}
+
+/// An identity reducer for those who want to use [`Stepwise`] or [`in_parallel()`][crate::parallel::in_parallel()]
+/// without the use of non-threaded reduction of products created in threads.
+pub struct IdentityWithResult<Input, Error> {
+ _input: PhantomData<Input>,
+ _error: PhantomData<Error>,
+}
+
+impl<Input, Error> Default for IdentityWithResult<Input, Error> {
+ fn default() -> Self {
+ IdentityWithResult {
+ _input: Default::default(),
+ _error: Default::default(),
+ }
+ }
+}
+
+impl<Input, Error> Reduce for IdentityWithResult<Input, Error> {
+ type Input = Result<Input, Self::Error>;
+ type FeedProduce = Input;
+ type Output = ();
+ type Error = Error;
+
+ fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
+ item
+ }
+
+ fn finalize(self) -> Result<Self::Output, Self::Error> {
+ Ok(())
+ }
+}
+
+/// A trait reflecting the `finalize()` method of [`Reduce`] implementations
+pub trait Finalize {
+ /// An implementation of [`Reduce`]
+ type Reduce: self::Reduce;
+
+ /// Similar to the [`Reduce::finalize()`] method
+ fn finalize(
+ self,
+ ) -> Result<<<Self as Finalize>::Reduce as self::Reduce>::Output, <<Self as Finalize>::Reduce as self::Reduce>::Error>;
+}
diff --git a/vendor/gix-features/src/parallel/serial.rs b/vendor/gix-features/src/parallel/serial.rs
new file mode 100644
index 000000000..00723b2c3
--- /dev/null
+++ b/vendor/gix-features/src/parallel/serial.rs
@@ -0,0 +1,135 @@
+use crate::parallel::Reduce;
+
+#[cfg(not(feature = "parallel"))]
+mod not_parallel {
+ /// Runs `left` and then `right`, one after another, returning their output when both are done.
+ pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
+ (left(), right())
+ }
+
+ /// A scope for spawning threads.
+ pub struct Scope<'env> {
+ _marker: std::marker::PhantomData<&'env mut &'env ()>,
+ }
+
+ pub struct ThreadBuilder;
+
+ /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
+ pub fn build_thread() -> ThreadBuilder {
+ ThreadBuilder
+ }
+
+ #[allow(unsafe_code)]
+ unsafe impl Sync for Scope<'_> {}
+
+ impl ThreadBuilder {
+ pub fn name(self, _new: String) -> Self {
+ self
+ }
+ pub fn spawn_scoped<'a, 'env, F, T>(
+ &self,
+ scope: &'a Scope<'env>,
+ f: F,
+ ) -> std::io::Result<ScopedJoinHandle<'a, T>>
+ where
+ F: FnOnce() -> T,
+ F: Send + 'env,
+ T: Send + 'env,
+ {
+ Ok(scope.spawn(f))
+ }
+ }
+
+ impl<'env> Scope<'env> {
+ pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
+ where
+ F: FnOnce() -> T,
+ F: Send + 'env,
+ T: Send + 'env,
+ {
+ ScopedJoinHandle {
+ result: f(),
+ _marker: Default::default(),
+ }
+ }
+ }
+
+ /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
+ /// Note that this implementation will run the spawned functions immediately.
+ pub fn threads<'env, F, R>(f: F) -> R
+ where
+ F: FnOnce(&Scope<'env>) -> R,
+ {
+ f(&Scope {
+ _marker: Default::default(),
+ })
+ }
+
+ /// A handle that can be used to join its scoped thread.
+ ///
+ /// This struct is created by the [`Scope::spawn`] method and the
+ /// [`ThreadBuilder::spawn_scoped`] method.
+ pub struct ScopedJoinHandle<'scope, T> {
+ /// Holds the result of the inner closure.
+ result: T,
+ _marker: std::marker::PhantomData<&'scope mut &'scope ()>,
+ }
+
+ impl<T> ScopedJoinHandle<'_, T> {
+ pub fn join(self) -> std::thread::Result<T> {
+ Ok(self.result)
+ }
+ }
+
+ /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
+ /// This is only good for operations where near-random access isn't detrimental, so it's not usually great
+ /// for file-io as it won't make use of sorted inputs well.
+ // TODO: better docs
+ pub fn in_parallel_with_slice<I, S, R, E>(
+ input: &mut [I],
+ _thread_limit: Option<usize>,
+ mut new_thread_state: impl FnMut(usize) -> S + Clone,
+ mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone,
+ mut periodic: impl FnMut() -> Option<std::time::Duration>,
+ state_to_rval: impl FnOnce(S) -> R + Clone,
+ ) -> Result<Vec<R>, E> {
+ let mut state = new_thread_state(0);
+ for item in input {
+ consume(item, &mut state)?;
+ if periodic().is_none() {
+ break;
+ }
+ }
+ Ok(vec![state_to_rval(state)])
+ }
+}
+
+#[cfg(not(feature = "parallel"))]
+pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};
+
+/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
+/// whose task is to aggregate these outputs into the final result returned by this function.
+///
+ /// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
+/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
+/// * For `reducer`, see the [`Reduce`] trait
+ /// * `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
+/// similar to the parallel version.
+///
+ /// **This serial version performs all calculations on the current thread.**
+pub fn in_parallel<I, S, O, R>(
+ input: impl Iterator<Item = I>,
+ _thread_limit: Option<usize>,
+ new_thread_state: impl Fn(usize) -> S,
+ consume: impl Fn(I, &mut S) -> O,
+ mut reducer: R,
+) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
+where
+ R: Reduce<Input = O>,
+{
+ let mut state = new_thread_state(0);
+ for item in input {
+ drop(reducer.feed(consume(item, &mut state))?);
+ }
+ reducer.finalize()
+}
diff --git a/vendor/gix-features/src/progress.rs b/vendor/gix-features/src/progress.rs
new file mode 100644
index 000000000..b6e56895b
--- /dev/null
+++ b/vendor/gix-features/src/progress.rs
@@ -0,0 +1,139 @@
+//! Various `prodash` types along with various utilities for comfort.
+use std::io;
+
+pub use prodash::{
+ self,
+ messages::MessageLevel,
+ progress::{Discard, DoOrDiscard, Either, Id, Step, StepShared, Task, ThroughputOnDrop, Value, UNKNOWN},
+ unit, Progress, Unit,
+};
+
+#[cfg(feature = "progress-unit-bytes")]
+pub use bytesize;
+/// A stub for the portions of the `bytesize` crate that we use internally in `gitoxide`.
+///
+/// It is only compiled if the `progress-unit-bytes` feature is disabled.
+#[cfg(not(feature = "progress-unit-bytes"))]
+pub mod bytesize {
+ /// A stub for the `ByteSize` wrapper.
+ pub struct ByteSize(pub u64);
+
+ impl std::fmt::Display for ByteSize {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ // Formatting is delegated to the wrapped `u64` as is, without any unit conversion.
+ self.0.fmt(f)
+ }
+ }
+}
+
+/// A unit for displaying bytes with throughput and progress percentage.
+///
+/// This variant is used if the `progress-unit-bytes` feature is enabled and builds on [`unit::Bytes`].
+#[cfg(feature = "progress-unit-bytes")]
+pub fn bytes() -> Option<Unit> {
+ Some(unit::dynamic_and_mode(
+ unit::Bytes,
+ unit::display::Mode::with_throughput().and_percentage(),
+ ))
+}
+
+/// A unit for displaying bytes with throughput and progress percentage.
+///
+/// This variant is used if the `progress-unit-bytes` feature is disabled and merely uses the static label `B`.
+#[cfg(not(feature = "progress-unit-bytes"))]
+pub fn bytes() -> Option<Unit> {
+ Some(unit::label_and_mode(
+ "B",
+ unit::display::Mode::with_throughput().and_percentage(),
+ ))
+}
+
+/// A unit for displaying human readable numbers with `name` suffix, with throughput and progress percentage,
+/// and a single decimal place.
+///
+/// This is a shortcut for [`count_with_decimals(name, 1)`][count_with_decimals()].
+pub fn count(name: &'static str) -> Option<Unit> {
+ count_with_decimals(name, 1)
+}
+
+/// A unit for displaying human readable numbers with `name` suffix,
+/// with throughput and progress percentage, and `decimals` decimal places.
+///
+/// This variant is used if the `progress-unit-human-numbers` feature is enabled.
+#[cfg(feature = "progress-unit-human-numbers")]
+pub fn count_with_decimals(name: &'static str, decimals: usize) -> Option<Unit> {
+ Some(unit::dynamic_and_mode(
+ unit::Human::new(
+ {
+ // Configure the formatter with the requested amount of decimals before handing it over.
+ let mut f = unit::human::Formatter::new();
+ f.with_decimals(decimals);
+ f
+ },
+ name,
+ ),
+ unit::display::Mode::with_throughput().and_percentage(),
+ ))
+}
+
+/// A unit for displaying numbers with `name` as label, with throughput and progress percentage.
+///
+/// This variant is used if the `progress-unit-human-numbers` feature is disabled. The `_decimals` argument
+/// is accepted to keep the signature identical to the feature-enabled version, but it is ignored.
+#[cfg(not(feature = "progress-unit-human-numbers"))]
+pub fn count_with_decimals(name: &'static str, _decimals: usize) -> Option<Unit> {
+ Some(unit::label_and_mode(
+ name,
+ unit::display::Mode::with_throughput().and_percentage(),
+ ))
+}
+
+/// A predefined unit for displaying a multi-step progress, using a [`unit::Range`] named `steps`.
+pub fn steps() -> Option<Unit> {
+ Some(unit::dynamic(unit::Range::new("steps")))
+}
+
+/// A structure passing every [`read`][std::io::Read::read()] call through to the contained Progress instance using [`inc_by(bytes_read)`][Progress::inc_by()].
+pub struct Read<T, P> {
+ /// The implementor of [`std::io::Read`] to which progress is added
+ pub inner: T,
+ /// The progress instance receiving progress information on each successful read from `inner`
+ pub progress: P,
+}
+
+impl<T, P> io::Read for Read<T, P>
+where
+ T: io::Read,
+ P: Progress,
+{
+ /// Read from `inner` into `buf` and advance `progress` by the amount of bytes read.
+ ///
+ /// If `inner` fails, the error is returned and `progress` is left untouched.
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ let bytes_read = self.inner.read(buf)?;
+ self.progress.inc_by(bytes_read);
+ Ok(bytes_read)
+ }
+}
+
+// Both methods forward to `inner` directly. Note that `progress` is not updated here, so bytes
+// obtained via `fill_buf()` and `consume()` are not accounted for.
+impl<T, P> io::BufRead for Read<T, P>
+where
+ T: io::BufRead,
+ P: Progress,
+{
+ fn fill_buf(&mut self) -> io::Result<&[u8]> {
+ self.inner.fill_buf()
+ }
+
+ fn consume(&mut self, amt: usize) {
+ self.inner.consume(amt)
+ }
+}
+
+/// A structure passing every [`write`][std::io::Write::write()] call through to the contained Progress instance using [`inc_by(bytes_written)`][Progress::inc_by()].
+///
+/// This is particularly useful if the final size of the bytes to write is known or can be estimated precisely enough.
+pub struct Write<T, P> {
+ /// The implementor of [`std::io::Write`] to which progress is added
+ pub inner: T,
+ /// The progress instance receiving progress information on each successful write to `inner`
+ pub progress: P,
+}
+
+impl<T, P> io::Write for Write<T, P>
+where
+ T: io::Write,
+ P: Progress,
+{
+ /// Write `buf` to `inner` and advance `progress` by the amount of bytes `inner` reported as written.
+ ///
+ /// If `inner` fails, the error is returned and `progress` is left untouched.
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ let written = self.inner.write(buf)?;
+ self.progress.inc_by(written);
+ Ok(written)
+ }
+
+ /// Flush `inner`, without affecting `progress`.
+ fn flush(&mut self) -> io::Result<()> {
+ self.inner.flush()
+ }
+}
diff --git a/vendor/gix-features/src/threading.rs b/vendor/gix-features/src/threading.rs
new file mode 100644
index 000000000..ff0c819a5
--- /dev/null
+++ b/vendor/gix-features/src/threading.rs
@@ -0,0 +1,100 @@
+//! Type definitions for putting shared ownership and synchronized mutation behind the `parallel` feature toggle.
+//!
+//! That way, single-threaded applications will not have to use thread-safe primitives, and simply do not specify the `parallel` feature.
+
+// Thread-safe implementation based on `Arc` and `parking_lot` locks, used if the `parallel` feature is enabled.
+#[cfg(feature = "parallel")]
+mod _impl {
+ use std::sync::Arc;
+
+ /// A thread-safe cell which can be written to only once.
+ #[cfg(feature = "once_cell")]
+ pub type OnceCell<T> = once_cell::sync::OnceCell<T>;
+ /// A reference counted pointer type for shared ownership.
+ pub type OwnShared<T> = Arc<T>;
+ /// A synchronization primitive which can start read-only and transition to support mutation.
+ pub type MutableOnDemand<T> = parking_lot::RwLock<T>;
+ /// A synchronization primitive which provides read-write access right away.
+ pub type Mutable<T> = parking_lot::Mutex<T>;
+ /// A guarded reference suitable for safekeeping in a struct.
+ pub type RefGuard<'a, T> = parking_lot::RwLockReadGuard<'a, T>;
+ /// A mapped reference created from a RefGuard
+ pub type MappedRefGuard<'a, U> = parking_lot::MappedRwLockReadGuard<'a, U>;
+
+ /// Get a shared reference through a [`MutableOnDemand`] for read-only access.
+ pub fn get_ref<T>(v: &MutableOnDemand<T>) -> RefGuard<'_, T> {
+ v.read()
+ }
+
+ /// Get a mutable reference through a [`MutableOnDemand`] for read-write access.
+ pub fn get_mut<T>(v: &MutableOnDemand<T>) -> parking_lot::RwLockWriteGuard<'_, T> {
+ v.write()
+ }
+
+ /// Get a mutable reference through a [`Mutable`] for read-write access.
+ pub fn lock<T>(v: &Mutable<T>) -> parking_lot::MutexGuard<'_, T> {
+ v.lock()
+ }
+
+ /// Downgrade a handle previously obtained with [`get_mut()`] to drop mutation support.
+ ///
+ /// `_orig` is unused in this implementation; the guard `v` is downgraded directly.
+ pub fn downgrade_mut_to_ref<'a, T>(
+ v: parking_lot::RwLockWriteGuard<'a, T>,
+ _orig: &'a MutableOnDemand<T>,
+ ) -> RefGuard<'a, T> {
+ parking_lot::RwLockWriteGuard::downgrade(v)
+ }
+
+ /// Map a read guard into a sub-type it contains.
+ pub fn map_ref<T, U: ?Sized>(v: RefGuard<'_, T>, f: impl FnOnce(&T) -> &U) -> MappedRefGuard<'_, U> {
+ parking_lot::RwLockReadGuard::map(v, f)
+ }
+}
+
+// Single-threaded implementation based on `Rc` and `RefCell`, used if the `parallel` feature is disabled.
+#[cfg(not(feature = "parallel"))]
+mod _impl {
+ use std::{
+ cell::{Ref, RefCell, RefMut},
+ rc::Rc,
+ };
+
+ /// A cell which can be written to only once. This is the `unsync` variant, which is not thread-safe.
+ #[cfg(feature = "once_cell")]
+ pub type OnceCell<T> = once_cell::unsync::OnceCell<T>;
+ /// A reference counted pointer type for shared ownership.
+ pub type OwnShared<T> = Rc<T>;
+ /// A primitive with runtime borrow-checking which can start read-only and transition to support mutation.
+ pub type MutableOnDemand<T> = RefCell<T>;
+ /// A primitive with runtime borrow-checking which provides read-write access right away.
+ pub type Mutable<T> = RefCell<T>;
+ /// A guarded reference suitable for safekeeping in a struct.
+ pub type RefGuard<'a, T> = Ref<'a, T>;
+ /// A mapped reference created from a RefGuard
+ pub type MappedRefGuard<'a, U> = Ref<'a, U>;
+
+ /// Get a mutable reference through a [`MutableOnDemand`] for read-write access.
+ pub fn get_mut<T>(v: &RefCell<T>) -> RefMut<'_, T> {
+ v.borrow_mut()
+ }
+
+ /// Get a mutable reference through a [`Mutable`] for read-write access.
+ pub fn lock<T>(v: &Mutable<T>) -> RefMut<'_, T> {
+ v.borrow_mut()
+ }
+
+ /// Get a shared reference through a [`MutableOnDemand`] for read-only access.
+ pub fn get_ref<T>(v: &RefCell<T>) -> RefGuard<'_, T> {
+ v.borrow()
+ }
+
+ /// Downgrade a handle previously obtained with [`get_mut()`] to drop mutation support.
+ pub fn downgrade_mut_to_ref<'a, T>(v: RefMut<'a, T>, orig: &'a RefCell<T>) -> RefGuard<'a, T> {
+ // The mutable borrow has to end first, as `borrow()` panics while a mutable borrow is active.
+ drop(v);
+ orig.borrow()
+ }
+
+ /// Map a read guard into a sub-type it contains.
+ pub fn map_ref<T, U: ?Sized>(v: RefGuard<'_, T>, f: impl FnOnce(&T) -> &U) -> MappedRefGuard<'_, U> {
+ Ref::map(v, f)
+ }
+}
+
+pub use _impl::*;
diff --git a/vendor/gix-features/src/zlib/mod.rs b/vendor/gix-features/src/zlib/mod.rs
new file mode 100644
index 000000000..8dcdfd93f
--- /dev/null
+++ b/vendor/gix-features/src/zlib/mod.rs
@@ -0,0 +1,47 @@
+pub use flate2::{Decompress, Status};
+
+/// Non-streaming interfaces for decompression.
+pub mod inflate {
+ /// The error returned by various [Inflate methods][super::Inflate]
+ #[derive(Debug, thiserror::Error)]
+ #[allow(missing_docs)]
+ pub enum Error {
+ // Created automatically from `std::io::Error` thanks to `#[from]`.
+ #[error("Could not write all bytes when decompressing content")]
+ WriteInflated(#[from] std::io::Error),
+ // Created automatically from `flate2::DecompressError` thanks to `#[from]`.
+ #[error("Could not decode zip stream, status was '{0:?}'")]
+ Inflate(#[from] flate2::DecompressError),
+ // Has no `#[from]` conversion and thus must be constructed explicitly.
+ #[error("The zlib status indicated an error, status was '{0:?}'")]
+ Status(flate2::Status),
+ }
+}
+
+/// Decompress a few bytes of a zlib stream without allocation.
+///
+/// Input and output buffers are provided by the caller, see [`Inflate::once()`].
+pub struct Inflate {
+ /// The actual decompressor doing all the work.
+ pub state: Decompress,
+}
+
+impl Default for Inflate {
+ /// Create an instance with a fresh decompressor.
+ fn default() -> Self {
+ Inflate {
+ // `true` is flate2's `zlib_header` flag, i.e. the input is expected to start with a zlib header.
+ state: Decompress::new(true),
+ }
+ }
+}
+
+impl Inflate {
+ /// Run the decompressor exactly once. Cannot be run multiple times
+ ///
+ /// Decompresses `input` into `out` with a single call to the decompressor and returns
+ /// `(status, consumed_input_bytes, produced_output_bytes)`.
+ pub fn once(&mut self, input: &[u8], out: &mut [u8]) -> Result<(flate2::Status, usize, usize), inflate::Error> {
+ // The decompressor only tracks running totals, so the amounts for this call are computed as differences.
+ let before_in = self.state.total_in();
+ let before_out = self.state.total_out();
+ let status = self.state.decompress(input, out, flate2::FlushDecompress::None)?;
+ Ok((
+ status,
+ (self.state.total_in() - before_in) as usize,
+ (self.state.total_out() - before_out) as usize,
+ ))
+ }
+}
+
+/// Streaming interfaces for compression ([`stream::deflate`]) and decompression ([`stream::inflate`]).
+pub mod stream;
diff --git a/vendor/gix-features/src/zlib/stream/deflate/mod.rs b/vendor/gix-features/src/zlib/stream/deflate/mod.rs
new file mode 100644
index 000000000..55f575ea4
--- /dev/null
+++ b/vendor/gix-features/src/zlib/stream/deflate/mod.rs
@@ -0,0 +1,96 @@
+use flate2::Compress;
+
+const BUF_SIZE: usize = 4096 * 8;
+
+/// A utility to zlib compress anything that is written via its [Write][std::io::Write] implementation.
+///
+/// Be sure to call `flush()` when done to finalize the deflate stream.
+pub struct Write<W> {
+ // The compressor turning written bytes into the deflate stream.
+ compressor: Compress,
+ // The writer receiving the compressed bytes.
+ inner: W,
+ // Scratch space for compressed bytes before they are forwarded to `inner`.
+ buf: [u8; BUF_SIZE],
+}
+
+mod impls {
+ use std::io;
+
+ use flate2::{Compress, Compression, FlushCompress, Status};
+
+ use crate::zlib::stream::deflate;
+
+ impl<W> deflate::Write<W>
+ where
+ W: io::Write,
+ {
+ /// Create a new instance writing compressed bytes to `inner`.
+ pub fn new(inner: W) -> deflate::Write<W> {
+ deflate::Write {
+ // Fast compression level; `true` is flate2's `zlib_header` flag.
+ compressor: Compress::new(Compression::fast(), true),
+ inner,
+ buf: [0; deflate::BUF_SIZE],
+ }
+ }
+
+ /// Reset the compressor, starting a new compression stream.
+ ///
+ /// That way multiple streams can be written to the same inner writer.
+ pub fn reset(&mut self) {
+ self.compressor.reset();
+ }
+
+ /// Consume `self` and return the inner writer.
+ pub fn into_inner(self) -> W {
+ self.inner
+ }
+
+ // Compress `buf` using the given `flush` mode and forward all compressed bytes to `inner`,
+ // returning the amount of input bytes consumed by this call.
+ fn write_inner(&mut self, mut buf: &[u8], flush: FlushCompress) -> io::Result<usize> {
+ // The compressor tracks running totals only, so remember where this call started.
+ let total_in_when_start = self.compressor.total_in();
+ loop {
+ let last_total_in = self.compressor.total_in();
+ let last_total_out = self.compressor.total_out();
+
+ let status = self
+ .compressor
+ .compress(buf, &mut self.buf, flush)
+ .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+
+ // Forward whatever was produced in this round before deciding how to proceed.
+ let written = self.compressor.total_out() - last_total_out;
+ if written > 0 {
+ self.inner.write_all(&self.buf[..written as usize])?;
+ }
+
+ match status {
+ Status::StreamEnd => return Ok((self.compressor.total_in() - total_in_when_start) as usize),
+ Status::Ok | Status::BufError => {
+ // Skip the input that was consumed in this round.
+ let consumed = self.compressor.total_in() - last_total_in;
+ buf = &buf[consumed as usize..];
+
+ // output buffer still makes progress
+ if self.compressor.total_out() > last_total_out {
+ continue;
+ }
+ // input still makes progress
+ if self.compressor.total_in() > last_total_in {
+ continue;
+ }
+ // input also makes no progress anymore, need more so leave with what we have
+ return Ok((self.compressor.total_in() - total_in_when_start) as usize);
+ }
+ }
+ }
+ }
+ }
+
+ impl<W: io::Write> io::Write for deflate::Write<W> {
+ /// Compress `buf` without flushing and return the amount of input bytes consumed.
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.write_inner(buf, FlushCompress::None)
+ }
+
+ /// Finish the deflate stream by compressing with `FlushCompress::Finish`.
+ ///
+ /// Note that the inner writer itself is not flushed by this call.
+ fn flush(&mut self) -> io::Result<()> {
+ self.write_inner(&[], FlushCompress::Finish).map(|_| ())
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests;
diff --git a/vendor/gix-features/src/zlib/stream/deflate/tests.rs b/vendor/gix-features/src/zlib/stream/deflate/tests.rs
new file mode 100644
index 000000000..ba0dd2a2c
--- /dev/null
+++ b/vendor/gix-features/src/zlib/stream/deflate/tests.rs
@@ -0,0 +1,101 @@
+mod deflate_stream {
+ use std::{
+ io,
+ io::{Read, Write},
+ };
+
+ use bstr::ByteSlice;
+ use flate2::Decompress;
+
+ use crate::zlib::stream::deflate;
+
+ /// Provide streaming decompression using the `std::io::Read` trait.
+ /// If `std::io::BufReader` is used, an allocation for the input buffer will be performed.
+ struct InflateReader<R> {
+ inner: R,
+ decompressor: Decompress,
+ }
+
+ impl<R> InflateReader<R>
+ where
+ R: io::BufRead,
+ {
+ /// Create a reader decompressing everything read from `read` with a fresh decompressor.
+ pub fn from_read(read: R) -> InflateReader<R> {
+ InflateReader {
+ decompressor: Decompress::new(true),
+ inner: read,
+ }
+ }
+ }
+
+ impl<R> io::Read for InflateReader<R>
+ where
+ R: io::BufRead,
+ {
+ fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
+ crate::zlib::stream::inflate::read(&mut self.inner, &mut self.decompressor, into)
+ }
+ }
+
+ /// Decompress a small object fixture and verify its content as well as the end of the stream.
+ #[test]
+ fn small_file_decompress() -> Result<(), Box<dyn std::error::Error>> {
+ fn fixture_path(path: &str) -> std::path::PathBuf {
+ std::path::PathBuf::from("tests/fixtures").join(path)
+ }
+ let r = InflateReader::from_read(io::BufReader::new(std::fs::File::open(fixture_path(
+ "objects/37/d4e6c5c48ba0d245164c4e10d5f41140cab980",
+ ))?));
+ let mut bytes = r.bytes();
+ let content = bytes.by_ref().take(16).collect::<Result<Vec<_>, _>>()?;
+ assert_eq!(content.as_slice().as_bstr(), b"blob 9\0hi there\n".as_bstr());
+ // Nothing must follow the 16 bytes checked above.
+ assert!(bytes.next().is_none());
+ Ok(())
+ }
+
+ /// Compress a tiny input with a single write and verify it decompresses to the same bytes.
+ #[test]
+ fn all_at_once() -> Result<(), Box<dyn std::error::Error>> {
+ let mut w = deflate::Write::new(Vec::new());
+ assert_eq!(w.write(b"hello")?, 5);
+ w.flush()?;
+
+ let out = w.inner;
+ // Two compressed sizes are accepted here.
+ assert!(out.len() == 12 || out.len() == 13);
+
+ assert_deflate_buffer(out, b"hello")
+ }
+
+ /// Decompress `out` entirely and assert that the result equals `expected`.
+ fn assert_deflate_buffer(out: Vec<u8>, expected: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
+ let mut actual = Vec::new();
+ InflateReader::from_read(out.as_slice()).read_to_end(&mut actual)?;
+ assert_eq!(actual, expected);
+ Ok(())
+ }
+
+ /// Compress a pack fixture in chunks of two bytes, each of which must be consumed entirely.
+ #[test]
+ fn big_file_small_writes() -> Result<(), Box<dyn std::error::Error>> {
+ let mut w = deflate::Write::new(Vec::new());
+ let bytes = include_bytes!(
+ "../../../../tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack"
+ );
+ for chunk in bytes.chunks(2) {
+ assert_eq!(w.write(chunk)?, chunk.len());
+ }
+ w.flush()?;
+
+ assert_deflate_buffer(w.inner, bytes)
+ }
+
+ /// Compress a pack fixture in chunks of `4096 * 9` bytes, each of which must be consumed entirely.
+ #[test]
+ fn big_file_a_few_big_writes() -> Result<(), Box<dyn std::error::Error>> {
+ let mut w = deflate::Write::new(Vec::new());
+ let bytes = include_bytes!(
+ "../../../../tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack"
+ );
+ for chunk in bytes.chunks(4096 * 9) {
+ assert_eq!(w.write(chunk)?, chunk.len());
+ }
+ w.flush()?;
+
+ assert_deflate_buffer(w.inner, bytes)
+ }
+}
diff --git a/vendor/gix-features/src/zlib/stream/inflate.rs b/vendor/gix-features/src/zlib/stream/inflate.rs
new file mode 100644
index 000000000..007ecedc6
--- /dev/null
+++ b/vendor/gix-features/src/zlib/stream/inflate.rs
@@ -0,0 +1,57 @@
+use std::{io, io::BufRead};
+
+use flate2::{Decompress, FlushDecompress, Status};
+
+/// A reader decompressing the bytes of `inner` using a boxed decompressor.
+///
+/// The boxed variant is faster for what we do (moving the decompressor in and out a lot)
+pub struct ReadBoxed<R> {
+ /// The reader from which bytes should be decompressed.
+ pub inner: R,
+ /// The decompressor doing all the work.
+ pub decompressor: Box<Decompress>,
+}
+
+impl<R> io::Read for ReadBoxed<R>
+where
+ R: BufRead,
+{
+ /// Decompress bytes from `inner` into `into` by delegating to the free function [`read()`].
+ fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
+ read(&mut self.inner, &mut self.decompressor, into)
+ }
+}
+
+/// Read bytes from `rd` and decompress them using `state` into a pre-allocated fitting buffer `dst`, returning the amount of bytes written.
+///
+/// Decompression stops once the stream ends, `rd` has no more input, or `dst` is full.
+/// A decompression error is returned as [`io::ErrorKind::InvalidInput`].
+pub fn read(rd: &mut impl BufRead, state: &mut Decompress, mut dst: &mut [u8]) -> io::Result<usize> {
+ let mut total_written = 0;
+ loop {
+ let (written, consumed, ret, eof);
+ {
+ let input = rd.fill_buf()?;
+ // An empty buffer means the reader has nothing more to offer.
+ eof = input.is_empty();
+ let before_out = state.total_out();
+ let before_in = state.total_in();
+ let flush = if eof {
+ FlushDecompress::Finish
+ } else {
+ FlushDecompress::None
+ };
+ ret = state.decompress(input, dst, flush);
+ written = (state.total_out() - before_out) as usize;
+ total_written += written;
+ // Continue writing behind what was just produced.
+ dst = &mut dst[written..];
+ consumed = (state.total_in() - before_in) as usize;
+ }
+ // Mark only the bytes the decompressor actually used as read.
+ rd.consume(consumed);
+
+ match ret {
+ // The stream has officially ended, nothing more to do here.
+ Ok(Status::StreamEnd) => return Ok(total_written),
+ // Either input or output are depleted even though the stream is not depleted yet.
+ Ok(Status::Ok) | Ok(Status::BufError) if eof || dst.is_empty() => return Ok(total_written),
+ // Some progress was made in the input or the output, it must continue to reach the end.
+ Ok(Status::Ok) | Ok(Status::BufError) if consumed != 0 || written != 0 => continue,
+ // A strange state, where zlib makes no progress but isn't done either. Call it out.
+ Ok(Status::Ok) | Ok(Status::BufError) => unreachable!("Definitely a bug somewhere"),
+ Err(..) => return Err(io::Error::new(io::ErrorKind::InvalidInput, "corrupt deflate stream")),
+ }
+ }
+}
diff --git a/vendor/gix-features/src/zlib/stream/mod.rs b/vendor/gix-features/src/zlib/stream/mod.rs
new file mode 100644
index 000000000..7fb239d36
--- /dev/null
+++ b/vendor/gix-features/src/zlib/stream/mod.rs
@@ -0,0 +1,4 @@
+/// Zlib compression of data written through a `std::io::Write` implementation.
+pub mod deflate;
+/// Zlib decompression of data read from a `std::io::BufRead` implementation.
+pub mod inflate;