diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:41:41 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:41:41 +0000 |
commit | 10ee2acdd26a7f1298c6f6d6b7af9b469fe29b87 (patch) | |
tree | bdffd5d80c26cf4a7a518281a204be1ace85b4c1 /vendor/gix-features/src | |
parent | Releasing progress-linux version 1.70.0+dfsg1-9~progress7.99u1. (diff) | |
download | rustc-10ee2acdd26a7f1298c6f6d6b7af9b469fe29b87.tar.xz rustc-10ee2acdd26a7f1298c6f6d6b7af9b469fe29b87.zip |
Merging upstream version 1.70.0+dfsg2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-features/src')
20 files changed, 2388 insertions, 0 deletions
diff --git a/vendor/gix-features/src/cache.rs b/vendor/gix-features/src/cache.rs new file mode 100644 index 000000000..f7a2cf005 --- /dev/null +++ b/vendor/gix-features/src/cache.rs @@ -0,0 +1,76 @@ +#[cfg(feature = "cache-efficiency-debug")] +mod impl_ { + /// A helper to collect useful information about cache efficiency. + pub struct Debug { + owner: String, + hits: usize, + puts: usize, + misses: usize, + } + + impl Debug { + /// Create a new instance + #[inline] + pub fn new(owner: impl Into<String>) -> Self { + Debug { + owner: owner.into(), + hits: 0, + puts: 0, + misses: 0, + } + } + /// Count cache insertions + #[inline] + pub fn put(&mut self) { + self.puts += 1; + } + /// Count hits + #[inline] + pub fn hit(&mut self) { + self.hits += 1; + } + /// Count misses + #[inline] + pub fn miss(&mut self) { + self.misses += 1; + } + } + + impl Drop for Debug { + fn drop(&mut self) { + let hits = self.hits; + let misses = self.misses; + let ratio = hits as f32 / misses as f32; + eprintln!( + "{}[{:0x}]: {} / {} (hits/misses) = {:.02}%, puts = {}", + self.owner, + self as *const _ as usize, + hits, + misses, + ratio * 100.0, + self.puts + ); + } + } +} +#[cfg(not(feature = "cache-efficiency-debug"))] +mod impl_ { + /// The disabled, zero size do-nothing equivalent + pub struct Debug; + + impl Debug { + /// Create a new instance + #[inline] + pub fn new(_owner: impl Into<String>) -> Self { + Debug + } + /// noop + pub fn put(&mut self) {} + /// noop + pub fn hit(&mut self) {} + /// noop + pub fn miss(&mut self) {} + } +} + +pub use impl_::Debug; diff --git a/vendor/gix-features/src/decode.rs b/vendor/gix-features/src/decode.rs new file mode 100644 index 000000000..0df38710d --- /dev/null +++ b/vendor/gix-features/src/decode.rs @@ -0,0 +1,38 @@ +use std::io::Read; + +/// Decode variable int numbers from a `Read` implementation. +/// +/// Note: currently overflow checks are only done in debug mode. 
+#[inline] +pub fn leb64_from_read(mut r: impl Read) -> Result<(u64, usize), std::io::Error> { + let mut b = [0u8; 1]; + let mut i = 0; + r.read_exact(&mut b)?; + i += 1; + let mut value = b[0] as u64 & 0x7f; + while b[0] & 0x80 != 0 { + r.read_exact(&mut b)?; + i += 1; + debug_assert!(i <= 10, "Would overflow value at 11th iteration"); + value += 1; + value = (value << 7) + (b[0] as u64 & 0x7f) + } + Ok((value, i)) +} + +/// Decode variable int numbers. +#[inline] +pub fn leb64(d: &[u8]) -> (u64, usize) { + let mut i = 0; + let mut c = d[i]; + i += 1; + let mut value = c as u64 & 0x7f; + while c & 0x80 != 0 { + c = d[i]; + i += 1; + debug_assert!(i <= 10, "Would overflow value at 11th iteration"); + value += 1; + value = (value << 7) + (c as u64 & 0x7f) + } + (value, i) +} diff --git a/vendor/gix-features/src/fs.rs b/vendor/gix-features/src/fs.rs new file mode 100644 index 000000000..f65779b92 --- /dev/null +++ b/vendor/gix-features/src/fs.rs @@ -0,0 +1,246 @@ +//! Filesystem utilities +//! +//! These are will be parallel if the `parallel` feature is enabled, at the expense of compiling additional dependencies +//! along with runtime costs for maintaining a global [`rayon`](https://docs.rs/rayon) thread pool. +//! +//! For information on how to use the [`WalkDir`] type, have a look at +//! * [`jwalk::WalkDir`](https://docs.rs/jwalk/0.5.1/jwalk/type.WalkDir.html) if `parallel` feature is enabled +//! * [walkdir::WalkDir](https://docs.rs/walkdir/2.3.1/walkdir/struct.WalkDir.html) otherwise + +#[cfg(any(feature = "walkdir", feature = "fs-walkdir-parallel"))] +mod shared { + /// The desired level of parallelism. + pub enum Parallelism { + /// Do not parallelize at all by making a serial traversal on the current thread. + Serial, + /// Create a new thread pool for each traversal with up to 16 threads or the amount of logical cores of the machine. + ThreadPoolPerTraversal { + /// The base name of the threads we create as part of the thread-pool. 
+ thread_name: &'static str, + }, + } +} + +/// +#[cfg(feature = "fs-walkdir-parallel")] +pub mod walkdir { + use std::path::Path; + + pub use jwalk::{DirEntry as DirEntryGeneric, DirEntryIter as DirEntryIterGeneric, Error, WalkDir}; + + pub use super::shared::Parallelism; + + /// An alias for an uncustomized directory entry to match the one of the non-parallel version offered by `walkdir`. + pub type DirEntry = DirEntryGeneric<((), ())>; + + impl From<Parallelism> for jwalk::Parallelism { + fn from(v: Parallelism) -> Self { + match v { + Parallelism::Serial => jwalk::Parallelism::Serial, + Parallelism::ThreadPoolPerTraversal { thread_name } => std::thread::available_parallelism() + .map(|threads| { + let pool = jwalk::rayon::ThreadPoolBuilder::new() + .num_threads(threads.get().min(16)) + .stack_size(128 * 1024) + .thread_name(move |idx| format!("{thread_name} {idx}")) + .build() + .expect("we only set options that can't cause a build failure"); + jwalk::Parallelism::RayonExistingPool { + pool: pool.into(), + busy_timeout: None, + } + }) + .unwrap_or_else(|_| Parallelism::Serial.into()), + } + } + } + + /// Instantiate a new directory iterator which will not skip hidden files, with the given level of `parallelism`. 
+ pub fn walkdir_new(root: impl AsRef<Path>, parallelism: Parallelism) -> WalkDir { + WalkDir::new(root).skip_hidden(false).parallelism(parallelism.into()) + } + + /// Instantiate a new directory iterator which will not skip hidden files and is sorted + pub fn walkdir_sorted_new(root: impl AsRef<Path>, parallelism: Parallelism) -> WalkDir { + WalkDir::new(root) + .skip_hidden(false) + .sort(true) + .parallelism(parallelism.into()) + } + + /// The Iterator yielding directory items + pub type DirEntryIter = DirEntryIterGeneric<((), ())>; +} + +#[cfg(all(feature = "walkdir", not(feature = "fs-walkdir-parallel")))] +/// +pub mod walkdir { + use std::path::Path; + + pub use walkdir::{DirEntry, Error, WalkDir}; + + pub use super::shared::Parallelism; + + /// Instantiate a new directory iterator which will not skip hidden files, with the given level of `parallelism`. + pub fn walkdir_new(root: impl AsRef<Path>, _: Parallelism) -> WalkDir { + WalkDir::new(root) + } + + /// Instantiate a new directory iterator which will not skip hidden files and is sorted, with the given level of `parallelism`. + pub fn walkdir_sorted_new(root: impl AsRef<Path>, _: Parallelism) -> WalkDir { + WalkDir::new(root).sort_by_file_name() + } + + /// The Iterator yielding directory items + pub type DirEntryIter = walkdir::IntoIter; +} + +#[cfg(any(feature = "walkdir", feature = "fs-walkdir-parallel"))] +pub use self::walkdir::{walkdir_new, walkdir_sorted_new, WalkDir}; + +/// Prepare open options which won't follow symlinks when the file is opened. +/// +/// Note: only effective on unix currently. +pub fn open_options_no_follow() -> std::fs::OpenOptions { + #[cfg_attr(not(unix), allow(unused_mut))] + let mut options = std::fs::OpenOptions::new(); + #[cfg(unix)] + { + /// Make sure that it's impossible to follow through to the target of symlinks. + /// Note that this will still follow symlinks in the path, which is what we assume + /// has been checked separately. 
+ use std::os::unix::fs::OpenOptionsExt; + options.custom_flags(libc::O_NOFOLLOW); + } + options +} + +mod snapshot { + use std::ops::Deref; + + use crate::threading::{get_mut, get_ref, MutableOnDemand, OwnShared}; + + /// A structure holding enough information to reload a value if its on-disk representation changes as determined by its modified time. + #[derive(Debug)] + pub struct Snapshot<T: std::fmt::Debug> { + value: T, + modified: std::time::SystemTime, + } + + impl<T: Clone + std::fmt::Debug> Clone for Snapshot<T> { + fn clone(&self) -> Self { + Self { + value: self.value.clone(), + modified: self.modified, + } + } + } + + /// A snapshot of a resource which is up-to-date in the moment it is retrieved. + pub type SharedSnapshot<T> = OwnShared<Snapshot<T>>; + + /// Use this type for fields in structs that are to store the [`Snapshot`], typically behind an [`OwnShared`]. + /// + /// Note that the resource itself is behind another [`OwnShared`] to allow it to be used without holding any kind of lock, hence + /// without blocking updates while it is used. + #[derive(Debug, Default)] + pub struct MutableSnapshot<T: std::fmt::Debug>(pub MutableOnDemand<Option<SharedSnapshot<T>>>); + + impl<T: std::fmt::Debug> Deref for Snapshot<T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.value + } + } + + impl<T: std::fmt::Debug> Deref for MutableSnapshot<T> { + type Target = MutableOnDemand<Option<SharedSnapshot<T>>>; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl<T: std::fmt::Debug> MutableSnapshot<T> { + /// Create a new instance of this type. + /// + /// Useful in case `Default::default()` isn't working for some reason. + pub fn new() -> Self { + MutableSnapshot(MutableOnDemand::new(None)) + } + + /// Refresh `state` forcefully by re-`open`ing the resource. Note that `open()` returns `None` if the resource isn't + /// present on disk, and that it's critical that the modified time is obtained _before_ opening the resource. 
+ pub fn force_refresh<E>( + &self, + open: impl FnOnce() -> Result<Option<(std::time::SystemTime, T)>, E>, + ) -> Result<(), E> { + let mut state = get_mut(&self.0); + *state = open()?.map(|(modified, value)| OwnShared::new(Snapshot { value, modified })); + Ok(()) + } + + /// Assure that the resource in `state` is up-to-date by comparing the `current_modification_time` with the one we know in `state` + /// and by acting accordingly. + /// Returns the potentially updated/reloaded resource if it is still present on disk, which then represents a snapshot that is up-to-date + /// in that very moment, or `None` if the underlying file doesn't exist. + /// + /// Note that even though this is racy, each time a request is made there is a chance to see the actual state. + pub fn recent_snapshot<E>( + &self, + mut current_modification_time: impl FnMut() -> Option<std::time::SystemTime>, + open: impl FnOnce() -> Result<Option<T>, E>, + ) -> Result<Option<SharedSnapshot<T>>, E> { + let state = get_ref(self); + let recent_modification = current_modification_time(); + let buffer = match (&*state, recent_modification) { + (None, None) => (*state).clone(), + (Some(_), None) => { + drop(state); + let mut state = get_mut(self); + *state = None; + (*state).clone() + } + (Some(snapshot), Some(modified_time)) => { + if snapshot.modified < modified_time { + drop(state); + let mut state = get_mut(self); + + if let (Some(_snapshot), Some(modified_time)) = (&*state, current_modification_time()) { + *state = open()?.map(|value| { + OwnShared::new(Snapshot { + value, + modified: modified_time, + }) + }); + } + + (*state).clone() + } else { + // Note that this relies on sub-section precision or else is a race when the packed file was just changed. + // It's nothing we can know though, so… up to the caller unfortunately. + Some(snapshot.clone()) + } + } + (None, Some(_modified_time)) => { + drop(state); + let mut state = get_mut(self); + // Still in the same situation? If so, load the buffer. 
This compensates for the trampling herd + // during lazy-loading at the expense of another mtime check. + if let (None, Some(modified_time)) = (&*state, current_modification_time()) { + *state = open()?.map(|value| { + OwnShared::new(Snapshot { + value, + modified: modified_time, + }) + }); + } + (*state).clone() + } + }; + Ok(buffer) + } + } +} +pub use snapshot::{MutableSnapshot, SharedSnapshot, Snapshot}; diff --git a/vendor/gix-features/src/hash.rs b/vendor/gix-features/src/hash.rs new file mode 100644 index 000000000..fe064139a --- /dev/null +++ b/vendor/gix-features/src/hash.rs @@ -0,0 +1,190 @@ +//! Hash functions and hash utilities +//! +//! With the `fast-sha1` feature, the `Sha1` hash type will use a more elaborate implementation utilizing hardware support +//! in case it is available. Otherwise the `rustsha1` feature should be set. `fast-sha1` will take precedence. +//! Otherwise, a minimal yet performant implementation is used instead for a decent trade-off between compile times and run-time performance. +#[cfg(all(feature = "rustsha1", not(feature = "fast-sha1")))] +mod _impl { + use super::Sha1Digest; + + /// A implementation of the Sha1 hash, which can be used once. + #[derive(Default, Clone)] + pub struct Sha1(sha1_smol::Sha1); + + impl Sha1 { + /// Digest the given `bytes`. + pub fn update(&mut self, bytes: &[u8]) { + self.0.update(bytes) + } + /// Finalize the hash and produce a digest. + pub fn digest(self) -> Sha1Digest { + self.0.digest().bytes() + } + } +} + +/// A 20 bytes digest produced by a [`Sha1`] hash implementation. +#[cfg(any(feature = "fast-sha1", feature = "rustsha1"))] +pub type Sha1Digest = [u8; 20]; + +#[cfg(feature = "fast-sha1")] +mod _impl { + use sha1::Digest; + + use super::Sha1Digest; + + /// A implementation of the Sha1 hash, which can be used once. + #[derive(Default, Clone)] + pub struct Sha1(sha1::Sha1); + + impl Sha1 { + /// Digest the given `bytes`. 
+ pub fn update(&mut self, bytes: &[u8]) { + self.0.update(bytes) + } + /// Finalize the hash and produce a digest. + pub fn digest(self) -> Sha1Digest { + self.0.finalize().into() + } + } +} + +#[cfg(any(feature = "rustsha1", feature = "fast-sha1"))] +pub use _impl::Sha1; + +/// Compute a CRC32 hash from the given `bytes`, returning the CRC32 hash. +/// +/// When calling this function for the first time, `previous_value` should be `0`. Otherwise it +/// should be the previous return value of this function to provide a hash of multiple sequential +/// chunks of `bytes`. +#[cfg(feature = "crc32")] +pub fn crc32_update(previous_value: u32, bytes: &[u8]) -> u32 { + let mut h = crc32fast::Hasher::new_with_initial(previous_value); + h.update(bytes); + h.finalize() +} + +/// Compute a CRC32 value of the given input `bytes`. +/// +/// In case multiple chunks of `bytes` are present, one should use [`crc32_update()`] instead. +#[cfg(feature = "crc32")] +pub fn crc32(bytes: &[u8]) -> u32 { + let mut h = crc32fast::Hasher::new(); + h.update(bytes); + h.finalize() +} + +/// Produce a hasher suitable for the given kind of hash. +#[cfg(any(feature = "rustsha1", feature = "fast-sha1"))] +pub fn hasher(kind: gix_hash::Kind) -> Sha1 { + match kind { + gix_hash::Kind::Sha1 => Sha1::default(), + } +} + +/// Compute the hash of `kind` for the bytes in the file at `path`, hashing only the first `num_bytes_from_start` +/// while initializing and calling `progress`. +/// +/// `num_bytes_from_start` is useful to avoid reading trailing hashes, which are never part of the hash itself, +/// denoting the amount of bytes to hash starting from the beginning of the file. +/// +/// # Note +/// +/// * Only available with the `gix-object` feature enabled due to usage of the [`gix_hash::Kind`] enum and the +/// [`gix_hash::ObjectId`] return value. +/// * [Interrupts][crate::interrupt] are supported. 
+#[cfg(all(feature = "progress", any(feature = "rustsha1", feature = "fast-sha1")))] +pub fn bytes_of_file( + path: impl AsRef<std::path::Path>, + num_bytes_from_start: usize, + kind: gix_hash::Kind, + progress: &mut impl crate::progress::Progress, + should_interrupt: &std::sync::atomic::AtomicBool, +) -> std::io::Result<gix_hash::ObjectId> { + bytes( + std::fs::File::open(path)?, + num_bytes_from_start, + kind, + progress, + should_interrupt, + ) +} + +/// Similar to [`bytes_of_file`], but operates on an already open file. +#[cfg(all(feature = "progress", any(feature = "rustsha1", feature = "fast-sha1")))] +pub fn bytes( + mut read: impl std::io::Read, + num_bytes_from_start: usize, + kind: gix_hash::Kind, + progress: &mut impl crate::progress::Progress, + should_interrupt: &std::sync::atomic::AtomicBool, +) -> std::io::Result<gix_hash::ObjectId> { + let mut hasher = hasher(kind); + let start = std::time::Instant::now(); + // init progress before the possibility for failure, as convenience in case people want to recover + progress.init(Some(num_bytes_from_start), crate::progress::bytes()); + + const BUF_SIZE: usize = u16::MAX as usize; + let mut buf = [0u8; BUF_SIZE]; + let mut bytes_left = num_bytes_from_start; + + while bytes_left > 0 { + let out = &mut buf[..BUF_SIZE.min(bytes_left)]; + read.read_exact(out)?; + bytes_left -= out.len(); + progress.inc_by(out.len()); + hasher.update(out); + if should_interrupt.load(std::sync::atomic::Ordering::SeqCst) { + return Err(std::io::Error::new(std::io::ErrorKind::Other, "Interrupted")); + } + } + + let id = gix_hash::ObjectId::from(hasher.digest()); + progress.show_throughput(start); + Ok(id) +} + +#[cfg(any(feature = "rustsha1", feature = "fast-sha1"))] +mod write { + use crate::hash::Sha1; + + /// A utility to automatically generate a hash while writing into an inner writer. + pub struct Write<T> { + /// The hash implementation. + pub hash: Sha1, + /// The inner writer. 
+ pub inner: T, + } + + impl<T> std::io::Write for Write<T> + where + T: std::io::Write, + { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + let written = self.inner.write(buf)?; + self.hash.update(&buf[..written]); + Ok(written) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } + } + + impl<T> Write<T> + where + T: std::io::Write, + { + /// Create a new hash writer which hashes all bytes written to `inner` with a hash of `kind`. + pub fn new(inner: T, object_hash: gix_hash::Kind) -> Self { + match object_hash { + gix_hash::Kind::Sha1 => Write { + inner, + hash: Sha1::default(), + }, + } + } + } +} +#[cfg(any(feature = "rustsha1", feature = "fast-sha1"))] +pub use write::Write; diff --git a/vendor/gix-features/src/interrupt.rs b/vendor/gix-features/src/interrupt.rs new file mode 100644 index 000000000..1f78e613a --- /dev/null +++ b/vendor/gix-features/src/interrupt.rs @@ -0,0 +1,125 @@ +//! Utilities to cause interruptions in common traits, like Read/Write and Iterator. +use std::{ + io, + sync::atomic::{AtomicBool, Ordering}, +}; + +/// A wrapper for an inner iterator which will check for interruptions on each iteration, stopping the iteration when +/// that is requested. +pub struct Iter<'a, I> { + /// The actual iterator to yield elements from. + pub inner: I, + should_interrupt: &'a AtomicBool, +} + +impl<'a, I> Iter<'a, I> +where + I: Iterator, +{ + /// Create a new iterator over `inner` which checks for interruptions on each iteration on `should_interrupt`. + /// + /// Note that this means the consumer of the iterator data should also be able to access `should_interrupt` and + /// consider it when producing the final result to avoid claiming success even though the operation is only partially + /// complete. 
+ pub fn new(inner: I, should_interrupt: &'a AtomicBool) -> Self { + Iter { + inner, + should_interrupt, + } + } +} + +impl<'a, I> Iterator for Iter<'a, I> +where + I: Iterator, +{ + type Item = I::Item; + + fn next(&mut self) -> Option<Self::Item> { + if self.should_interrupt.load(Ordering::Relaxed) { + return None; + } + self.inner.next() + } +} + +/// A wrapper for an inner iterator which will check for interruptions on each iteration. +pub struct IterWithErr<'a, I, EFN> { + /// The actual iterator to yield elements from. + pub inner: I, + make_err: Option<EFN>, + should_interrupt: &'a AtomicBool, +} + +impl<'a, I, EFN, E> IterWithErr<'a, I, EFN> +where + I: Iterator, + EFN: FnOnce() -> E, +{ + /// Create a new iterator over `inner` which checks for interruptions on each iteration and calls `make_err()` to + /// signal an interruption happened, causing no further items to be iterated from that point on. + pub fn new(inner: I, make_err: EFN, should_interrupt: &'a AtomicBool) -> Self { + IterWithErr { + inner, + make_err: Some(make_err), + should_interrupt, + } + } +} + +impl<'a, I, EFN, E> Iterator for IterWithErr<'a, I, EFN> +where + I: Iterator, + EFN: FnOnce() -> E, +{ + type Item = Result<I::Item, E>; + + fn next(&mut self) -> Option<Self::Item> { + self.make_err.as_ref()?; + if self.should_interrupt.load(Ordering::Relaxed) { + return self.make_err.take().map(|f| Err(f())); + } + match self.inner.next() { + Some(next) => Some(Ok(next)), + None => { + self.make_err = None; + None + } + } + } +} + +/// A wrapper for implementors of [`std::io::Read`] or [`std::io::BufRead`] with interrupt support. +/// +/// It fails a [read][`std::io::Read::read`] while an interrupt was requested. +pub struct Read<'a, R> { + /// The actual implementor of [`std::io::Read`] to which interrupt support will be added. 
+ pub inner: R, + /// The flag to trigger interruption + pub should_interrupt: &'a AtomicBool, +} + +impl<'a, R> io::Read for Read<'a, R> +where + R: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + if self.should_interrupt.load(Ordering::Relaxed) { + return Err(std::io::Error::new(std::io::ErrorKind::Other, "Interrupted")); + } + self.inner.read(buf) + } +} + +impl<'a, R> io::BufRead for Read<'a, R> +where + R: io::BufRead, +{ + fn fill_buf(&mut self) -> io::Result<&[u8]> { + self.inner.fill_buf() + } + + fn consume(&mut self, amt: usize) { + self.inner.consume(amt) + } +} diff --git a/vendor/gix-features/src/io.rs b/vendor/gix-features/src/io.rs new file mode 100644 index 000000000..405960c0b --- /dev/null +++ b/vendor/gix-features/src/io.rs @@ -0,0 +1,94 @@ +//!A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle. + +/// A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle. +#[cfg(feature = "io-pipe")] +pub mod pipe { + use std::io; + + use bytes::{Buf, BufMut, BytesMut}; + + /// The write-end of the pipe, receiving items to become available in the [`Reader`]. + /// + /// It's commonly used with the [`std::io::Write`] trait it implements. + pub struct Writer { + /// The channel through which bytes are transferred. Useful for sending [`std::io::Error`]s instead. + pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>, + buf: BytesMut, + } + + /// The read-end of the pipe, implementing the [`std::io::Read`] trait. 
+ pub struct Reader { + channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>, + buf: BytesMut, + } + + impl io::BufRead for Reader { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + if self.buf.is_empty() { + match self.channel.recv() { + Ok(Ok(buf)) => self.buf = buf, + Ok(Err(err)) => return Err(err), + Err(_) => {} + } + }; + Ok(&self.buf) + } + + fn consume(&mut self, amt: usize) { + self.buf.advance(amt.min(self.buf.len())); + } + } + + impl io::Read for Reader { + fn read(&mut self, mut out: &mut [u8]) -> io::Result<usize> { + let mut written = 0; + while !out.is_empty() { + if self.buf.is_empty() { + match self.channel.recv() { + Ok(Ok(buf)) => self.buf = buf, + Ok(Err(err)) => return Err(err), + Err(_) => break, + } + } + let bytes_to_write = self.buf.len().min(out.len()); + let (to_write, rest) = out.split_at_mut(bytes_to_write); + self.buf.split_to(bytes_to_write).copy_to_slice(to_write); + out = rest; + written += bytes_to_write; + } + Ok(written) + } + } + + impl io::Write for Writer { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.buf.put_slice(buf); + self.channel + .send(Ok(self.buf.split())) + .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + /// Returns the _([`write`][Writer], [`read`][Reader])_ ends of a pipe for transferring bytes, analogous to a unix pipe. + /// + /// * `in_flight_writes` defines the amount of chunks of bytes to keep in memory until the `write` end will block when writing. + /// If `None` or `0`, the `write` end will always block until the `read` end consumes the transferred bytes. 
+ pub fn unidirectional(in_flight_writes: impl Into<Option<usize>>) -> (Writer, Reader) { + let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes.into().unwrap_or(0)); + ( + Writer { + channel: tx, + buf: BytesMut::with_capacity(4096), + }, + Reader { + channel: rx, + buf: BytesMut::new(), + }, + ) + } +} diff --git a/vendor/gix-features/src/lib.rs b/vendor/gix-features/src/lib.rs new file mode 100644 index 000000000..643320c0f --- /dev/null +++ b/vendor/gix-features/src/lib.rs @@ -0,0 +1,64 @@ +//! A crate providing foundational capabilities to other `git-*` crates with trade-offs between compile time, binary size or speed +//! selectable using cargo feature toggles. +//! +//! It's designed to allow the application level crate to configure feature toggles, affecting all other `git-*` crates using +//! this one. +//! +//! Thus all features provided here commonly have a 'cheap' base implementation, with the option to pull in +//! counterparts with higher performance. +//! ## Feature Flags +#![cfg_attr( + feature = "document-features", + cfg_attr(doc, doc = ::document_features::document_features!()) +)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +#![deny(missing_docs, rust_2018_idioms, unsafe_code)] + +/// +pub mod cache; +/// +pub mod decode; +pub mod fs; +pub mod hash; +pub mod interrupt; +#[cfg(feature = "io-pipe")] +pub mod io; +pub mod parallel; +#[cfg(feature = "progress")] +pub mod progress; +pub mod threading; +/// +#[cfg(feature = "zlib")] +pub mod zlib; + +/// +pub mod iter { + /// An iterator over chunks of input, producing `Vec<Item>` with a size of `size`, with the last chunk being the remainder and thus + /// potentially smaller than `size`. + pub struct Chunks<I> { + /// The inner iterator to ask for items. 
+ pub inner: I, + /// The size of chunks to produce + pub size: usize, + } + + impl<I, Item> Iterator for Chunks<I> + where + I: Iterator<Item = Item>, + { + type Item = Vec<Item>; + + fn next(&mut self) -> Option<Self::Item> { + let mut res = Vec::with_capacity(self.size); + let mut items_left = self.size; + for item in &mut self.inner { + res.push(item); + items_left -= 1; + if items_left == 0 { + break; + } + } + (!res.is_empty()).then_some(res) + } + } +} diff --git a/vendor/gix-features/src/parallel/eager_iter.rs b/vendor/gix-features/src/parallel/eager_iter.rs new file mode 100644 index 000000000..60123f54c --- /dev/null +++ b/vendor/gix-features/src/parallel/eager_iter.rs @@ -0,0 +1,124 @@ +/// Evaluate any iterator in their own thread. +/// +/// This is particularly useful if the wrapped iterator performs IO and/or heavy computations. +/// Use [`EagerIter::new()`] for instantiation. +pub struct EagerIter<I: Iterator> { + receiver: std::sync::mpsc::Receiver<Vec<I::Item>>, + chunk: Option<std::vec::IntoIter<I::Item>>, + size_hint: (usize, Option<usize>), +} + +impl<I> EagerIter<I> +where + I: Iterator + Send + 'static, + <I as Iterator>::Item: Send, +{ + /// Return a new `EagerIter` which evaluates `iter` in its own thread, + /// with a given `chunk_size` allowing a maximum `chunks_in_flight`. + /// + /// * `chunk_size` describes how many items returned by `iter` will be a single item of this `EagerIter`. + /// This helps to reduce the overhead imposed by transferring many small items. + /// If this number is 1, each item will become a single chunk. 0 is invalid. + /// * `chunks_in_flight` describes how many chunks can be kept in memory in case the consumer of the `EagerIter`s items + /// isn't consuming them fast enough. Setting this number to 0 effectively turns off any caching, but blocks `EagerIter` + /// if its items aren't consumed fast enough. 
+ pub fn new(iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self { + let (sender, receiver) = std::sync::mpsc::sync_channel(chunks_in_flight); + let size_hint = iter.size_hint(); + assert!(chunk_size > 0, "non-zero chunk size is needed"); + + std::thread::spawn(move || { + let mut out = Vec::with_capacity(chunk_size); + for item in iter { + out.push(item); + if out.len() == chunk_size { + if sender.send(out).is_err() { + return; + } + out = Vec::with_capacity(chunk_size); + } + } + if !out.is_empty() { + sender.send(out).ok(); + } + }); + EagerIter { + receiver, + chunk: None, + size_hint, + } + } + + fn fill_buf_and_pop(&mut self) -> Option<I::Item> { + self.chunk = self.receiver.recv().ok().map(|v| { + assert!(!v.is_empty()); + v.into_iter() + }); + self.chunk.as_mut().and_then(|c| c.next()) + } +} + +impl<I> Iterator for EagerIter<I> +where + I: Iterator + Send + 'static, + <I as Iterator>::Item: Send, +{ + type Item = I::Item; + + fn next(&mut self) -> Option<Self::Item> { + match self.chunk.as_mut() { + Some(chunk) => chunk.next().or_else(|| self.fill_buf_and_pop()), + None => self.fill_buf_and_pop(), + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.size_hint + } +} + +/// An conditional `EagerIter`, which may become a just-in-time iterator running in the main thread depending on a condition. +pub enum EagerIterIf<I: Iterator> { + /// A separate thread will eagerly evaluate iterator `I`. + Eager(EagerIter<I>), + /// The current thread evaluates `I`. + OnDemand(I), +} + +impl<I> EagerIterIf<I> +where + I: Iterator + Send + 'static, + <I as Iterator>::Item: Send, +{ + /// Return a new `EagerIterIf` if `condition()` returns true. + /// + /// For all other parameters, please see [`EagerIter::new()`]. 
+ pub fn new(condition: impl FnOnce() -> bool, iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self { + if condition() { + EagerIterIf::Eager(EagerIter::new(iter, chunk_size, chunks_in_flight)) + } else { + EagerIterIf::OnDemand(iter) + } + } +} +impl<I> Iterator for EagerIterIf<I> +where + I: Iterator + Send + 'static, + <I as Iterator>::Item: Send, +{ + type Item = I::Item; + + fn next(&mut self) -> Option<Self::Item> { + match self { + EagerIterIf::OnDemand(i) => i.next(), + EagerIterIf::Eager(i) => i.next(), + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + match self { + EagerIterIf::OnDemand(i) => i.size_hint(), + EagerIterIf::Eager(i) => i.size_hint(), + } + } +} diff --git a/vendor/gix-features/src/parallel/in_order.rs b/vendor/gix-features/src/parallel/in_order.rs new file mode 100644 index 000000000..7928ac692 --- /dev/null +++ b/vendor/gix-features/src/parallel/in_order.rs @@ -0,0 +1,83 @@ +use std::{cmp::Ordering, collections::BTreeMap}; + +/// A counter for items that are in sequence, to be able to put them back into original order later. +pub type SequenceId = usize; + +/// An iterator which olds iterated items with a **sequential** ID starting at 0 long enough to dispense them in order. +pub struct InOrderIter<T, I> { + /// The iterator yielding the out-of-order elements we are to yield in order. 
+ pub inner: I, + store: BTreeMap<SequenceId, T>, + next_chunk: SequenceId, + is_done: bool, +} + +impl<T, E, I> From<I> for InOrderIter<T, I> +where + I: Iterator<Item = Result<(SequenceId, T), E>>, +{ + fn from(iter: I) -> Self { + InOrderIter { + inner: iter, + store: Default::default(), + next_chunk: 0, + is_done: false, + } + } +} + +impl<T, E, I> Iterator for InOrderIter<T, I> +where + I: Iterator<Item = Result<(SequenceId, T), E>>, +{ + type Item = Result<T, E>; + + fn next(&mut self) -> Option<Self::Item> { + if self.is_done { + return None; + } + 'find_next_in_sequence: loop { + match self.inner.next() { + Some(Ok((c, v))) => match c.cmp(&self.next_chunk) { + Ordering::Equal => { + self.next_chunk += 1; + return Some(Ok(v)); + } + Ordering::Less => { + unreachable!("in a correctly ordered sequence we can never see keys again, got {}", c) + } + Ordering::Greater => { + let previous = self.store.insert(c, v); + assert!( + previous.is_none(), + "Chunks are returned only once, input is an invalid sequence" + ); + if let Some(v) = self.store.remove(&self.next_chunk) { + self.next_chunk += 1; + return Some(Ok(v)); + } + continue 'find_next_in_sequence; + } + }, + Some(Err(e)) => { + self.is_done = true; + self.store.clear(); + return Some(Err(e)); + } + None => match self.store.remove(&self.next_chunk) { + Some(v) => { + self.next_chunk += 1; + return Some(Ok(v)); + } + None => { + debug_assert!( + self.store.is_empty(), + "When iteration is done we should not have stored items left" + ); + return None; + } + }, + } + } + } +} diff --git a/vendor/gix-features/src/parallel/in_parallel.rs b/vendor/gix-features/src/parallel/in_parallel.rs new file mode 100644 index 000000000..e1e2cc3e3 --- /dev/null +++ b/vendor/gix-features/src/parallel/in_parallel.rs @@ -0,0 +1,211 @@ +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +use crate::parallel::{num_threads, Reduce}; + +/// Runs `left` and `right` in parallel, returning their output when both are done. 
+pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) { + std::thread::scope(|s| { + let left = std::thread::Builder::new() + .name("gitoxide.join.left".into()) + .spawn_scoped(s, left) + .expect("valid name"); + let right = std::thread::Builder::new() + .name("gitoxide.join.right".into()) + .spawn_scoped(s, right) + .expect("valid name"); + (left.join().unwrap(), right.join().unwrap()) + }) +} + +/// Runs `f` with a scope to be used for spawning threads that will not outlive the function call. +/// That way it's possible to handle threads without needing the 'static lifetime for data they interact with. +/// +/// Note that the threads should not rely on actual parallelism as threading might be turned off entirely, hence should not +/// connect each other with channels as deadlock would occur in single-threaded mode. +pub fn threads<'env, F, R>(f: F) -> R +where + F: for<'scope> FnOnce(&'scope std::thread::Scope<'scope, 'env>) -> R, +{ + std::thread::scope(f) +} + +/// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning. +pub fn build_thread() -> std::thread::Builder { + std::thread::Builder::new() +} + +/// Read items from `input` and `consume` them in multiple threads, +/// whose output output is collected by a `reducer`. Its task is to +/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe. +/// +/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used. +/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume` +/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially +/// created by `new_thread_state(…)`. 
/// * For `reducer`, see the [`Reduce`] trait
pub fn in_parallel<I, S, O, R>(
    input: impl Iterator<Item = I> + Send,
    thread_limit: Option<usize>,
    new_thread_state: impl Fn(usize) -> S + Send + Clone,
    consume: impl Fn(I, &mut S) -> O + Send + Clone,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    let num_threads = num_threads(thread_limit);
    std::thread::scope(move |s| {
        let receive_result = {
            // Bounded channels provide back-pressure so the feeder cannot run far ahead of the workers.
            let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
            let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
            for thread_id in 0..num_threads {
                std::thread::Builder::new()
                    .name(format!("gitoxide.in_parallel.produce.{thread_id}"))
                    .spawn_scoped(s, {
                        // Each worker owns clones of the channel ends and the state/consume factories.
                        let send_result = send_result.clone();
                        let receive_input = receive_input.clone();
                        let new_thread_state = new_thread_state.clone();
                        let consume = consume.clone();
                        move || {
                            let mut state = new_thread_state(thread_id);
                            for item in receive_input {
                                // A failed send means the receiver (and thus the whole operation) is gone.
                                if send_result.send(consume(item, &mut state)).is_err() {
                                    break;
                                }
                            }
                        }
                    })
                    .expect("valid name");
            }
            std::thread::Builder::new()
                .name("gitoxide.in_parallel.feed".into())
                .spawn_scoped(s, move || {
                    for item in input {
                        if send_input.send(item).is_err() {
                            break;
                        }
                    }
                })
                .expect("valid name");
            // All original senders drop at the end of this block, so `receive_result` terminates
            // once every worker is done (or once the reducer below fails and drops it).
            receive_result
        };

        for item in receive_result {
            drop(reducer.feed(item)?);
        }
        reducer.finalize()
    })
}

/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
/// This is only good for operations where near-random access isn't detrimental, so it's not usually great
/// for file-io as it won't make use of sorted inputs well.
/// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast.
/// See [`in_parallel()`] for a description of the shared parameters; unlike it, this function mutates
/// `input` in place and returns one `R` (derived from the thread state via `state_to_rval`) per thread that ran.
pub fn in_parallel_with_slice<I, S, R, E>(
    input: &mut [I],
    thread_limit: Option<usize>,
    new_thread_state: impl FnMut(usize) -> S + Send + Clone,
    consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Send + Clone,
    mut periodic: impl FnMut() -> Option<std::time::Duration> + Send,
    state_to_rval: impl FnOnce(S) -> R + Send + Clone,
) -> Result<Vec<R>, E>
where
    I: Send,
    E: Send,
    R: Send,
{
    let num_threads = num_threads(thread_limit);
    let mut results = Vec::with_capacity(num_threads);
    let stop_everything = &AtomicBool::default();
    let index = &AtomicUsize::default();

    std::thread::scope({
        move |s| {
            // A watchdog thread: calls `periodic()` in a loop; a `None` return requests a global stop.
            std::thread::Builder::new()
                .name("gitoxide.in_parallel_with_slice.watch-interrupts".into())
                .spawn_scoped(s, {
                    move || loop {
                        if stop_everything.load(Ordering::Relaxed) {
                            break;
                        }

                        match periodic() {
                            Some(duration) => std::thread::sleep(duration),
                            None => {
                                stop_everything.store(true, Ordering::Relaxed);
                                break;
                            }
                        }
                    }
                })
                .expect("valid name");

            let input_len = input.len();
            struct Input<I>(*mut [I])
            where
                I: Send;

            // SAFETY: `*mut [I]` is safe to move across threads when `I: Send`; the atomic `index`
            // counter below hands out each slice index at most once, so threads never alias an element.
            #[allow(unsafe_code)]
            unsafe impl<I> Send for Input<I> where I: Send {}

            let threads: Vec<_> = (0..num_threads)
                .map(|thread_id| {
                    std::thread::Builder::new()
                        .name(format!("gitoxide.in_parallel_with_slice.produce.{thread_id}"))
                        .spawn_scoped(s, {
                            let mut new_thread_state = new_thread_state.clone();
                            let state_to_rval = state_to_rval.clone();
                            let mut consume = consume.clone();
                            let input = Input(input as *mut [I]);
                            move || {
                                let mut state = new_thread_state(thread_id);
                                // Claim the next unprocessed index; `fetch_update` fails (ending the loop)
                                // once the counter reaches the slice length.
                                while let Ok(input_index) =
                                    index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
                                        (x < input_len).then_some(x + 1)
                                    })
                                {
                                    if stop_everything.load(Ordering::Relaxed) {
                                        break;
                                    }
                                    // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
                                    // each item exactly once.
                                    let item = {
                                        #[allow(unsafe_code)]
                                        unsafe {
                                            &mut (&mut *input.0)[input_index]
                                        }
                                    };
                                    if let Err(err) = consume(item, &mut state) {
                                        stop_everything.store(true, Ordering::Relaxed);
                                        return Err(err);
                                    }
                                }
                                Ok(state_to_rval(state))
                            }
                        })
                        .expect("valid name")
                })
                .collect();
            for thread in threads {
                match thread.join() {
                    Ok(res) => {
                        results.push(res?);
                    }
                    Err(err) => {
                        // a panic happened, stop the world gracefully (even though we panic later)
                        stop_everything.store(true, Ordering::Relaxed);
                        std::panic::resume_unwind(err);
                    }
                }
            }

            // Also stops the watchdog thread so the scope can end.
            stop_everything.store(true, Ordering::Relaxed);
            Ok(results)
        }
    })
}

//! Run computations in parallel, or not, based on the `parallel` feature toggle.
//!
//! ### in_parallel(…)
//!
//! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage
//! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling
//! thread to aggregate the results into a single output, which is returned by [`in_parallel()`].
//!
//! Interruptions can be achieved by letting the reducer's [`feed(…)`][Reduce::feed()] method fail.
//!
//! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself
//! or the data to work on.
//!
//! This mode of operation doesn't lend itself perfectly to being wrapped for `async` as it appears like a single long-running
//! operation which runs as fast as possible, which is cancellable only by merit of stopping the input or stopping the output
//! aggregation.
//!
//! ### `reduce::Stepwise`
//!
//! The [`Stepwise`][reduce::Stepwise] iterator works exactly as [`in_parallel()`] except that the processing of the output produced by
//! `consume(I, &mut State) -> O` is made accessible by the `Iterator` trait's `next()` method. As produced work is not
//! buffered, the owner of the iterator controls the progress made.
//!
//! Getting the final output of the [`Reduce`] is achieved through the consuming [`Stepwise::finalize()`][reduce::Stepwise::finalize()] method, which
//! is functionally equivalent to calling [`in_parallel()`].
//!
//! In an `async` context this means that progress is only made each time `next()` is called on the iterator, while merely dropping
//! the iterator will wind down the computation without any result.
//!
//! #### Maintaining Safety
//!
//! In order to assure that threads don't outlive the data they borrow because their handles are leaked, we enforce
//! the `'static` lifetime for its inputs, making it less intuitive to use. It is, however, possible to produce
//! suitable input iterators as long as they can hold something on the heap.
#[cfg(feature = "parallel")]
mod in_parallel;
#[cfg(feature = "parallel")]
pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads};

mod serial;
#[cfg(not(feature = "parallel"))]
pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads};

mod in_order;
pub use in_order::{InOrderIter, SequenceId};

mod eager_iter;
pub use eager_iter::{EagerIter, EagerIterIf};

/// A no-op implementation returning _(`desired_chunk_size`, `thread_limit`, `num_threads(thread_limit)`)_, used
/// when the `parallel` feature toggle is not set.
+#[cfg(not(feature = "parallel"))] +pub fn optimize_chunk_size_and_thread_limit( + desired_chunk_size: usize, + _num_items: Option<usize>, + thread_limit: Option<usize>, + _available_threads: Option<usize>, +) -> (usize, Option<usize>, usize) { + (desired_chunk_size, thread_limit, num_threads(thread_limit)) +} + +/// Return the 'optimal' _(`size of chunks`, `amount of threads as Option`, `amount of threads`)_ to use in [`in_parallel()`] for the given +/// `desired_chunk_size`, `num_items`, `thread_limit` and `available_threads`. +/// +/// * `desired_chunk_size` is the amount of items per chunk you think should be used. +/// * `num_items` is the total amount of items in the iteration, if `Some`. +/// Otherwise this knowledge will not affect the output of this function. +/// * `thread_limit` is the amount of threads to use at most, if `Some`. +/// Otherwise this knowledge will not affect the output of this function. +/// * `available_threads` is the total amount of threads available, if `Some`. +/// Otherwise the actual amount of available threads is determined by querying the system. +/// +/// `Note` that this implementation is available only if the `parallel` feature toggle is set. 
+#[cfg(feature = "parallel")] +pub fn optimize_chunk_size_and_thread_limit( + desired_chunk_size: usize, + num_items: Option<usize>, + thread_limit: Option<usize>, + available_threads: Option<usize>, +) -> (usize, Option<usize>, usize) { + let available_threads = + available_threads.unwrap_or_else(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)); + let available_threads = thread_limit + .map(|l| if l == 0 { available_threads } else { l }) + .unwrap_or(available_threads); + + let (lower, upper) = (50, 1000); + let (chunk_size, thread_limit) = num_items + .map(|num_items| { + let desired_chunks_per_thread_at_least = 2; + let items = num_items; + let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper); + let num_chunks = items / chunk_size; + let thread_limit = if num_chunks <= available_threads { + (num_chunks / desired_chunks_per_thread_at_least).max(1) + } else { + available_threads + }; + (chunk_size, thread_limit) + }) + .unwrap_or({ + let chunk_size = if available_threads == 1 { + desired_chunk_size + } else if desired_chunk_size < lower { + lower + } else { + desired_chunk_size.min(upper) + }; + (chunk_size, available_threads) + }); + (chunk_size, Some(thread_limit), thread_limit) +} + +/// Always returns 1, available when the `parallel` feature toggle is unset. +#[cfg(not(feature = "parallel"))] +pub fn num_threads(_thread_limit: Option<usize>) -> usize { + 1 +} + +/// Returns the amount of threads the system can effectively use as the amount of its logical cores. +/// +/// Only available with the `parallel` feature toggle set. +#[cfg(feature = "parallel")] +pub fn num_threads(thread_limit: Option<usize>) -> usize { + let logical_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1); + thread_limit + .map(|l| if l == 0 { logical_cores } else { l }) + .unwrap_or(logical_cores) +} + +/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated. 
///
/// For parameters, see the documentation of [`in_parallel()`]
#[cfg(feature = "parallel")]
pub fn in_parallel_if<I, S, O, R>(
    condition: impl FnOnce() -> bool,
    input: impl Iterator<Item = I> + Send,
    thread_limit: Option<usize>,
    new_thread_state: impl Fn(usize) -> S + Send + Clone,
    consume: impl Fn(I, &mut S) -> O + Send + Clone,
    reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    // Note: `condition()` is not evaluated at all when only one thread is available (short-circuit).
    if num_threads(thread_limit) > 1 && condition() {
        in_parallel(input, thread_limit, new_thread_state, consume, reducer)
    } else {
        serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
    }
}

/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
///
/// For parameters, see the documentation of [`in_parallel()`]
///
/// Note that the non-parallel version is equivalent to [`in_parallel()`].
#[cfg(not(feature = "parallel"))]
pub fn in_parallel_if<I, S, O, R>(
    _condition: impl FnOnce() -> bool,
    input: impl Iterator<Item = I>,
    thread_limit: Option<usize>,
    new_thread_state: impl Fn(usize) -> S,
    consume: impl Fn(I, &mut S) -> O,
    reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
}

/// Aggregation of items produced in threads into a final result, via the [`Reduce`] trait.
pub mod reduce;
pub use reduce::Reduce;

#[cfg(feature = "parallel")]
mod stepped {
    use crate::parallel::num_threads;

    /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
    /// for details.
    pub struct Stepwise<Reduce: super::Reduce> {
        /// This field is first to assure it's dropped first and cause threads that are dropped next to stop their loops
        /// as sending results fails when the receiver is dropped.
        receive_result: std::sync::mpsc::Receiver<Reduce::Input>,
        /// `join()` will be called on these guards to assure every thread tries to send through a closed channel. When
        /// that happens, they break out of their loops.
        threads: Vec<std::thread::JoinHandle<()>>,
        /// The reducer is called only in the thread using the iterator, dropping it has no side effects.
        reducer: Option<Reduce>,
    }

    impl<Reduce: super::Reduce> Drop for Stepwise<Reduce> {
        fn drop(&mut self) {
            // Replace the receiver with a dummy so worker sends fail and their loops terminate.
            let (_, sink) = std::sync::mpsc::channel();
            drop(std::mem::replace(&mut self.receive_result, sink));

            // Join every thread; remember the last panic payload and re-raise it afterwards
            // so no panic is silently swallowed.
            let mut last_err = None;
            for handle in std::mem::take(&mut self.threads) {
                if let Err(err) = handle.join() {
                    last_err = Some(err);
                };
            }
            if let Some(thread_err) = last_err {
                std::panic::resume_unwind(thread_err);
            }
        }
    }

    impl<Reduce: super::Reduce> Stepwise<Reduce> {
        /// Instantiate a new iterator and start working in threads.
        /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
        pub fn new<InputIter, ThreadStateFn, ConsumeFn, I, O, S>(
            input: InputIter,
            thread_limit: Option<usize>,
            new_thread_state: ThreadStateFn,
            consume: ConsumeFn,
            reducer: Reduce,
        ) -> Self
        where
            InputIter: Iterator<Item = I> + Send + 'static,
            ThreadStateFn: Fn(usize) -> S + Send + Clone + 'static,
            ConsumeFn: Fn(I, &mut S) -> O + Send + Clone + 'static,
            Reduce: super::Reduce<Input = O> + 'static,
            I: Send + 'static,
            O: Send + 'static,
        {
            let num_threads = num_threads(thread_limit);
            // +1 for the feeder thread spawned below.
            let mut threads = Vec::with_capacity(num_threads + 1);
            let receive_result = {
                let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
                let (send_result, receive_result) = std::sync::mpsc::sync_channel::<O>(num_threads);
                for thread_id in 0..num_threads {
                    let handle = std::thread::spawn({
                        let send_result = send_result.clone();
                        let receive_input = receive_input.clone();
                        let new_thread_state = new_thread_state.clone();
                        let consume = consume.clone();
                        move || {
                            let mut state = new_thread_state(thread_id);
                            for item in receive_input {
                                // Send failure means the `Stepwise` receiver was dropped - stop working.
                                if send_result.send(consume(item, &mut state)).is_err() {
                                    break;
                                }
                            }
                        }
                    });
                    threads.push(handle);
                }
                // The feeder thread pushes input items until the workers stop accepting them.
                threads.push(std::thread::spawn(move || {
                    for item in input {
                        if send_input.send(item).is_err() {
                            break;
                        }
                    }
                }));
                receive_result
            };
            Stepwise {
                threads,
                receive_result,
                reducer: Some(reducer),
            }
        }

        /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
        pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
            for value in self.by_ref() {
                drop(value?);
            }
            self.reducer
                .take()
                .expect("this is the last call before consumption")
                .finalize()
        }
    }

    impl<Reduce: super::Reduce> Iterator for Stepwise<Reduce> {
        type Item = Result<Reduce::FeedProduce, Reduce::Error>;

        fn next(&mut self) -> Option<<Self as Iterator>::Item> {
            self.receive_result
                .recv()
                .ok()
                .and_then(|input| self.reducer.as_mut().map(|r| r.feed(input)))
        }
    }

    impl<R: super::Reduce> super::Finalize for Stepwise<R> {
        type Reduce = R;

        fn finalize(
            self,
        ) -> Result<
            <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
            <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
        > {
            Stepwise::finalize(self)
        }
    }
}

#[cfg(not(feature = "parallel"))]
mod stepped {
    /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
    /// for details.
    pub struct Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> {
        /// The input iterator items are drawn from.
        input: InputIter,
        /// Turns each input item into an output, using the thread state.
        consume: ConsumeFn,
        /// The single 'thread'-state, created once as this version runs on the calling thread.
        thread_state: ThreadState,
        /// Aggregates outputs as they are produced.
        reducer: Reduce,
    }

    impl<InputIter, ConsumeFn, Reduce, I, O, S> Stepwise<InputIter, ConsumeFn, S, Reduce>
    where
        InputIter: Iterator<Item = I>,
        ConsumeFn: Fn(I, &mut S) -> O,
        Reduce: super::Reduce<Input = O>,
    {
        /// Instantiate a new iterator.
        /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
        pub fn new<ThreadStateFn>(
            input: InputIter,
            _thread_limit: Option<usize>,
            new_thread_state: ThreadStateFn,
            consume: ConsumeFn,
            reducer: Reduce,
        ) -> Self
        where
            ThreadStateFn: Fn(usize) -> S,
        {
            Stepwise {
                input,
                consume,
                thread_state: new_thread_state(0),
                reducer,
            }
        }

        /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
        pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
            for value in self.by_ref() {
                drop(value?);
            }
            self.reducer.finalize()
        }
    }

    impl<InputIter, ConsumeFn, ThreadState, Reduce, I, O> Iterator for Stepwise<InputIter, ConsumeFn, ThreadState, Reduce>
    where
        InputIter: Iterator<Item = I>,
        ConsumeFn: Fn(I, &mut ThreadState) -> O,
        Reduce: super::Reduce<Input = O>,
    {
        type Item = Result<Reduce::FeedProduce, Reduce::Error>;

        fn next(&mut self) -> Option<<Self as Iterator>::Item> {
            self.input
                .next()
                .map(|input| self.reducer.feed((self.consume)(input, &mut self.thread_state)))
        }
    }

    impl<InputIter, ConsumeFn, R, I, O, S> super::Finalize for Stepwise<InputIter, ConsumeFn, S, R>
    where
        InputIter: Iterator<Item = I>,
        ConsumeFn: Fn(I, &mut S) -> O,
        R: super::Reduce<Input = O>,
    {
        type Reduce = R;

        fn finalize(
            self,
        ) -> Result<
            <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
            <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
        > {
            Stepwise::finalize(self)
        }
    }
}

use std::marker::PhantomData;

pub use stepped::Stepwise;

/// A trait for aggregating items commonly produced in threads into a single result, without itself
/// needing to be thread safe.
pub trait Reduce {
    /// The type fed to the reducer in the [`feed()`][Reduce::feed()] method.
    ///
    /// It's produced by a function that may run on multiple threads.
    type Input;
    /// The type produced in Ok(…) by [`feed()`][Reduce::feed()].
    /// Most reducers by nature use `()` here as the value is in the aggregation.
    /// However, some may use it to collect statistics only and return their Input
    /// in some form as a result here for [`Stepwise`] to be useful.
    type FeedProduce;
    /// The type produced once by the [`finalize()`][Reduce::finalize()] method.
    ///
    /// For traditional reducers, this is the value produced by the entire operation.
    /// For those made for step-wise iteration this may be aggregated statistics.
    type Output;
    /// The error type to use for all methods of this trait.
    type Error;
    /// Called each time a new `item` was produced in order to aggregate it into the final result.
    ///
    /// If an `Error` is returned, the entire operation will be stopped.
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error>;
    /// Called once all items were passed to `feed()`, producing the final `Output` of the operation or an `Error`.
    fn finalize(self) -> Result<Self::Output, Self::Error>;
}

/// An identity reducer for those who want to use [`Stepwise`] or [`in_parallel()`][crate::parallel::in_parallel()]
/// without the use of non-threaded reduction of products created in threads.
pub struct IdentityWithResult<Input, Error> {
    // PhantomData carries the type parameters without storing any values.
    _input: PhantomData<Input>,
    _error: PhantomData<Error>,
}

impl<Input, Error> Default for IdentityWithResult<Input, Error> {
    fn default() -> Self {
        IdentityWithResult {
            _input: Default::default(),
            _error: Default::default(),
        }
    }
}

impl<Input, Error> Reduce for IdentityWithResult<Input, Error> {
    type Input = Result<Input, Self::Error>;
    type FeedProduce = Input;
    type Output = ();
    type Error = Error;

    /// Pass each item through unchanged, propagating its error if present.
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
        item
    }

    fn finalize(self) -> Result<Self::Output, Self::Error> {
        Ok(())
    }
}

/// A trait reflecting the `finalize()` method of [`Reduce`] implementations
pub trait Finalize {
    /// An implementation of [`Reduce`]
    type Reduce: self::Reduce;

    /// Similar to the [`Reduce::finalize()`] method
    fn finalize(
        self,
    ) -> Result<<<Self as Finalize>::Reduce as self::Reduce>::Output, <<Self as Finalize>::Reduce as self::Reduce>::Error>;
}
use crate::parallel::Reduce;

#[cfg(not(feature = "parallel"))]
mod not_parallel {
    /// Runs `left` and then `right`, one after another, returning their output when both are done.
    pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
        (left(), right())
    }

    /// A scope for spawning threads.
    pub struct Scope<'env> {
        _marker: std::marker::PhantomData<&'env mut &'env ()>,
    }

    /// A stand-in for `std::thread::Builder` whose configuration (the thread name) is ignored.
    pub struct ThreadBuilder;

    /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
    pub fn build_thread() -> ThreadBuilder {
        ThreadBuilder
    }

    // SAFETY: `Scope` holds nothing but `PhantomData`; there is no state to share, and 'spawned'
    // closures run to completion immediately on the calling thread (see `Scope::spawn`).
    #[allow(unsafe_code)]
    unsafe impl Sync for Scope<'_> {}

    impl ThreadBuilder {
        /// Accept (and discard) a thread name, mirroring `std::thread::Builder::name`.
        pub fn name(self, _new: String) -> Self {
            self
        }
        /// Run `f` immediately on the current thread; the handle merely stores its result.
        pub fn spawn_scoped<'a, 'env, F, T>(
            &self,
            scope: &'a Scope<'env>,
            f: F,
        ) -> std::io::Result<ScopedJoinHandle<'a, T>>
        where
            F: FnOnce() -> T,
            F: Send + 'env,
            T: Send + 'env,
        {
            Ok(scope.spawn(f))
        }
    }

    impl<'env> Scope<'env> {
        /// Run `f` right away and capture its result in the returned handle.
        pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
        where
            F: FnOnce() -> T,
            F: Send + 'env,
            T: Send + 'env,
        {
            ScopedJoinHandle {
                result: f(),
                _marker: Default::default(),
            }
        }
    }

    /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
    /// Note that this implementation will run the spawned functions immediately.
    pub fn threads<'env, F, R>(f: F) -> R
    where
        F: FnOnce(&Scope<'env>) -> R,
    {
        f(&Scope {
            _marker: Default::default(),
        })
    }

    /// A handle that can be used to join its scoped thread.
    ///
    /// This struct is created by the [`Scope::spawn`] method and the
    /// [`ThreadBuilder::spawn_scoped`] method.
    pub struct ScopedJoinHandle<'scope, T> {
        /// Holds the result of the inner closure.
        result: T,
        _marker: std::marker::PhantomData<&'scope mut &'scope ()>,
    }

    impl<T> ScopedJoinHandle<'_, T> {
        /// Return the closure's result - it already ran to completion when it was 'spawned'.
        pub fn join(self) -> std::thread::Result<T> {
            Ok(self.result)
        }
    }

    /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
    /// This is only good for operations where near-random access isn't detrimental, so it's not usually great
    /// for file-io as it won't make use of sorted inputs well.
    /// For a description of the shared parameters, see the parallel version of this function.
    pub fn in_parallel_with_slice<I, S, R, E>(
        input: &mut [I],
        _thread_limit: Option<usize>,
        mut new_thread_state: impl FnMut(usize) -> S + Clone,
        mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone,
        mut periodic: impl FnMut() -> Option<std::time::Duration>,
        state_to_rval: impl FnOnce(S) -> R + Clone,
    ) -> Result<Vec<R>, E> {
        let mut state = new_thread_state(0);
        for item in input {
            consume(item, &mut state)?;
            // A `None` from `periodic` requests a stop; remaining items are left unprocessed.
            if periodic().is_none() {
                break;
            }
        }
        Ok(vec![state_to_rval(state)])
    }
}

#[cfg(not(feature = "parallel"))]
pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};

/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
/// whose task is to aggregate these outputs into the final result returned by this function.
///
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`
/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
/// * For `reducer`, see the [`Reduce`] trait
/// * `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
///   similar to the parallel version.
///
/// **This serial version performs all calculations on the current thread.**
pub fn in_parallel<I, S, O, R>(
    input: impl Iterator<Item = I>,
    _thread_limit: Option<usize>,
    new_thread_state: impl Fn(usize) -> S,
    consume: impl Fn(I, &mut S) -> O,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
{
    // A single 'thread' state with id 0, matching what thread 0 would receive in the parallel version.
    let mut state = new_thread_state(0);
    for item in input {
        drop(reducer.feed(consume(item, &mut state))?);
    }
    reducer.finalize()
}

//! Various `prodash` types along with various utilities for comfort.
use std::io;

pub use prodash::{
    self,
    messages::MessageLevel,
    progress::{Discard, DoOrDiscard, Either, Id, Step, StepShared, Task, ThroughputOnDrop, Value, UNKNOWN},
    unit, Progress, Unit,
};

#[cfg(feature = "progress-unit-bytes")]
pub use bytesize;
/// A stub for the portions of the `bytesize` crate that we use internally in `gitoxide`.
#[cfg(not(feature = "progress-unit-bytes"))]
pub mod bytesize {
    /// A stub for the `ByteSize` wrapper.
    pub struct ByteSize(pub u64);

    impl std::fmt::Display for ByteSize {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            // Note: prints the raw byte count without a unit suffix, unlike the real `bytesize` crate.
            self.0.fmt(f)
        }
    }
}

/// A unit for displaying bytes with throughput and progress percentage.
#[cfg(feature = "progress-unit-bytes")]
pub fn bytes() -> Option<Unit> {
    Some(unit::dynamic_and_mode(
        unit::Bytes,
        unit::display::Mode::with_throughput().and_percentage(),
    ))
}

/// A unit for displaying bytes with throughput and progress percentage.
#[cfg(not(feature = "progress-unit-bytes"))]
pub fn bytes() -> Option<Unit> {
    // Fallback without the `bytesize` dependency: a plain "B" label instead of human-readable sizes.
    Some(unit::label_and_mode(
        "B",
        unit::display::Mode::with_throughput().and_percentage(),
    ))
}

/// A unit for displaying human readable numbers with throughput and progress percentage, and a single decimal place.
pub fn count(name: &'static str) -> Option<Unit> {
    count_with_decimals(name, 1)
}

/// A unit for displaying human readable numbers with `name` suffix,
/// with throughput and progress percentage, and `decimals` decimal places.
#[cfg(feature = "progress-unit-human-numbers")]
pub fn count_with_decimals(name: &'static str, decimals: usize) -> Option<Unit> {
    Some(unit::dynamic_and_mode(
        unit::Human::new(
            {
                let mut f = unit::human::Formatter::new();
                f.with_decimals(decimals);
                f
            },
            name,
        ),
        unit::display::Mode::with_throughput().and_percentage(),
    ))
}

/// A unit for displaying human readable numbers with `name` suffix,
/// with throughput and progress percentage, and `decimals` decimal places.
#[cfg(not(feature = "progress-unit-human-numbers"))]
pub fn count_with_decimals(name: &'static str, _decimals: usize) -> Option<Unit> {
    // Fallback without human-number formatting: the plain `name` label; `decimals` is ignored.
    Some(unit::label_and_mode(
        name,
        unit::display::Mode::with_throughput().and_percentage(),
    ))
}

/// A predefined unit for displaying a multi-step progress
pub fn steps() -> Option<Unit> {
    Some(unit::dynamic(unit::Range::new("steps")))
}

/// A structure passing every [`read`][std::io::Read::read()] call through to the contained Progress instance using [`inc_by(bytes_read)`][Progress::inc_by()].
pub struct Read<T, P> {
    /// The implementor of [`std::io::Read`] to which progress is added
    pub inner: T,
    /// The progress instance receiving progress information on each invocation of [`read()`][std::io::Read::read()]
    pub progress: P,
}

impl<T, P> io::Read for Read<T, P>
where
    T: io::Read,
    P: Progress,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let bytes_read = self.inner.read(buf)?;
        self.progress.inc_by(bytes_read);
        Ok(bytes_read)
    }
}

impl<T, P> io::BufRead for Read<T, P>
where
    T: io::BufRead,
    P: Progress,
{
    // Note: buffered access is forwarded without affecting progress; only `read()` counts bytes.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.inner.fill_buf()
    }

    fn consume(&mut self, amt: usize) {
        self.inner.consume(amt)
    }
}

/// A structure passing every [`write`][std::io::Write::write()] call through to the contained Progress instance using [`inc_by(bytes_written)`][Progress::inc_by()].
///
/// This is particularly useful if the final size of the bytes to write is known or can be estimated precisely enough.
pub struct Write<T, P> {
    /// The implementor of [`std::io::Write`] to which progress is added
    pub inner: T,
    /// The progress instance receiving progress information on each invocation of [`write()`][std::io::Write::write()]
    pub progress: P,
}

impl<T, P> io::Write for Write<T, P>
where
    T: io::Write,
    P: Progress,
{
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let written = self.inner.write(buf)?;
        self.progress.inc_by(written);
        Ok(written)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

//! Type definitions for putting shared ownership and synchronized mutation behind the `threading` feature toggle.
//!
//! That way, single-threaded applications will not have to use thread-safe primitives, and simply do not specify the 'threading' feature.
#[cfg(feature = "parallel")]
mod _impl {
    use std::sync::Arc;

    /// A thread-safe cell which can be written to only once.
    #[cfg(feature = "once_cell")]
    pub type OnceCell<T> = once_cell::sync::OnceCell<T>;
    /// A reference counted pointer type for shared ownership.
    pub type OwnShared<T> = Arc<T>;
    /// A synchronization primitive which can start read-only and transition to support mutation.
    pub type MutableOnDemand<T> = parking_lot::RwLock<T>;
    /// A synchronization primitive which provides read-write access right away.
    pub type Mutable<T> = parking_lot::Mutex<T>;
    /// A guarded reference suitable for safekeeping in a struct.
    pub type RefGuard<'a, T> = parking_lot::RwLockReadGuard<'a, T>;
    /// A mapped reference created from a RefGuard
    pub type MappedRefGuard<'a, U> = parking_lot::MappedRwLockReadGuard<'a, U>;

    /// Get a shared reference through a [`MutableOnDemand`] for read-only access.
    pub fn get_ref<T>(v: &MutableOnDemand<T>) -> RefGuard<'_, T> {
        v.read()
    }

    /// Get a mutable reference through a [`MutableOnDemand`] for read-write access.
    pub fn get_mut<T>(v: &MutableOnDemand<T>) -> parking_lot::RwLockWriteGuard<'_, T> {
        v.write()
    }

    /// Get a mutable reference through a [`Mutable`] for read-write access.
    pub fn lock<T>(v: &Mutable<T>) -> parking_lot::MutexGuard<'_, T> {
        v.lock()
    }

    /// Downgrade a handle previously obtained with [`get_mut()`] to drop mutation support.
    ///
    /// `_orig` is unused here; it exists so this signature matches the single-threaded
    /// variant of this function, which needs the original cell to re-borrow.
    pub fn downgrade_mut_to_ref<'a, T>(
        v: parking_lot::RwLockWriteGuard<'a, T>,
        _orig: &'a MutableOnDemand<T>,
    ) -> RefGuard<'a, T> {
        parking_lot::RwLockWriteGuard::downgrade(v)
    }

    /// Map a read guard into a sub-type it contains.
    pub fn map_ref<T, U: ?Sized>(v: RefGuard<'_, T>, f: impl FnOnce(&T) -> &U) -> MappedRefGuard<'_, U> {
        parking_lot::RwLockReadGuard::map(v, f)
    }
}

#[cfg(not(feature = "parallel"))]
mod _impl {
    use std::{
        cell::{Ref, RefCell, RefMut},
        rc::Rc,
    };

    /// A cell which can be written to only once (single-threaded variant, not thread-safe).
    #[cfg(feature = "once_cell")]
    pub type OnceCell<T> = once_cell::unsync::OnceCell<T>;
    /// A reference counted pointer type for shared ownership (single-threaded).
    pub type OwnShared<T> = Rc<T>;
    /// A synchronization primitive which can start read-only and transition to support mutation.
    pub type MutableOnDemand<T> = RefCell<T>;
    /// A synchronization primitive which provides read-write access right away.
    pub type Mutable<T> = RefCell<T>;
    /// A guarded reference suitable for safekeeping in a struct.
    pub type RefGuard<'a, T> = Ref<'a, T>;
    /// A mapped reference created from a RefGuard
    pub type MappedRefGuard<'a, U> = Ref<'a, U>;

    /// Get a shared reference through a [`MutableOnDemand`] for read-only access.
    pub fn get_mut<T>(v: &RefCell<T>) -> RefMut<'_, T> {
        v.borrow_mut()
    }

    /// Get a mutable reference through a [`Mutable`] for read-write access.
    pub fn lock<T>(v: &Mutable<T>) -> RefMut<'_, T> {
        v.borrow_mut()
    }

    /// Get a mutable reference through a [`MutableOnDemand`] for read-write access.
    pub fn get_ref<T>(v: &RefCell<T>) -> RefGuard<'_, T> {
        v.borrow()
    }

    /// Downgrade a handle previously obtained with [`get_mut()`] to drop mutation support.
    ///
    /// `RefCell` has no in-place downgrade, so the mutable borrow is dropped first and
    /// the cell is re-borrowed immutably via `orig`.
    pub fn downgrade_mut_to_ref<'a, T>(v: RefMut<'a, T>, orig: &'a RefCell<T>) -> RefGuard<'a, T> {
        drop(v);
        orig.borrow()
    }

    /// Map a read guard into a sub-type it contains.
    pub fn map_ref<T, U: ?Sized>(v: RefGuard<'_, T>, f: impl FnOnce(&T) -> &U) -> MappedRefGuard<'_, U> {
        Ref::map(v, f)
    }
}

pub use _impl::*;
diff --git a/vendor/gix-features/src/zlib/mod.rs b/vendor/gix-features/src/zlib/mod.rs
new file mode 100644
index 000000000..8dcdfd93f
--- /dev/null
+++ b/vendor/gix-features/src/zlib/mod.rs
@@ -0,0 +1,47 @@
pub use flate2::{Decompress, Status};

/// non-streaming interfaces for decompression
pub mod inflate {
    /// The error returned by various [Inflate methods][super::Inflate]
    #[derive(Debug, thiserror::Error)]
    #[allow(missing_docs)]
    pub enum Error {
        #[error("Could not write all bytes when decompressing content")]
        WriteInflated(#[from] std::io::Error),
        #[error("Could not decode zip stream, status was '{0:?}'")]
        Inflate(#[from] flate2::DecompressError),
        #[error("The zlib status indicated an error, status was '{0:?}'")]
        Status(flate2::Status),
    }
}

/// Decompress a few bytes of a zlib stream without allocation
pub struct Inflate {
    /// The actual decompressor doing all the work.
    pub state: Decompress,
}

impl Default for Inflate {
    fn default() -> Self {
        Inflate {
            // `true` requests a zlib header/trailer around the deflate stream.
            state: Decompress::new(true),
        }
    }
}

impl Inflate {
    /// Run the decompressor exactly once.
    /// Cannot be run multiple times
    ///
    /// Returns the resulting status along with the amount of input bytes consumed
    /// and output bytes produced by this single `decompress()` call, both derived
    /// from the before/after totals of the internal state.
    pub fn once(&mut self, input: &[u8], out: &mut [u8]) -> Result<(flate2::Status, usize, usize), inflate::Error> {
        let before_in = self.state.total_in();
        let before_out = self.state.total_out();
        let status = self.state.decompress(input, out, flate2::FlushDecompress::None)?;
        Ok((
            status,
            (self.state.total_in() - before_in) as usize,
            (self.state.total_out() - before_out) as usize,
        ))
    }
}

///
pub mod stream;
diff --git a/vendor/gix-features/src/zlib/stream/deflate/mod.rs b/vendor/gix-features/src/zlib/stream/deflate/mod.rs
new file mode 100644
index 000000000..55f575ea4
--- /dev/null
+++ b/vendor/gix-features/src/zlib/stream/deflate/mod.rs
@@ -0,0 +1,96 @@
use flate2::Compress;

// 32 KiB staging buffer for compressed output before it is flushed to the inner writer.
const BUF_SIZE: usize = 4096 * 8;

/// A utility to zlib compress anything that is written via its [Write][std::io::Write] implementation.
///
/// Be sure to call `flush()` when done to finalize the deflate stream.
pub struct Write<W> {
    compressor: Compress,
    inner: W,
    buf: [u8; BUF_SIZE],
}

mod impls {
    use std::io;

    use flate2::{Compress, Compression, FlushCompress, Status};

    use crate::zlib::stream::deflate;

    impl<W> deflate::Write<W>
    where
        W: io::Write,
    {
        /// Create a new instance writing compressed bytes to `inner`.
        pub fn new(inner: W) -> deflate::Write<W> {
            deflate::Write {
                // Fast compression with a zlib header/trailer.
                compressor: Compress::new(Compression::fast(), true),
                inner,
                buf: [0; deflate::BUF_SIZE],
            }
        }

        /// Reset the compressor, starting a new compression stream.
        ///
        /// That way multiple streams can be written to the same inner writer.
        pub fn reset(&mut self) {
            self.compressor.reset();
        }

        /// Consume `self` and return the inner writer.
        pub fn into_inner(self) -> W {
            self.inner
        }

        /// Compress `buf` with the given `flush` mode, forwarding all produced output to the
        /// inner writer, and return the number of input bytes consumed overall.
        ///
        /// The loop relies on the compressor's running `total_in`/`total_out` counters to
        /// measure per-iteration progress, so the order of reads around `compress()` matters.
        fn write_inner(&mut self, mut buf: &[u8], flush: FlushCompress) -> io::Result<usize> {
            let total_in_when_start = self.compressor.total_in();
            loop {
                let last_total_in = self.compressor.total_in();
                let last_total_out = self.compressor.total_out();

                let status = self
                    .compressor
                    .compress(buf, &mut self.buf, flush)
                    .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;

                // Forward whatever this round produced before inspecting the status.
                let written = self.compressor.total_out() - last_total_out;
                if written > 0 {
                    self.inner.write_all(&self.buf[..written as usize])?;
                }

                match status {
                    Status::StreamEnd => return Ok((self.compressor.total_in() - total_in_when_start) as usize),
                    Status::Ok | Status::BufError => {
                        // Advance past the input the compressor consumed this round.
                        let consumed = self.compressor.total_in() - last_total_in;
                        buf = &buf[consumed as usize..];

                        // output buffer still makes progress
                        if self.compressor.total_out() > last_total_out {
                            continue;
                        }
                        // input still makes progress
                        if self.compressor.total_in() > last_total_in {
                            continue;
                        }
                        // input also makes no progress anymore, need more so leave with what we have
                        return Ok((self.compressor.total_in() - total_in_when_start) as usize);
                    }
                }
            }
        }
    }

    impl<W: io::Write> io::Write for deflate::Write<W> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.write_inner(buf, FlushCompress::None)
        }

        // Flushing finalizes the deflate stream by compressing empty input with `Finish`.
        fn flush(&mut self) -> io::Result<()> {
            self.write_inner(&[], FlushCompress::Finish).map(|_| ())
        }
    }
}

#[cfg(test)]
mod tests;
diff --git a/vendor/gix-features/src/zlib/stream/deflate/tests.rs b/vendor/gix-features/src/zlib/stream/deflate/tests.rs
new file mode 100644
index 000000000..ba0dd2a2c
--- /dev/null
+++ b/vendor/gix-features/src/zlib/stream/deflate/tests.rs
@@ -0,0 +1,101 @@
mod deflate_stream {
    use std::{
        io,
        io::{Read, Write},
    };

    use bstr::ByteSlice;
    use flate2::Decompress;

    use crate::zlib::stream::deflate;

    /// Provide streaming decompression using the
    /// `std::io::Read` trait.
    /// If `std::io::BufReader` is used, an allocation for the input buffer will be performed.
    struct InflateReader<R> {
        inner: R,
        decompressor: Decompress,
    }

    impl<R> InflateReader<R>
    where
        R: io::BufRead,
    {
        pub fn from_read(read: R) -> InflateReader<R> {
            InflateReader {
                decompressor: Decompress::new(true),
                inner: read,
            }
        }
    }

    impl<R> io::Read for InflateReader<R>
    where
        R: io::BufRead,
    {
        fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
            crate::zlib::stream::inflate::read(&mut self.inner, &mut self.decompressor, into)
        }
    }

    #[test]
    fn small_file_decompress() -> Result<(), Box<dyn std::error::Error>> {
        fn fixture_path(path: &str) -> std::path::PathBuf {
            std::path::PathBuf::from("tests/fixtures").join(path)
        }
        let r = InflateReader::from_read(io::BufReader::new(std::fs::File::open(fixture_path(
            "objects/37/d4e6c5c48ba0d245164c4e10d5f41140cab980",
        ))?));
        let mut bytes = r.bytes();
        // A loose git blob: header "blob 9\0" followed by the 9-byte payload.
        let content = bytes.by_ref().take(16).collect::<Result<Vec<_>, _>>()?;
        assert_eq!(content.as_slice().as_bstr(), b"blob 9\0hi there\n".as_bstr());
        assert!(bytes.next().is_none());
        Ok(())
    }

    #[test]
    fn all_at_once() -> Result<(), Box<dyn std::error::Error>> {
        let mut w = deflate::Write::new(Vec::new());
        assert_eq!(w.write(b"hello")?, 5);
        w.flush()?;

        let out = w.inner;
        // NOTE(review): both lengths are accepted — presumably the exact compressed size
        // varies with the zlib backend; confirm against the flate2 backend in use.
        assert!(out.len() == 12 || out.len() == 13);

        assert_deflate_buffer(out, b"hello")
    }

    // Round-trip helper: inflate `out` and compare against the original `expected` bytes.
    fn assert_deflate_buffer(out: Vec<u8>, expected: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
        let mut actual = Vec::new();
        InflateReader::from_read(out.as_slice()).read_to_end(&mut actual)?;
        assert_eq!(actual, expected);
        Ok(())
    }

    #[test]
    fn big_file_small_writes() -> Result<(), Box<dyn std::error::Error>> {
        let mut w = deflate::Write::new(Vec::new());
        let bytes = include_bytes!(
            "../../../../tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack"
        );
        // Tiny 2-byte chunks stress the incremental path of `write_inner`.
        for chunk in bytes.chunks(2) {
            assert_eq!(w.write(chunk)?, chunk.len());
        }
        w.flush()?;

        assert_deflate_buffer(w.inner, bytes)
    }

    #[test]
    fn big_file_a_few_big_writes() -> Result<(), Box<dyn std::error::Error>> {
        let mut w = deflate::Write::new(Vec::new());
        let bytes = include_bytes!(
            "../../../../tests/fixtures/objects/pack/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack"
        );
        // Chunks larger than the internal staging buffer (4096 * 8) exercise the flush loop.
        for chunk in bytes.chunks(4096 * 9) {
            assert_eq!(w.write(chunk)?, chunk.len());
        }
        w.flush()?;

        assert_deflate_buffer(w.inner, bytes)
    }
}
diff --git a/vendor/gix-features/src/zlib/stream/inflate.rs b/vendor/gix-features/src/zlib/stream/inflate.rs
new file mode 100644
index 000000000..007ecedc6
--- /dev/null
+++ b/vendor/gix-features/src/zlib/stream/inflate.rs
@@ -0,0 +1,57 @@
use std::{io, io::BufRead};

use flate2::{Decompress, FlushDecompress, Status};

/// The boxed variant is faster for what we do (moving the decompressor in and out a lot)
pub struct ReadBoxed<R> {
    /// The reader from which bytes should be decompressed.
    pub inner: R,
    /// The decompressor doing all the work.
    pub decompressor: Box<Decompress>,
}

impl<R> io::Read for ReadBoxed<R>
where
    R: BufRead,
{
    fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
        read(&mut self.inner, &mut self.decompressor, into)
    }
}

/// Read bytes from `rd` and decompress them using `state` into a pre-allocated fitting buffer `dst`, returning the amount of bytes written.
pub fn read(rd: &mut impl BufRead, state: &mut Decompress, mut dst: &mut [u8]) -> io::Result<usize> {
    let mut total_written = 0;
    loop {
        // Per-iteration progress, measured via the decompressor's running totals.
        let (written, consumed, ret, eof);
        {
            let input = rd.fill_buf()?;
            eof = input.is_empty();
            let before_out = state.total_out();
            let before_in = state.total_in();
            // Once the reader is exhausted, ask zlib to finish the stream.
            let flush = if eof {
                FlushDecompress::Finish
            } else {
                FlushDecompress::None
            };
            ret = state.decompress(input, dst, flush);
            written = (state.total_out() - before_out) as usize;
            total_written += written;
            // Shrink `dst` so the next round appends after what was just written.
            dst = &mut dst[written..];
            consumed = (state.total_in() - before_in) as usize;
        }
        rd.consume(consumed);

        match ret {
            // The stream has officially ended, nothing more to do here.
            Ok(Status::StreamEnd) => return Ok(total_written),
            // Either input or output are depleted even though the stream is not depleted yet.
            Ok(Status::Ok) | Ok(Status::BufError) if eof || dst.is_empty() => return Ok(total_written),
            // Some progress was made in both the input and the output, it must continue to reach the end.
            Ok(Status::Ok) | Ok(Status::BufError) if consumed != 0 || written != 0 => continue,
            // A strange state, where zlib makes no progress but isn't done either. Call it out.
            Ok(Status::Ok) | Ok(Status::BufError) => unreachable!("Definitely a bug somewhere"),
            Err(..) => return Err(io::Error::new(io::ErrorKind::InvalidInput, "corrupt deflate stream")),
        }
    }
}
diff --git a/vendor/gix-features/src/zlib/stream/mod.rs b/vendor/gix-features/src/zlib/stream/mod.rs
new file mode 100644
index 000000000..7fb239d36
--- /dev/null
+++ b/vendor/gix-features/src/zlib/stream/mod.rs
@@ -0,0 +1,4 @@
///
pub mod deflate;
///
pub mod inflate;
|