author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-04 12:41:35 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-04 12:41:35 +0000
commit     7e5d7eea9c580ef4b41a765bde624af431942b96 (patch)
tree       2c0d9ca12878fc4525650aa4e54d77a81a07cc09 /vendor/gix-pack/src/index
parent     Adding debian version 1.70.0+dfsg1-9. (diff)
download   rustc-7e5d7eea9c580ef4b41a765bde624af431942b96.tar.xz
           rustc-7e5d7eea9c580ef4b41a765bde624af431942b96.zip
Merging upstream version 1.70.0+dfsg2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-pack/src/index')
-rw-r--r--  vendor/gix-pack/src/index/access.rs                290
-rw-r--r--  vendor/gix-pack/src/index/init.rs                   91
-rw-r--r--  vendor/gix-pack/src/index/mod.rs                   155
-rw-r--r--  vendor/gix-pack/src/index/traverse/error.rs         44
-rw-r--r--  vendor/gix-pack/src/index/traverse/mod.rs          245
-rw-r--r--  vendor/gix-pack/src/index/traverse/reduce.rs       129
-rw-r--r--  vendor/gix-pack/src/index/traverse/types.rs        123
-rw-r--r--  vendor/gix-pack/src/index/traverse/with_index.rs   230
-rw-r--r--  vendor/gix-pack/src/index/traverse/with_lookup.rs  190
-rw-r--r--  vendor/gix-pack/src/index/util.rs                   47
-rw-r--r--  vendor/gix-pack/src/index/verify.rs                290
-rw-r--r--  vendor/gix-pack/src/index/write/encode.rs          127
-rw-r--r--  vendor/gix-pack/src/index/write/error.rs            25
-rw-r--r--  vendor/gix-pack/src/index/write/mod.rs             263
14 files changed, 2249 insertions(+), 0 deletions(-)
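Taken together, the files in this commit implement read access, traversal, verification and writing of git pack index (`.idx`) files. As a minimal sketch of how the pieces added below fit together (paths, the oid value and error handling are illustrative; the calls themselves all appear in this diff):

use std::path::Path;

// Map an object id to its offset in the pack data file via the index.
fn lookup_offset(idx_path: &Path, id: &gix_hash::oid) -> Option<u64> {
    // The hash kind is not stored in the file format itself, so the caller
    // provides it (see init.rs below).
    let file = gix_pack::index::File::at(idx_path, gix_hash::Kind::Sha1).ok()?;
    // Fan-out accelerated binary search over the sorted oid table (access.rs).
    let entry_index = file.lookup(id)?;
    // Resolve the entry to a pack data offset, transparently handling the
    // V2 64-bit large-offset table.
    Some(file.pack_offset_at_index(entry_index))
}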
diff --git a/vendor/gix-pack/src/index/access.rs b/vendor/gix-pack/src/index/access.rs new file mode 100644 index 000000000..0ac85dff7 --- /dev/null +++ b/vendor/gix-pack/src/index/access.rs @@ -0,0 +1,290 @@ +use std::{mem::size_of, ops::Range}; + +use crate::{ + data, + index::{self, EntryIndex, PrefixLookupResult, FAN_LEN}, +}; + +const N32_SIZE: usize = size_of::<u32>(); +const N64_SIZE: usize = size_of::<u64>(); +const V1_HEADER_SIZE: usize = FAN_LEN * N32_SIZE; +const V2_HEADER_SIZE: usize = N32_SIZE * 2 + FAN_LEN * N32_SIZE; +const N32_HIGH_BIT: u32 = 1 << 31; + +/// Represents an entry within a pack index file, effectively mapping object [`IDs`][gix_hash::ObjectId] to pack data file locations. +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] +pub struct Entry { + /// The ID of the object + pub oid: gix_hash::ObjectId, + /// The offset to the object's header in the pack data file + pub pack_offset: data::Offset, + /// The CRC32 hash over all bytes of the pack data entry. + /// + /// This can be useful for direct copies of pack data entries from one pack to another with insurance there was no bit rot. + /// _Note_: Only available in index version 2 or newer + pub crc32: Option<u32>, +} + +/// Iteration and access +impl index::File { + fn iter_v1(&self) -> impl Iterator<Item = Entry> + '_ { + match self.version { + index::Version::V1 => self.data[V1_HEADER_SIZE..] + .chunks(N32_SIZE + self.hash_len) + .take(self.num_objects as usize) + .map(|c| { + let (ofs, oid) = c.split_at(N32_SIZE); + Entry { + oid: gix_hash::ObjectId::from(oid), + pack_offset: crate::read_u32(ofs) as u64, + crc32: None, + } + }), + _ => panic!("Cannot use iter_v1() on index of type {:?}", self.version), + } + } + + fn iter_v2(&self) -> impl Iterator<Item = Entry> + '_ { + let pack64_offset = self.offset_pack_offset64_v2(); + match self.version { + index::Version::V2 => izip!( + self.data[V2_HEADER_SIZE..].chunks(self.hash_len), + self.data[self.offset_crc32_v2()..].chunks(N32_SIZE), + self.data[self.offset_pack_offset_v2()..].chunks(N32_SIZE) + ) + .take(self.num_objects as usize) + .map(move |(oid, crc32, ofs32)| Entry { + oid: gix_hash::ObjectId::from(oid), + pack_offset: self.pack_offset_from_offset_v2(ofs32, pack64_offset), + crc32: Some(crate::read_u32(crc32)), + }), + _ => panic!("Cannot use iter_v2() on index of type {:?}", self.version), + } + } + + /// Returns the object hash at the given index in our list of (sorted) sha1 hashes. + /// The index ranges from 0 to self.num_objects() + /// + /// # Panics + /// + /// If `index` is out of bounds. + pub fn oid_at_index(&self, index: EntryIndex) -> &gix_hash::oid { + let index = index as usize; + let start = match self.version { + index::Version::V2 => V2_HEADER_SIZE + index * self.hash_len, + index::Version::V1 => V1_HEADER_SIZE + index * (N32_SIZE + self.hash_len) + N32_SIZE, + }; + gix_hash::oid::from_bytes_unchecked(&self.data[start..][..self.hash_len]) + } + + /// Returns the offset into our pack data file at which to start reading the object at `index`. + /// + /// # Panics + /// + /// If `index` is out of bounds. 
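// The V2 arm below defers to `pack_offset_from_offset_v2()` further down in
// this file: each entry stores a big-endian u32. If the high bit is clear,
// that u32 is the pack offset itself; if it is set, the low 31 bits index the
// trailing table of u64 offsets. Illustrative values:
//   0x0000_1000 -> pack offset 4096
//   0x8000_0002 -> third entry of the 64-bit offset table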
+ pub fn pack_offset_at_index(&self, index: EntryIndex) -> data::Offset { + let index = index as usize; + match self.version { + index::Version::V2 => { + let start = self.offset_pack_offset_v2() + index * N32_SIZE; + self.pack_offset_from_offset_v2(&self.data[start..][..N32_SIZE], self.offset_pack_offset64_v2()) + } + index::Version::V1 => { + let start = V1_HEADER_SIZE + index * (N32_SIZE + self.hash_len); + crate::read_u32(&self.data[start..][..N32_SIZE]) as u64 + } + } + } + + /// Returns the CRC32 of the object at the given `index`. + /// + /// _Note_: These are always present for index version 2 or higher. + /// # Panics + /// + /// If `index` is out of bounds. + pub fn crc32_at_index(&self, index: EntryIndex) -> Option<u32> { + let index = index as usize; + match self.version { + index::Version::V2 => { + let start = self.offset_crc32_v2() + index * N32_SIZE; + Some(crate::read_u32(&self.data[start..start + N32_SIZE])) + } + index::Version::V1 => None, + } + } + + /// Returns the `index` of the given hash for use with the [`oid_at_index()`][index::File::oid_at_index()], + /// [`pack_offset_at_index()`][index::File::pack_offset_at_index()] or [`crc32_at_index()`][index::File::crc32_at_index()]. + // NOTE: pretty much the same things as in `multi_index::File::lookup`, change things there + // as well. + pub fn lookup(&self, id: impl AsRef<gix_hash::oid>) -> Option<EntryIndex> { + lookup(id, &self.fan, |idx| self.oid_at_index(idx)) + } + + /// Given a `prefix`, find an object that matches it uniquely within this index and return `Some(Ok(entry_index))`. + /// If there is more than one object matching the object `Some(Err(())` is returned. + /// + /// Finally, if no object matches the index, the return value is `None`. + /// + /// Pass `candidates` to obtain the set of entry-indices matching `prefix`, with the same return value as + /// one would have received if it remained `None`. It will be empty if no object matched the `prefix`. + /// + // NOTE: pretty much the same things as in `index::File::lookup`, change things there + // as well. + pub fn lookup_prefix( + &self, + prefix: gix_hash::Prefix, + candidates: Option<&mut Range<EntryIndex>>, + ) -> Option<PrefixLookupResult> { + lookup_prefix( + prefix, + candidates, + &self.fan, + |idx| self.oid_at_index(idx), + self.num_objects, + ) + } + + /// An iterator over all [`Entries`][Entry] of this index file. + pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Entry> + 'a> { + match self.version { + index::Version::V2 => Box::new(self.iter_v2()), + index::Version::V1 => Box::new(self.iter_v1()), + } + } + + /// Return a vector of ascending offsets into our respective pack data file. + /// + /// Useful to control an iteration over all pack entries in a cache-friendly way. 
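// Hypothetical usage sketch: iterating in offset order turns random access
// into a sequential scan of the mmap'd pack data file:
//   for offset in index_file.sorted_offsets() {
//       let entry = pack_file.entry(offset);
//       // decode `entry`...
//   }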
+ pub fn sorted_offsets(&self) -> Vec<data::Offset> { + let mut ofs: Vec<_> = match self.version { + index::Version::V1 => self.iter().map(|e| e.pack_offset).collect(), + index::Version::V2 => { + let offset32_start = &self.data[self.offset_pack_offset_v2()..]; + let pack_offset_64_start = self.offset_pack_offset64_v2(); + offset32_start + .chunks(N32_SIZE) + .take(self.num_objects as usize) + .map(|offset| self.pack_offset_from_offset_v2(offset, pack_offset_64_start)) + .collect() + } + }; + ofs.sort_unstable(); + ofs + } + + #[inline] + fn offset_crc32_v2(&self) -> usize { + V2_HEADER_SIZE + self.num_objects as usize * self.hash_len + } + + #[inline] + fn offset_pack_offset_v2(&self) -> usize { + self.offset_crc32_v2() + self.num_objects as usize * N32_SIZE + } + + #[inline] + fn offset_pack_offset64_v2(&self) -> usize { + self.offset_pack_offset_v2() + self.num_objects as usize * N32_SIZE + } + + #[inline] + fn pack_offset_from_offset_v2(&self, offset: &[u8], pack64_offset: usize) -> data::Offset { + debug_assert_eq!(self.version, index::Version::V2); + let ofs32 = crate::read_u32(offset); + if (ofs32 & N32_HIGH_BIT) == N32_HIGH_BIT { + let from = pack64_offset + (ofs32 ^ N32_HIGH_BIT) as usize * N64_SIZE; + crate::read_u64(&self.data[from..][..N64_SIZE]) + } else { + ofs32 as u64 + } + } +} + +pub(crate) fn lookup_prefix<'a>( + prefix: gix_hash::Prefix, + candidates: Option<&mut Range<EntryIndex>>, + fan: &[u32; FAN_LEN], + oid_at_index: impl Fn(EntryIndex) -> &'a gix_hash::oid, + num_objects: u32, +) -> Option<PrefixLookupResult> { + let first_byte = prefix.as_oid().first_byte() as usize; + let mut upper_bound = fan[first_byte]; + let mut lower_bound = if first_byte != 0 { fan[first_byte - 1] } else { 0 }; + + // Bisect using indices + while lower_bound < upper_bound { + let mid = (lower_bound + upper_bound) / 2; + let mid_sha = oid_at_index(mid); + + use std::cmp::Ordering::*; + match prefix.cmp_oid(mid_sha) { + Less => upper_bound = mid, + Equal => match candidates { + Some(candidates) => { + let first_past_entry = ((0..mid).rev()) + .take_while(|prev| prefix.cmp_oid(oid_at_index(*prev)) == Equal) + .last(); + + let last_future_entry = ((mid + 1)..num_objects) + .take_while(|next| prefix.cmp_oid(oid_at_index(*next)) == Equal) + .last(); + + *candidates = match (first_past_entry, last_future_entry) { + (Some(first), Some(last)) => first..last + 1, + (Some(first), None) => first..mid + 1, + (None, Some(last)) => mid..last + 1, + (None, None) => mid..mid + 1, + }; + + return if candidates.len() > 1 { + Some(Err(())) + } else { + Some(Ok(mid)) + }; + } + None => { + let next = mid + 1; + if next < num_objects && prefix.cmp_oid(oid_at_index(next)) == Equal { + return Some(Err(())); + } + if mid != 0 && prefix.cmp_oid(oid_at_index(mid - 1)) == Equal { + return Some(Err(())); + } + return Some(Ok(mid)); + } + }, + Greater => lower_bound = mid + 1, + } + } + + if let Some(candidates) = candidates { + *candidates = 0..0; + } + None +} + +pub(crate) fn lookup<'a>( + id: impl AsRef<gix_hash::oid>, + fan: &[u32; FAN_LEN], + oid_at_index: impl Fn(EntryIndex) -> &'a gix_hash::oid, +) -> Option<EntryIndex> { + let id = id.as_ref(); + let first_byte = id.first_byte() as usize; + let mut upper_bound = fan[first_byte]; + let mut lower_bound = if first_byte != 0 { fan[first_byte - 1] } else { 0 }; + + while lower_bound < upper_bound { + let mid = (lower_bound + upper_bound) / 2; + let mid_sha = oid_at_index(mid); + + use std::cmp::Ordering::*; + match id.cmp(mid_sha) { + Less => upper_bound = mid, + 
Equal => return Some(mid), + Greater => lower_bound = mid + 1, + } + } + None +} diff --git a/vendor/gix-pack/src/index/init.rs b/vendor/gix-pack/src/index/init.rs new file mode 100644 index 000000000..13eecdbda --- /dev/null +++ b/vendor/gix-pack/src/index/init.rs @@ -0,0 +1,91 @@ +use std::{mem::size_of, path::Path}; + +use crate::index::{self, Version, FAN_LEN, V2_SIGNATURE}; + +/// Returned by [`index::File::at()`]. +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum Error { + #[error("Could not open pack index file at '{path}'")] + Io { + source: std::io::Error, + path: std::path::PathBuf, + }, + #[error("{message}")] + Corrupt { message: String }, + #[error("Unsupported index version: {version})")] + UnsupportedVersion { version: u32 }, +} + +const N32_SIZE: usize = size_of::<u32>(); + +/// Instantiation +impl index::File { + /// Open the pack index file at the given `path`. + /// + /// The `object_hash` is a way to read (and write) the same file format with different hashes, as the hash kind + /// isn't stored within the file format itself. + pub fn at(path: impl AsRef<Path>, object_hash: gix_hash::Kind) -> Result<index::File, Error> { + Self::at_inner(path.as_ref(), object_hash) + } + + fn at_inner(path: &Path, object_hash: gix_hash::Kind) -> Result<index::File, Error> { + let data = crate::mmap::read_only(path).map_err(|source| Error::Io { + source, + path: path.to_owned(), + })?; + let idx_len = data.len(); + let hash_len = object_hash.len_in_bytes(); + + let footer_size = hash_len * 2; + if idx_len < FAN_LEN * N32_SIZE + footer_size { + return Err(Error::Corrupt { + message: format!("Pack index of size {idx_len} is too small for even an empty index"), + }); + } + let (kind, fan, num_objects) = { + let (kind, d) = { + let (sig, d) = data.split_at(V2_SIGNATURE.len()); + if sig == V2_SIGNATURE { + (Version::V2, d) + } else { + (Version::V1, &data[..]) + } + }; + let d = { + if let Version::V2 = kind { + let (vd, dr) = d.split_at(N32_SIZE); + let version = crate::read_u32(vd); + if version != Version::V2 as u32 { + return Err(Error::UnsupportedVersion { version }); + } + dr + } else { + d + } + }; + let (fan, bytes_read) = read_fan(d); + let (_, _d) = d.split_at(bytes_read); + let num_objects = fan[FAN_LEN - 1]; + + (kind, fan, num_objects) + }; + Ok(index::File { + data, + path: path.to_owned(), + version: kind, + num_objects, + fan, + hash_len, + object_hash, + }) + } +} + +fn read_fan(d: &[u8]) -> ([u32; FAN_LEN], usize) { + let mut fan = [0; FAN_LEN]; + for (c, f) in d.chunks(N32_SIZE).zip(fan.iter_mut()) { + *f = crate::read_u32(c); + } + (fan, FAN_LEN * N32_SIZE) +} diff --git a/vendor/gix-pack/src/index/mod.rs b/vendor/gix-pack/src/index/mod.rs new file mode 100644 index 000000000..341322f7d --- /dev/null +++ b/vendor/gix-pack/src/index/mod.rs @@ -0,0 +1,155 @@ +//! an index into the pack file +//! +/// From itertools +/// Create an iterator running multiple iterators in lockstep. +/// +/// The `izip!` iterator yields elements until any subiterator +/// returns `None`. +/// +/// This is a version of the standard ``.zip()`` that's supporting more than +/// two iterators. The iterator element type is a tuple with one element +/// from each of the input iterators. Just like ``.zip()``, the iteration stops +/// when the shortest of the inputs reaches its end. +/// +/// **Note:** The result of this macro is in the general case an iterator +/// composed of repeated `.zip()` and a `.map()`; it has an anonymous type. 
+/// The special cases of one and two arguments produce the equivalent of +/// `$a.into_iter()` and `$a.into_iter().zip($b)` respectively. +/// +/// Prefer this macro `izip!()` over [`multizip`] for the performance benefits +/// of using the standard library `.zip()`. +/// +/// [`multizip`]: fn.multizip.html +/// +/// ``` +/// # use itertools::izip; +/// # +/// # fn main() { +/// +/// // iterate over three sequences side-by-side +/// let mut results = [0, 0, 0, 0]; +/// let inputs = [3, 7, 9, 6]; +/// +/// for (r, index, input) in izip!(&mut results, 0..10, &inputs) { +/// *r = index * 10 + input; +/// } +/// +/// assert_eq!(results, [0 + 3, 10 + 7, 29, 36]); +/// # } +/// ``` +macro_rules! izip { + // @closure creates a tuple-flattening closure for .map() call. usage: + // @closure partial_pattern => partial_tuple , rest , of , iterators + // eg. izip!( @closure ((a, b), c) => (a, b, c) , dd , ee ) + ( @closure $p:pat => $tup:expr ) => { + |$p| $tup + }; + + // The "b" identifier is a different identifier on each recursion level thanks to hygiene. + ( @closure $p:pat => ( $($tup:tt)* ) , $_iter:expr $( , $tail:expr )* ) => { + izip!(@closure ($p, b) => ( $($tup)*, b ) $( , $tail )*) + }; + + // unary + ($first:expr $(,)*) => { + std::iter::IntoIterator::into_iter($first) + }; + + // binary + ($first:expr, $second:expr $(,)*) => { + izip!($first) + .zip($second) + }; + + // n-ary where n > 2 + ( $first:expr $( , $rest:expr )* $(,)* ) => { + izip!($first) + $( + .zip($rest) + )* + .map( + izip!(@closure a => (a) $( , $rest )*) + ) + }; +} + +use memmap2::Mmap; + +/// The version of an index file +#[derive(PartialEq, Eq, Ord, PartialOrd, Debug, Hash, Clone, Copy)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] +#[allow(missing_docs)] +pub enum Version { + V1 = 1, + V2 = 2, +} + +impl Default for Version { + fn default() -> Self { + Version::V2 + } +} + +impl Version { + /// The kind of hash to produce to be compatible to this kind of index + pub fn hash(&self) -> gix_hash::Kind { + match self { + Version::V1 | Version::V2 => gix_hash::Kind::Sha1, + } + } +} + +/// A way to indicate if a lookup, despite successful, was ambiguous or yielded exactly +/// one result in the particular index. +pub type PrefixLookupResult = Result<EntryIndex, ()>; + +/// The type for referring to indices of an entry within the index file. +pub type EntryIndex = u32; + +const FAN_LEN: usize = 256; + +/// A representation of a pack index file +pub struct File { + data: Mmap, + path: std::path::PathBuf, + version: Version, + num_objects: u32, + fan: [u32; FAN_LEN], + hash_len: usize, + object_hash: gix_hash::Kind, +} + +/// Basic file information +impl File { + /// The version of the pack index + pub fn version(&self) -> Version { + self.version + } + /// The path of the opened index file + pub fn path(&self) -> &std::path::Path { + &self.path + } + /// The amount of objects stored in the pack and index, as one past the highest entry index. 
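// Usage sketch (hypothetical): `num_objects()` bounds the valid entry
// indices, enabling a plain indexed scan as an alternative to `iter()`
// from access.rs:
//   for i in 0..file.num_objects() {
//       let oid = file.oid_at_index(i);
//       // ...
//   }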
+ pub fn num_objects(&self) -> EntryIndex { + self.num_objects + } + /// The kind of hash we assume + pub fn object_hash(&self) -> gix_hash::Kind { + self.object_hash + } +} + +const V2_SIGNATURE: &[u8] = b"\xfftOc"; +/// +pub mod init; + +pub(crate) mod access; +pub use access::Entry; + +/// +pub mod traverse; +mod util; +/// +pub mod verify; +/// +pub mod write; diff --git a/vendor/gix-pack/src/index/traverse/error.rs b/vendor/gix-pack/src/index/traverse/error.rs new file mode 100644 index 000000000..2310c3bab --- /dev/null +++ b/vendor/gix-pack/src/index/traverse/error.rs @@ -0,0 +1,44 @@ +use crate::index; + +/// Returned by [`index::File::traverse_with_index()`] and [`index::File::traverse_with_lookup`] +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum Error<E: std::error::Error + Send + Sync + 'static> { + #[error("One of the traversal processors failed")] + Processor(#[source] E), + #[error("Index file, pack file or object verification failed")] + VerifyChecksum(#[from] index::verify::checksum::Error), + #[error("The pack delta tree index could not be built")] + Tree(#[from] crate::cache::delta::from_offsets::Error), + #[error("The tree traversal failed")] + TreeTraversal(#[from] crate::cache::delta::traverse::Error), + #[error("Object {id} at offset {offset} could not be decoded")] + PackDecode { + id: gix_hash::ObjectId, + offset: u64, + source: crate::data::decode::Error, + }, + #[error("The packfiles checksum didn't match the index file checksum: expected {expected}, got {actual}")] + PackMismatch { + expected: gix_hash::ObjectId, + actual: gix_hash::ObjectId, + }, + #[error("The hash of {kind} object at offset {offset} didn't match the checksum in the index file: expected {expected}, got {actual}")] + PackObjectMismatch { + expected: gix_hash::ObjectId, + actual: gix_hash::ObjectId, + offset: u64, + kind: gix_object::Kind, + }, + #[error( + "The CRC32 of {kind} object at offset {offset} didn't match the checksum in the index file: expected {expected}, got {actual}" + )] + Crc32Mismatch { + expected: u32, + actual: u32, + offset: u64, + kind: gix_object::Kind, + }, + #[error("Interrupted")] + Interrupted, +} diff --git a/vendor/gix-pack/src/index/traverse/mod.rs b/vendor/gix-pack/src/index/traverse/mod.rs new file mode 100644 index 000000000..42c820b0e --- /dev/null +++ b/vendor/gix-pack/src/index/traverse/mod.rs @@ -0,0 +1,245 @@ +use std::sync::atomic::AtomicBool; + +use gix_features::{parallel, progress::Progress}; + +use crate::index; + +mod reduce; +/// +pub mod with_index; +/// +pub mod with_lookup; +use reduce::Reducer; + +mod error; +pub use error::Error; + +mod types; +pub use types::{Algorithm, ProgressId, SafetyCheck, Statistics}; + +/// Traversal options for [`index::File::traverse()`]. +#[derive(Debug, Clone)] +pub struct Options<F> { + /// The algorithm to employ. + pub traversal: Algorithm, + /// If `Some`, only use the given amount of threads. Otherwise, the amount of threads to use will be selected based on + /// the amount of available logical cores. + pub thread_limit: Option<usize>, + /// The kinds of safety checks to perform. + pub check: SafetyCheck, + /// A function to create a pack cache + pub make_pack_lookup_cache: F, +} + +impl Default for Options<fn() -> crate::cache::Never> { + fn default() -> Self { + Options { + check: Default::default(), + traversal: Default::default(), + thread_limit: None, + make_pack_lookup_cache: || crate::cache::Never, + } + } +} + +/// The outcome of the [`traverse()`][index::File::traverse()] method. 
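// The fields below are what `traverse()` hands back on success: the verified
// index checksum, the aggregated decode statistics, and the caller's progress
// instance, returned so it can be reused after the traversal.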
+pub struct Outcome<P> { + /// The checksum obtained when hashing the file, which matched the checksum contained within the file. + pub actual_index_checksum: gix_hash::ObjectId, + /// The statistics obtained during traversal. + pub statistics: Statistics, + /// The input progress to allow reuse. + pub progress: P, +} + +/// Traversal of pack data files using an index file +impl index::File { + /// Iterate through all _decoded objects_ in the given `pack` and handle them with a `Processor`. + /// The return value is (pack-checksum, [`Outcome`], `progress`), thus the pack traversal will always verify + /// the whole packs checksum to assure it was correct. In case of bit-rod, the operation will abort early without + /// verifying all objects using the [interrupt mechanism][gix_features::interrupt] mechanism. + /// + /// # Algorithms + /// + /// Using the [`Options::traversal`] field one can chose between two algorithms providing different tradeoffs. Both invoke + /// `new_processor()` to create functions receiving decoded objects, their object kind, index entry and a progress instance to provide + /// progress information. + /// + /// * [`Algorithm::DeltaTreeLookup`] builds an index to avoid any unnecessary computation while resolving objects, avoiding + /// the need for a cache entirely, rendering `new_cache()` unused. + /// One could also call [`traverse_with_index()`][index::File::traverse_with_index()] directly. + /// * [`Algorithm::Lookup`] uses a cache created by `new_cache()` to avoid having to re-compute all bases of a delta-chain while + /// decoding objects. + /// One could also call [`traverse_with_lookup()`][index::File::traverse_with_lookup()] directly. + /// + /// Use [`thread_limit`][Options::thread_limit] to further control parallelism and [`check`][SafetyCheck] to define how much the passed + /// objects shall be verified beforehand. + pub fn traverse<P, C, Processor, E, F>( + &self, + pack: &crate::data::File, + progress: P, + should_interrupt: &AtomicBool, + new_processor: impl Fn() -> Processor + Send + Clone, + Options { + traversal, + thread_limit, + check, + make_pack_lookup_cache, + }: Options<F>, + ) -> Result<Outcome<P>, Error<E>> + where + P: Progress, + C: crate::cache::DecodeEntry, + E: std::error::Error + Send + Sync + 'static, + Processor: FnMut( + gix_object::Kind, + &[u8], + &index::Entry, + &mut <P::SubProgress as Progress>::SubProgress, + ) -> Result<(), E>, + F: Fn() -> C + Send + Clone, + { + match traversal { + Algorithm::Lookup => self.traverse_with_lookup( + new_processor, + pack, + progress, + should_interrupt, + with_lookup::Options { + thread_limit, + check, + make_pack_lookup_cache, + }, + ), + Algorithm::DeltaTreeLookup => self.traverse_with_index( + pack, + new_processor, + progress, + should_interrupt, + crate::index::traverse::with_index::Options { check, thread_limit }, + ), + } + } + + fn possibly_verify<E>( + &self, + pack: &crate::data::File, + check: SafetyCheck, + pack_progress: impl Progress, + index_progress: impl Progress, + should_interrupt: &AtomicBool, + ) -> Result<gix_hash::ObjectId, Error<E>> + where + E: std::error::Error + Send + Sync + 'static, + { + Ok(if check.file_checksum() { + if self.pack_checksum() != pack.checksum() { + return Err(Error::PackMismatch { + actual: pack.checksum(), + expected: self.pack_checksum(), + }); + } + let (pack_res, id) = parallel::join( + move || pack.verify_checksum(pack_progress, should_interrupt), + move || self.verify_checksum(index_progress, should_interrupt), + ); + pack_res?; + id? 
+ } else { + self.index_checksum() + }) + } + + #[allow(clippy::too_many_arguments)] + fn decode_and_process_entry<C, P, E>( + &self, + check: SafetyCheck, + pack: &crate::data::File, + cache: &mut C, + buf: &mut Vec<u8>, + progress: &mut P, + index_entry: &crate::index::Entry, + processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &mut P) -> Result<(), E>, + ) -> Result<crate::data::decode::entry::Outcome, Error<E>> + where + C: crate::cache::DecodeEntry, + P: Progress, + E: std::error::Error + Send + Sync + 'static, + { + let pack_entry = pack.entry(index_entry.pack_offset); + let pack_entry_data_offset = pack_entry.data_offset; + let entry_stats = pack + .decode_entry( + pack_entry, + buf, + |id, _| { + self.lookup(id).map(|index| { + crate::data::decode::entry::ResolvedBase::InPack(pack.entry(self.pack_offset_at_index(index))) + }) + }, + cache, + ) + .map_err(|e| Error::PackDecode { + source: e, + id: index_entry.oid, + offset: index_entry.pack_offset, + })?; + let object_kind = entry_stats.kind; + let header_size = (pack_entry_data_offset - index_entry.pack_offset) as usize; + let entry_len = header_size + entry_stats.compressed_size; + + process_entry( + check, + object_kind, + buf, + progress, + index_entry, + || pack.entry_crc32(index_entry.pack_offset, entry_len), + processor, + )?; + Ok(entry_stats) + } +} + +#[allow(clippy::too_many_arguments)] +fn process_entry<P, E>( + check: SafetyCheck, + object_kind: gix_object::Kind, + decompressed: &[u8], + progress: &mut P, + index_entry: &crate::index::Entry, + pack_entry_crc32: impl FnOnce() -> u32, + processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &mut P) -> Result<(), E>, +) -> Result<(), Error<E>> +where + P: Progress, + E: std::error::Error + Send + Sync + 'static, +{ + if check.object_checksum() { + let mut hasher = gix_features::hash::hasher(index_entry.oid.kind()); + hasher.update(&gix_object::encode::loose_header(object_kind, decompressed.len())); + hasher.update(decompressed); + + let actual_oid = gix_hash::ObjectId::from(hasher.digest()); + if actual_oid != index_entry.oid { + return Err(Error::PackObjectMismatch { + actual: actual_oid, + expected: index_entry.oid, + offset: index_entry.pack_offset, + kind: object_kind, + }); + } + if let Some(desired_crc32) = index_entry.crc32 { + let actual_crc32 = pack_entry_crc32(); + if actual_crc32 != desired_crc32 { + return Err(Error::Crc32Mismatch { + actual: actual_crc32, + expected: desired_crc32, + offset: index_entry.pack_offset, + kind: object_kind, + }); + } + } + } + processor(object_kind, decompressed, index_entry, progress).map_err(Error::Processor) +} diff --git a/vendor/gix-pack/src/index/traverse/reduce.rs b/vendor/gix-pack/src/index/traverse/reduce.rs new file mode 100644 index 000000000..e05341242 --- /dev/null +++ b/vendor/gix-pack/src/index/traverse/reduce.rs @@ -0,0 +1,129 @@ +use std::{ + sync::atomic::{AtomicBool, Ordering}, + time::Instant, +}; + +use gix_features::{ + parallel, + progress::Progress, + threading::{lock, Mutable, OwnShared}, +}; + +use crate::{data, index::traverse}; + +fn add_decode_result(lhs: &mut data::decode::entry::Outcome, rhs: data::decode::entry::Outcome) { + lhs.num_deltas += rhs.num_deltas; + lhs.decompressed_size += rhs.decompressed_size; + lhs.compressed_size += rhs.compressed_size; + lhs.object_size += rhs.object_size; +} + +fn div_decode_result(lhs: &mut data::decode::entry::Outcome, div: usize) { + if div != 0 { + lhs.num_deltas = (lhs.num_deltas as f32 / div as f32) as u32; + lhs.decompressed_size 
/= div as u64; + lhs.compressed_size /= div; + lhs.object_size /= div as u64; + } +} + +pub struct Reducer<'a, P, E> { + progress: OwnShared<Mutable<P>>, + check: traverse::SafetyCheck, + then: Instant, + entries_seen: usize, + stats: traverse::Statistics, + should_interrupt: &'a AtomicBool, + _error: std::marker::PhantomData<E>, +} + +impl<'a, P, E> Reducer<'a, P, E> +where + P: Progress, +{ + pub fn from_progress( + progress: OwnShared<Mutable<P>>, + pack_data_len_in_bytes: usize, + check: traverse::SafetyCheck, + should_interrupt: &'a AtomicBool, + ) -> Self { + let stats = traverse::Statistics { + pack_size: pack_data_len_in_bytes as u64, + ..Default::default() + }; + Reducer { + progress, + check, + then: Instant::now(), + entries_seen: 0, + should_interrupt, + stats, + _error: Default::default(), + } + } +} + +impl<'a, P, E> parallel::Reduce for Reducer<'a, P, E> +where + P: Progress, + E: std::error::Error + Send + Sync + 'static, +{ + type Input = Result<Vec<data::decode::entry::Outcome>, traverse::Error<E>>; + type FeedProduce = (); + type Output = traverse::Statistics; + type Error = traverse::Error<E>; + + fn feed(&mut self, input: Self::Input) -> Result<(), Self::Error> { + let chunk_stats: Vec<_> = match input { + Err(err @ traverse::Error::PackDecode { .. }) if !self.check.fatal_decode_error() => { + lock(&self.progress).info(format!("Ignoring decode error: {err}")); + return Ok(()); + } + res => res, + }?; + self.entries_seen += chunk_stats.len(); + + let chunk_total = chunk_stats.into_iter().fold( + data::decode::entry::Outcome::default_from_kind(gix_object::Kind::Tree), + |mut total, stats| { + *self.stats.objects_per_chain_length.entry(stats.num_deltas).or_insert(0) += 1; + self.stats.total_decompressed_entries_size += stats.decompressed_size; + self.stats.total_compressed_entries_size += stats.compressed_size as u64; + self.stats.total_object_size += stats.object_size; + use gix_object::Kind::*; + match stats.kind { + Commit => self.stats.num_commits += 1, + Tree => self.stats.num_trees += 1, + Blob => self.stats.num_blobs += 1, + Tag => self.stats.num_tags += 1, + } + add_decode_result(&mut total, stats); + total + }, + ); + + add_decode_result(&mut self.stats.average, chunk_total); + lock(&self.progress).set(self.entries_seen); + + if self.should_interrupt.load(Ordering::SeqCst) { + return Err(Self::Error::Interrupted); + } + Ok(()) + } + + fn finalize(mut self) -> Result<Self::Output, Self::Error> { + div_decode_result(&mut self.stats.average, self.entries_seen); + + let elapsed_s = self.then.elapsed().as_secs_f32(); + let objects_per_second = (self.entries_seen as f32 / elapsed_s) as u32; + + lock(&self.progress).info(format!( + "of {} objects done in {:.2}s ({} objects/s, ~{}/s)", + self.entries_seen, + elapsed_s, + objects_per_second, + gix_features::progress::bytesize::ByteSize(self.stats.average.object_size * objects_per_second as u64) + )); + Ok(self.stats) + } +} diff --git a/vendor/gix-pack/src/index/traverse/types.rs b/vendor/gix-pack/src/index/traverse/types.rs new file mode 100644 index 000000000..84ebc8932 --- /dev/null +++ b/vendor/gix-pack/src/index/traverse/types.rs @@ -0,0 +1,123 @@ +use std::{collections::BTreeMap, marker::PhantomData}; + +/// Statistics regarding object encountered during execution of the [`traverse()`][crate::index::File::traverse()] method. 
+#[derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd, Clone)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] +pub struct Statistics { + /// The average over all decoded objects + pub average: crate::data::decode::entry::Outcome, + /// A mapping of the length of the chain to the amount of objects at that length. + /// + /// A length of 0 indicates full objects, and everything above that involves the given amount + /// of delta objects. + pub objects_per_chain_length: BTreeMap<u32, u32>, + /// The amount of bytes in all compressed streams, one per entry + pub total_compressed_entries_size: u64, + /// The amount of bytes in all decompressed streams, one per entry + pub total_decompressed_entries_size: u64, + /// The amount of bytes occupied by all undeltified, decompressed objects + pub total_object_size: u64, + /// The amount of bytes occupied by the pack itself, in bytes + pub pack_size: u64, + /// The amount of objects encountered that where commits + pub num_commits: u32, + /// The amount of objects encountered that where trees + pub num_trees: u32, + /// The amount of objects encountered that where tags + pub num_tags: u32, + /// The amount of objects encountered that where blobs + pub num_blobs: u32, +} + +impl Default for Statistics { + fn default() -> Self { + Statistics { + average: crate::data::decode::entry::Outcome::default_from_kind(gix_object::Kind::Tree), + objects_per_chain_length: Default::default(), + total_compressed_entries_size: 0, + total_decompressed_entries_size: 0, + total_object_size: 0, + pack_size: 0, + num_blobs: 0, + num_commits: 0, + num_trees: 0, + num_tags: 0, + } + } +} + +/// The ways to validate decoded objects before passing them to the processor. +#[derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd, Clone, Copy)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] +pub enum SafetyCheck { + /// Don't verify the validity of the checksums stored in the index and pack file + SkipFileChecksumVerification, + + /// All of the above, and also don't perform any object checksum verification + SkipFileAndObjectChecksumVerification, + + /// All of the above, and only log object decode errors. + /// + /// Useful if there is a damaged pack and you would like to traverse as many objects as possible. 
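// How the variants map onto the predicates defined further down
// (`file_checksum()`, `object_checksum()`, `fatal_decode_error()`):
//   All                                    file: yes  object: yes  abort on decode error: yes
//   SkipFileChecksumVerification           file: no   object: yes  abort on decode error: yes
//   SkipFileAndObjectChecksumVerification  file: no   object: no   abort on decode error: yes
//   SkipFileAndObjectChecksumVerificationAndNoAbortOnDecodeError
//                                          file: no   object: no   abort on decode error: no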
+ SkipFileAndObjectChecksumVerificationAndNoAbortOnDecodeError, + + /// Perform all available safety checks before operating on the pack and + /// abort if any of them fails + All, +} + +impl SafetyCheck { + pub(crate) fn file_checksum(&self) -> bool { + matches!(self, SafetyCheck::All) + } + pub(crate) fn object_checksum(&self) -> bool { + matches!(self, SafetyCheck::All | SafetyCheck::SkipFileChecksumVerification) + } + pub(crate) fn fatal_decode_error(&self) -> bool { + match self { + SafetyCheck::All + | SafetyCheck::SkipFileChecksumVerification + | SafetyCheck::SkipFileAndObjectChecksumVerification => true, + SafetyCheck::SkipFileAndObjectChecksumVerificationAndNoAbortOnDecodeError => false, + } + } +} + +impl Default for SafetyCheck { + fn default() -> Self { + SafetyCheck::All + } +} + +/// The way we verify the pack +#[derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd, Clone, Copy)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] +pub enum Algorithm { + /// Build an index to allow decoding each delta and base exactly once, saving a lot of computational + /// resource at the expense of resident memory, as we will use an additional `DeltaTree` to accelerate + /// delta chain resolution. + DeltaTreeLookup, + /// We lookup each object similarly to what would happen during normal repository use. + /// Uses more compute resources as it will resolve delta chains from back to front, but start right away + /// without indexing or investing any memory in indices. + /// + /// This option may be well suited for big packs in memory-starved system that support memory mapping. + Lookup, +} + +impl Default for Algorithm { + fn default() -> Self { + Algorithm::DeltaTreeLookup + } +} + +/// The progress ids used in [`traverse()`][crate::index::File::traverse()] . +/// +/// Use this information to selectively extract the progress of interest in case the parent application has custom visualization. +#[derive(Debug, Copy, Clone)] +pub enum ProgressId { + /// A root progress which isn't actually used, but links to the `ProgressId` of the lookup version of the algorithm. + WithLookup(PhantomData<super::with_lookup::ProgressId>), + /// A root progress which isn't actually used, but links to the `ProgressId` of the indexed version of the algorithm. + WithIndex(PhantomData<super::with_index::ProgressId>), +} diff --git a/vendor/gix-pack/src/index/traverse/with_index.rs b/vendor/gix-pack/src/index/traverse/with_index.rs new file mode 100644 index 000000000..769bbd07f --- /dev/null +++ b/vendor/gix-pack/src/index/traverse/with_index.rs @@ -0,0 +1,230 @@ +use std::sync::atomic::{AtomicBool, Ordering}; + +use gix_features::{parallel, progress::Progress}; + +use super::Error; +use crate::{ + cache::delta::traverse, + index::{self, traverse::Outcome, util::index_entries_sorted_by_offset_ascending}, +}; + +/// Traversal options for [`traverse_with_index()`][index::File::traverse_with_index()] +#[derive(Default)] +pub struct Options { + /// If `Some`, only use the given amount of threads. Otherwise, the amount of threads to use will be selected based on + /// the amount of available logical cores. + pub thread_limit: Option<usize>, + /// The kinds of safety checks to perform. + pub check: crate::index::traverse::SafetyCheck, +} + +/// The progress ids used in [`index::File::traverse_with_index()`]. +/// +/// Use this information to selectively extract the progress of interest in case the parent application has custom visualization. 
+#[derive(Debug, Copy, Clone)] +pub enum ProgressId { + /// The amount of bytes currently processed to generate a checksum of the *pack data file*. + HashPackDataBytes, + /// The amount of bytes currently processed to generate a checksum of the *pack index file*. + HashPackIndexBytes, + /// Collect all object hashes into a vector and sort it by their pack offset. + CollectSortedIndexEntries, + /// Count the objects processed when building a cache tree from all objects in a pack index. + TreeFromOffsetsObjects, + /// The amount of objects which were decoded. + DecodedObjects, + /// The amount of bytes that were decoded in total, as the sum of all bytes to represent all decoded objects. + DecodedBytes, +} + +impl From<ProgressId> for gix_features::progress::Id { + fn from(v: ProgressId) -> Self { + match v { + ProgressId::HashPackDataBytes => *b"PTHP", + ProgressId::HashPackIndexBytes => *b"PTHI", + ProgressId::CollectSortedIndexEntries => *b"PTCE", + ProgressId::TreeFromOffsetsObjects => *b"PTDI", + ProgressId::DecodedObjects => *b"PTRO", + ProgressId::DecodedBytes => *b"PTDB", + } + } +} + +/// Traversal with index +impl index::File { + /// Iterate through all _decoded objects_ in the given `pack` and handle them with a `Processor`, using an index to reduce waste + /// at the cost of memory. + /// + /// For more details, see the documentation on the [`traverse()`][index::File::traverse()] method. + pub fn traverse_with_index<P, Processor, E>( + &self, + pack: &crate::data::File, + new_processor: impl Fn() -> Processor + Send + Clone, + mut progress: P, + should_interrupt: &AtomicBool, + Options { check, thread_limit }: Options, + ) -> Result<Outcome<P>, Error<E>> + where + P: Progress, + Processor: FnMut( + gix_object::Kind, + &[u8], + &index::Entry, + &mut <P::SubProgress as Progress>::SubProgress, + ) -> Result<(), E>, + E: std::error::Error + Send + Sync + 'static, + { + let (verify_result, traversal_result) = parallel::join( + { + let pack_progress = progress.add_child_with_id( + format!( + "Hash of pack '{}'", + pack.path().file_name().expect("pack has filename").to_string_lossy() + ), + ProgressId::HashPackDataBytes.into(), + ); + let index_progress = progress.add_child_with_id( + format!( + "Hash of index '{}'", + self.path.file_name().expect("index has filename").to_string_lossy() + ), + ProgressId::HashPackIndexBytes.into(), + ); + move || { + let res = self.possibly_verify(pack, check, pack_progress, index_progress, should_interrupt); + if res.is_err() { + should_interrupt.store(true, Ordering::SeqCst); + } + res + } + }, + || -> Result<_, Error<_>> { + let sorted_entries = index_entries_sorted_by_offset_ascending( + self, + progress.add_child_with_id("collecting sorted index", ProgressId::CollectSortedIndexEntries.into()), + ); /* Pack Traverse Collect sorted Entries */ + let tree = crate::cache::delta::Tree::from_offsets_in_pack( + pack.path(), + sorted_entries.into_iter().map(Entry::from), + |e| e.index_entry.pack_offset, + |id| self.lookup(id).map(|idx| self.pack_offset_at_index(idx)), + progress.add_child_with_id("indexing", ProgressId::TreeFromOffsetsObjects.into()), + should_interrupt, + self.object_hash, + )?; + let mut outcome = digest_statistics(tree.traverse( + |slice, out| pack.entry_slice(slice).map(|entry| out.copy_from_slice(entry)), + pack.pack_end() as u64, + new_processor, + |data, + progress, + traverse::Context { + entry: pack_entry, + entry_end, + decompressed: bytes, + state: ref mut processor, + level, + }| { + let object_kind = 
pack_entry.header.as_kind().expect("non-delta object"); + data.level = level; + data.decompressed_size = pack_entry.decompressed_size; + data.object_kind = object_kind; + data.compressed_size = entry_end - pack_entry.data_offset; + data.object_size = bytes.len() as u64; + let result = crate::index::traverse::process_entry( + check, + object_kind, + bytes, + progress, + &data.index_entry, + || { + // TODO: Fix this - we overwrite the header of 'data' which also changes the computed entry size, + // causing index and pack to seemingly mismatch. This is surprising, and should be done differently. + // debug_assert_eq!(&data.index_entry.pack_offset, &pack_entry.pack_offset()); + gix_features::hash::crc32( + pack.entry_slice(data.index_entry.pack_offset..entry_end) + .expect("slice pointing into the pack (by now data is verified)"), + ) + }, + processor, + ); + match result { + Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => { + progress.info(format!("Ignoring decode error: {err}")); + Ok(()) + } + res => res, + } + }, + crate::cache::delta::traverse::Options { + object_progress: progress.add_child_with_id("Resolving", ProgressId::DecodedObjects.into()), + size_progress: progress.add_child_with_id("Decoding", ProgressId::DecodedBytes.into()), + thread_limit, + should_interrupt, + object_hash: self.object_hash, + }, + )?); + outcome.pack_size = pack.data_len() as u64; + Ok(outcome) + }, + ); + Ok(Outcome { + actual_index_checksum: verify_result?, + statistics: traversal_result?, + progress, + }) + } +} + +struct Entry { + index_entry: crate::index::Entry, + object_kind: gix_object::Kind, + object_size: u64, + decompressed_size: u64, + compressed_size: u64, + level: u16, +} + +impl From<crate::index::Entry> for Entry { + fn from(index_entry: crate::index::Entry) -> Self { + Entry { + index_entry, + level: 0, + object_kind: gix_object::Kind::Tree, + object_size: 0, + decompressed_size: 0, + compressed_size: 0, + } + } +} + +fn digest_statistics(traverse::Outcome { roots, children }: traverse::Outcome<Entry>) -> index::traverse::Statistics { + let mut res = index::traverse::Statistics::default(); + let average = &mut res.average; + for item in roots.iter().chain(children.iter()) { + res.total_compressed_entries_size += item.data.compressed_size; + res.total_decompressed_entries_size += item.data.decompressed_size; + res.total_object_size += item.data.object_size; + *res.objects_per_chain_length.entry(item.data.level as u32).or_insert(0) += 1; + + average.decompressed_size += item.data.decompressed_size; + average.compressed_size += item.data.compressed_size as usize; + average.object_size += item.data.object_size; + average.num_deltas += item.data.level as u32; + use gix_object::Kind::*; + match item.data.object_kind { + Blob => res.num_blobs += 1, + Tree => res.num_trees += 1, + Tag => res.num_tags += 1, + Commit => res.num_commits += 1, + }; + } + + let num_nodes = roots.len() + children.len(); + average.decompressed_size /= num_nodes as u64; + average.compressed_size /= num_nodes; + average.object_size /= num_nodes as u64; + average.num_deltas /= num_nodes as u32; + + res +} diff --git a/vendor/gix-pack/src/index/traverse/with_lookup.rs b/vendor/gix-pack/src/index/traverse/with_lookup.rs new file mode 100644 index 000000000..509ae4e4f --- /dev/null +++ b/vendor/gix-pack/src/index/traverse/with_lookup.rs @@ -0,0 +1,190 @@ +use std::sync::atomic::{AtomicBool, Ordering}; + +use gix_features::{ + parallel::{self, in_parallel_if}, + progress::{self, Progress}, + 
threading::{lock, Mutable, OwnShared}, +}; + +use super::{Error, Reducer}; +use crate::{ + data, index, + index::{traverse::Outcome, util}, +}; + +/// Traversal options for [`index::File::traverse_with_lookup()`] +pub struct Options<F> { + /// If `Some`, only use the given amount of threads. Otherwise, the amount of threads to use will be selected based on + /// the amount of available logical cores. + pub thread_limit: Option<usize>, + /// The kinds of safety checks to perform. + pub check: index::traverse::SafetyCheck, + /// A function to create a pack cache + pub make_pack_lookup_cache: F, +} + +impl Default for Options<fn() -> crate::cache::Never> { + fn default() -> Self { + Options { + check: Default::default(), + thread_limit: None, + make_pack_lookup_cache: || crate::cache::Never, + } + } +} + +/// The progress ids used in [`index::File::traverse_with_lookup()`]. +/// +/// Use this information to selectively extract the progress of interest in case the parent application has custom visualization. +#[derive(Debug, Copy, Clone)] +pub enum ProgressId { + /// The amount of bytes currently processed to generate a checksum of the *pack data file*. + HashPackDataBytes, + /// The amount of bytes currently processed to generate a checksum of the *pack index file*. + HashPackIndexBytes, + /// Collect all object hashes into a vector and sort it by their pack offset. + CollectSortedIndexEntries, + /// The amount of objects which were decoded by brute-force. + DecodedObjects, +} + +impl From<ProgressId> for gix_features::progress::Id { + fn from(v: ProgressId) -> Self { + match v { + ProgressId::HashPackDataBytes => *b"PTHP", + ProgressId::HashPackIndexBytes => *b"PTHI", + ProgressId::CollectSortedIndexEntries => *b"PTCE", + ProgressId::DecodedObjects => *b"PTRO", + } + } +} + +/// Verify and validate the content of the index file +impl index::File { + /// Iterate through all _decoded objects_ in the given `pack` and handle them with a `Processor` using a cache to reduce the amount of + /// waste while decoding objects. + /// + /// For more details, see the documentation on the [`traverse()`][index::File::traverse()] method. 
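// Direct-call sketch (hypothetical; this is what `traverse()` dispatches to
// for `Algorithm::Lookup`), using the no-op `cache::Never` shown in the
// `Default` impl above:
//   index_file.traverse_with_lookup(
//       new_processor, &pack_file, progress, &should_interrupt,
//       Options { thread_limit: None, check: Default::default(),
//                 make_pack_lookup_cache: || gix_pack::cache::Never },
//   )?;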
+ pub fn traverse_with_lookup<P, C, Processor, E, F>( + &self, + new_processor: impl Fn() -> Processor + Send + Clone, + pack: &crate::data::File, + mut progress: P, + should_interrupt: &AtomicBool, + Options { + thread_limit, + check, + make_pack_lookup_cache, + }: Options<F>, + ) -> Result<Outcome<P>, Error<E>> + where + P: Progress, + C: crate::cache::DecodeEntry, + E: std::error::Error + Send + Sync + 'static, + Processor: FnMut( + gix_object::Kind, + &[u8], + &index::Entry, + &mut <P::SubProgress as Progress>::SubProgress, + ) -> Result<(), E>, + F: Fn() -> C + Send + Clone, + { + let (verify_result, traversal_result) = parallel::join( + { + let pack_progress = progress.add_child_with_id( + format!( + "Hash of pack '{}'", + pack.path().file_name().expect("pack has filename").to_string_lossy() + ), + ProgressId::HashPackDataBytes.into(), + ); + let index_progress = progress.add_child_with_id( + format!( + "Hash of index '{}'", + self.path.file_name().expect("index has filename").to_string_lossy() + ), + ProgressId::HashPackIndexBytes.into(), + ); + move || { + let res = self.possibly_verify(pack, check, pack_progress, index_progress, should_interrupt); + if res.is_err() { + should_interrupt.store(true, Ordering::SeqCst); + } + res + } + }, + || { + let index_entries = util::index_entries_sorted_by_offset_ascending( + self, + progress.add_child_with_id("collecting sorted index", ProgressId::CollectSortedIndexEntries.into()), + ); + + let (chunk_size, thread_limit, available_cores) = + parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None); + let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores; + let input_chunks = index_entries.chunks(chunk_size.max(chunk_size)); + let reduce_progress = OwnShared::new(Mutable::new({ + let mut p = progress.add_child_with_id("Traversing", ProgressId::DecodedObjects.into()); + p.init(Some(self.num_objects() as usize), progress::count("objects")); + p + })); + let state_per_thread = { + let reduce_progress = reduce_progress.clone(); + move |index| { + ( + make_pack_lookup_cache(), + new_processor(), + Vec::with_capacity(2048), // decode buffer + lock(&reduce_progress) + .add_child_with_id(format!("thread {index}"), gix_features::progress::UNKNOWN), // per thread progress + ) + } + }; + + in_parallel_if( + there_are_enough_entries_to_process, + input_chunks, + thread_limit, + state_per_thread, + |entries: &[index::Entry], + (cache, ref mut processor, buf, progress)| + -> Result<Vec<data::decode::entry::Outcome>, Error<_>> { + progress.init( + Some(entries.len()), + gix_features::progress::count_with_decimals("objects", 2), + ); + let mut stats = Vec::with_capacity(entries.len()); + progress.set(0); + for index_entry in entries.iter() { + let result = self.decode_and_process_entry( + check, + pack, + cache, + buf, + progress, + index_entry, + processor, + ); + progress.inc(); + let stat = match result { + Err(err @ Error::PackDecode { .. 
}) if !check.fatal_decode_error() => { + progress.info(format!("Ignoring decode error: {err}")); + continue; + } + res => res, + }?; + stats.push(stat); + } + Ok(stats) + }, + Reducer::from_progress(reduce_progress, pack.data_len(), check, should_interrupt), + ) + }, + ); + Ok(Outcome { + actual_index_checksum: verify_result?, + statistics: traversal_result?, + progress, + }) + } +} diff --git a/vendor/gix-pack/src/index/util.rs b/vendor/gix-pack/src/index/util.rs new file mode 100644 index 000000000..284ee6158 --- /dev/null +++ b/vendor/gix-pack/src/index/util.rs @@ -0,0 +1,47 @@ +use std::{io, time::Instant}; + +use gix_features::progress::{self, Progress}; + +pub(crate) fn index_entries_sorted_by_offset_ascending( + idx: &crate::index::File, + mut progress: impl Progress, +) -> Vec<crate::index::Entry> { + progress.init(Some(idx.num_objects as usize), progress::count("entries")); + let start = Instant::now(); + + let mut v = Vec::with_capacity(idx.num_objects as usize); + for entry in idx.iter() { + v.push(entry); + progress.inc(); + } + v.sort_by_key(|e| e.pack_offset); + + progress.show_throughput(start); + v +} + +pub(crate) struct Count<W> { + pub bytes: u64, + pub inner: W, +} + +impl<W> Count<W> { + pub fn new(inner: W) -> Self { + Count { bytes: 0, inner } + } +} + +impl<W> io::Write for Count<W> +where + W: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + let written = self.inner.write(buf)?; + self.bytes += written as u64; + Ok(written) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} diff --git a/vendor/gix-pack/src/index/verify.rs b/vendor/gix-pack/src/index/verify.rs new file mode 100644 index 000000000..4a4852fb6 --- /dev/null +++ b/vendor/gix-pack/src/index/verify.rs @@ -0,0 +1,290 @@ +use std::sync::atomic::AtomicBool; + +use gix_features::progress::Progress; +use gix_object::{bstr::ByteSlice, WriteTo}; + +use crate::index; + +/// +pub mod integrity { + use std::marker::PhantomData; + + use gix_object::bstr::BString; + + /// Returned by [`index::File::verify_integrity()`][crate::index::File::verify_integrity()]. + #[derive(thiserror::Error, Debug)] + #[allow(missing_docs)] + pub enum Error { + #[error("The fan at index {index} is out of order as it's larger then the following value.")] + Fan { index: usize }, + #[error("{kind} object {id} could not be decoded")] + ObjectDecode { + source: gix_object::decode::Error, + kind: gix_object::Kind, + id: gix_hash::ObjectId, + }, + #[error("{kind} object {id} wasn't re-encoded without change, wanted\n{expected}\n\nGOT\n\n{actual}")] + ObjectEncodeMismatch { + kind: gix_object::Kind, + id: gix_hash::ObjectId, + expected: BString, + actual: BString, + }, + } + + /// Returned by [`index::File::verify_integrity()`][crate::index::File::verify_integrity()]. + pub struct Outcome<P> { + /// The computed checksum of the index which matched the stored one. + pub actual_index_checksum: gix_hash::ObjectId, + /// The packs traversal outcome, if one was provided + pub pack_traverse_statistics: Option<crate::index::traverse::Statistics>, + /// The provided progress instance. + pub progress: P, + } + + /// Additional options to define how the integrity should be verified. + #[derive(Clone)] + pub struct Options<F> { + /// The thoroughness of the verification + pub verify_mode: crate::index::verify::Mode, + /// The way to traverse packs + pub traversal: crate::index::traverse::Algorithm, + /// The amount of threads to use of `Some(N)`, with `None|Some(0)` using all available cores are used. 
+ pub thread_limit: Option<usize>, + /// A function to create a pack cache + pub make_pack_lookup_cache: F, + } + + impl Default for Options<fn() -> crate::cache::Never> { + fn default() -> Self { + Options { + verify_mode: Default::default(), + traversal: Default::default(), + thread_limit: None, + make_pack_lookup_cache: || crate::cache::Never, + } + } + } + + /// The progress ids used in [`index::File::verify_integrity()`][crate::index::File::verify_integrity()]. + /// + /// Use this information to selectively extract the progress of interest in case the parent application has custom visualization. + #[derive(Debug, Copy, Clone)] + pub enum ProgressId { + /// The amount of bytes read to verify the index checksum. + ChecksumBytes, + /// A root progress for traversal which isn't actually used directly, but here to link to the respective `ProgressId` types. + Traverse(PhantomData<crate::index::verify::index::traverse::ProgressId>), + } + + impl From<ProgressId> for gix_features::progress::Id { + fn from(v: ProgressId) -> Self { + match v { + ProgressId::ChecksumBytes => *b"PTHI", + ProgressId::Traverse(_) => gix_features::progress::UNKNOWN, + } + } + } +} + +/// +pub mod checksum { + /// Returned by [`index::File::verify_checksum()`][crate::index::File::verify_checksum()]. + pub type Error = crate::verify::checksum::Error; +} + +/// Various ways in which a pack and index can be verified +#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)] +pub enum Mode { + /// Validate the object hash and CRC32 + HashCrc32, + /// Validate hash and CRC32, and decode each non-Blob object. + /// Each object should be valid, i.e. be decodable. + HashCrc32Decode, + /// Validate hash and CRC32, and decode and encode each non-Blob object. + /// Each object should yield exactly the same hash when re-encoded. + HashCrc32DecodeEncode, +} + +impl Default for Mode { + fn default() -> Self { + Mode::HashCrc32DecodeEncode + } +} + +/// Information to allow verifying the integrity of an index with the help of its corresponding pack. +pub struct PackContext<'a, F> { + /// The pack data file itself. + pub data: &'a crate::data::File, + /// The options further configuring the pack traversal and verification + pub options: integrity::Options<F>, +} + +/// Verify and validate the content of the index file +impl index::File { + /// Returns the trailing hash stored at the end of this index file. + /// + /// It's a hash over all bytes of the index. + pub fn index_checksum(&self) -> gix_hash::ObjectId { + gix_hash::ObjectId::from(&self.data[self.data.len() - self.hash_len..]) + } + + /// Returns the hash of the pack data file that this index file corresponds to. + /// + /// It should [`crate::data::File::checksum()`] of the corresponding pack data file. + pub fn pack_checksum(&self) -> gix_hash::ObjectId { + let from = self.data.len() - self.hash_len * 2; + gix_hash::ObjectId::from(&self.data[from..][..self.hash_len]) + } + + /// Validate that our [`index_checksum()`][index::File::index_checksum()] matches the actual contents + /// of this index file, and return it if it does. + pub fn verify_checksum( + &self, + progress: impl Progress, + should_interrupt: &AtomicBool, + ) -> Result<gix_hash::ObjectId, checksum::Error> { + crate::verify::checksum_on_disk_or_mmap( + self.path(), + &self.data, + self.index_checksum(), + self.object_hash, + progress, + should_interrupt, + ) + } + + /// The most thorough validation of integrity of both index file and the corresponding pack data file, if provided. 
+ /// Returns the checksum of the index file, the traversal outcome and the given progress if the integrity check is successful. + /// + /// If `pack` is provided, it is expected (and validated to be) the pack belonging to this index. + /// It will be used to validate internal integrity of the pack before checking each objects integrity + /// is indeed as advertised via its SHA1 as stored in this index, as well as the CRC32 hash. + /// The last member of the Option is a function returning an implementation of [`crate::cache::DecodeEntry`] to be used if + /// the [`index::traverse::Algorithm`] is `Lookup`. + /// To set this to `None`, use `None::<(_, _, _, fn() -> crate::cache::Never)>`. + /// + /// The `thread_limit` optionally specifies the amount of threads to be used for the [pack traversal][index::File::traverse()]. + /// `make_cache` is only used in case a `pack` is specified, use existing implementations in the [`crate::cache`] module. + /// + /// # Tradeoffs + /// + /// The given `progress` is inevitably consumed if there is an error, which is a tradeoff chosen to easily allow using `?` in the + /// error case. + pub fn verify_integrity<P, C, F>( + &self, + pack: Option<PackContext<'_, F>>, + mut progress: P, + should_interrupt: &AtomicBool, + ) -> Result<integrity::Outcome<P>, index::traverse::Error<index::verify::integrity::Error>> + where + P: Progress, + C: crate::cache::DecodeEntry, + F: Fn() -> C + Send + Clone, + { + if let Some(first_invalid) = crate::verify::fan(&self.fan) { + return Err(index::traverse::Error::Processor(integrity::Error::Fan { + index: first_invalid, + })); + } + + match pack { + Some(PackContext { + data: pack, + options: + integrity::Options { + verify_mode, + traversal, + thread_limit, + make_pack_lookup_cache, + }, + }) => self + .traverse( + pack, + progress, + should_interrupt, + || { + let mut encode_buf = Vec::with_capacity(2048); + move |kind, data, index_entry, progress| { + Self::verify_entry(verify_mode, &mut encode_buf, kind, data, index_entry, progress) + } + }, + index::traverse::Options { + traversal, + thread_limit, + check: index::traverse::SafetyCheck::All, + make_pack_lookup_cache, + }, + ) + .map(|o| integrity::Outcome { + actual_index_checksum: o.actual_index_checksum, + pack_traverse_statistics: Some(o.statistics), + progress: o.progress, + }), + None => self + .verify_checksum( + progress.add_child_with_id("Sha1 of index", integrity::ProgressId::ChecksumBytes.into()), + should_interrupt, + ) + .map_err(Into::into) + .map(|id| integrity::Outcome { + actual_index_checksum: id, + pack_traverse_statistics: None, + progress, + }), + } + } + + #[allow(clippy::too_many_arguments)] + fn verify_entry<P>( + verify_mode: Mode, + encode_buf: &mut Vec<u8>, + object_kind: gix_object::Kind, + buf: &[u8], + index_entry: &index::Entry, + progress: &mut P, + ) -> Result<(), integrity::Error> + where + P: Progress, + { + if let Mode::HashCrc32Decode | Mode::HashCrc32DecodeEncode = verify_mode { + use gix_object::Kind::*; + match object_kind { + Tree | Commit | Tag => { + let object = gix_object::ObjectRef::from_bytes(object_kind, buf).map_err(|err| { + integrity::Error::ObjectDecode { + source: err, + kind: object_kind, + id: index_entry.oid, + } + })?; + if let Mode::HashCrc32DecodeEncode = verify_mode { + encode_buf.clear(); + object + .write_to(&mut *encode_buf) + .expect("writing to a memory buffer never fails"); + if encode_buf.as_slice() != buf { + let mut should_return_error = true; + if let gix_object::Kind::Tree = object_kind { + if 
buf.as_bstr().find(b"100664").is_some() || buf.as_bstr().find(b"100640").is_some() { + progress.info(format!("Tree object {} would be cleaned up during re-serialization, replacing mode '100664|100640' with '100644'", index_entry.oid)); + should_return_error = false + } + } + if should_return_error { + return Err(integrity::Error::ObjectEncodeMismatch { + kind: object_kind, + id: index_entry.oid, + expected: buf.into(), + actual: encode_buf.clone().into(), + }); + } + } + } + } + Blob => {} + }; + } + Ok(()) + } +} diff --git a/vendor/gix-pack/src/index/write/encode.rs b/vendor/gix-pack/src/index/write/encode.rs new file mode 100644 index 000000000..80f0cac61 --- /dev/null +++ b/vendor/gix-pack/src/index/write/encode.rs @@ -0,0 +1,127 @@ +use std::{cmp::Ordering, io}; + +pub(crate) const LARGE_OFFSET_THRESHOLD: u64 = 0x7fff_ffff; +pub(crate) const HIGH_BIT: u32 = 0x8000_0000; + +use gix_features::{ + hash, + progress::{self, Progress}, +}; + +use crate::index::{util::Count, V2_SIGNATURE}; + +pub(crate) fn write_to( + out: impl io::Write, + entries_sorted_by_oid: Vec<crate::cache::delta::Item<crate::index::write::TreeEntry>>, + pack_hash: &gix_hash::ObjectId, + kind: crate::index::Version, + mut progress: impl Progress, +) -> io::Result<gix_hash::ObjectId> { + use io::Write; + assert_eq!(kind, crate::index::Version::V2, "Can only write V2 packs right now"); + assert!( + entries_sorted_by_oid.len() <= u32::MAX as usize, + "a pack cannot have more than u32::MAX objects" + ); + + // Write header + let mut out = Count::new(std::io::BufWriter::with_capacity( + 8 * 4096, + hash::Write::new(out, kind.hash()), + )); + out.write_all(V2_SIGNATURE)?; + out.write_all(&(kind as u32).to_be_bytes())?; + + progress.init(Some(4), progress::steps()); + let start = std::time::Instant::now(); + let _info = progress.add_child_with_id("writing fan-out table", gix_features::progress::UNKNOWN); + let fan_out = fanout(entries_sorted_by_oid.iter().map(|e| e.data.id.first_byte())); + + for value in fan_out.iter() { + out.write_all(&value.to_be_bytes())?; + } + + progress.inc(); + let _info = progress.add_child_with_id("writing ids", gix_features::progress::UNKNOWN); + for entry in &entries_sorted_by_oid { + out.write_all(entry.data.id.as_slice())?; + } + + progress.inc(); + let _info = progress.add_child_with_id("writing crc32", gix_features::progress::UNKNOWN); + for entry in &entries_sorted_by_oid { + out.write_all(&entry.data.crc32.to_be_bytes())?; + } + + progress.inc(); + let _info = progress.add_child_with_id("writing offsets", gix_features::progress::UNKNOWN); + { + let mut offsets64 = Vec::<u64>::new(); + for entry in &entries_sorted_by_oid { + let offset: u32 = if entry.offset > LARGE_OFFSET_THRESHOLD { + assert!( + offsets64.len() < LARGE_OFFSET_THRESHOLD as usize, + "Encoding breakdown - way too many 64bit offsets" + ); + offsets64.push(entry.offset); + ((offsets64.len() - 1) as u32) | HIGH_BIT + } else { + entry.offset as u32 + }; + out.write_all(&offset.to_be_bytes())?; + } + for value in offsets64 { + out.write_all(&value.to_be_bytes())?; + } + } + + out.write_all(pack_hash.as_slice())?; + + let bytes_written_without_trailer = out.bytes; + let mut out = out.inner.into_inner()?; + let index_hash: gix_hash::ObjectId = out.hash.digest().into(); + out.inner.write_all(index_hash.as_slice())?; + out.inner.flush()?; + + progress.inc(); + progress.show_throughput_with( + start, + (bytes_written_without_trailer + 20) as usize, + progress::bytes().expect("unit always set"), + progress::MessageLevel::Success, + ); 
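+ + // (editor's note, not part of the vendored source) Everything above laid out a V2 index: + // signature | version | 256 fan-out u32s (slot b counts oids whose first byte is <= b, + // e.g. sorted first bytes [0x00, 0x00, 0x05] yield fan_out[0] == 2 and all of fan_out[5..] == 3) | + // N oids | N crc32s | N offset u32s (high bit set => index into the spilled u64 table) | + // spilled u64 offsets | pack checksum | index checksum. + // The `+ 20` in the throughput call above assumes the 20-byte SHA-1 trailer just written.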
+ + Ok(index_hash) +} + +pub(crate) fn fanout(iter: impl ExactSizeIterator<Item = u8>) -> [u32; 256] { + let mut fan_out = [0u32; 256]; + let entries_len = iter.len() as u32; + let mut iter = iter.enumerate(); + let mut idx_and_entry = iter.next(); + let mut upper_bound = 0; + + for (offset_be, byte) in fan_out.iter_mut().zip(0u8..=255) { + *offset_be = match idx_and_entry.as_ref() { + Some((_idx, first_byte)) => match first_byte.cmp(&byte) { + Ordering::Less => unreachable!("ids should be ordered, and we make sure to keep ahead with them"), + Ordering::Greater => upper_bound, + Ordering::Equal => { + if byte == 255 { + entries_len + } else { + idx_and_entry = iter.find(|(_, first_byte)| *first_byte != byte); + upper_bound = idx_and_entry + .as_ref() + .map(|(idx, _)| *idx as u32) + .unwrap_or(entries_len); + upper_bound + } + } + }, + None => entries_len, + }; + } + + fan_out +} diff --git a/vendor/gix-pack/src/index/write/error.rs b/vendor/gix-pack/src/index/write/error.rs new file mode 100644 index 000000000..a5ef6ad67 --- /dev/null +++ b/vendor/gix-pack/src/index/write/error.rs @@ -0,0 +1,25 @@ +use std::io; + +/// Returned by [`crate::index::File::write_data_iter_to_stream()`] +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum Error { + #[error("An IO error occurred when reading the pack or creating a temporary file")] + Io(#[from] io::Error), + #[error("A pack entry could not be extracted")] + PackEntryDecode(#[from] crate::data::input::Error), + #[error("Indices of type {} cannot be written, only {} are supported", *.0 as usize, crate::index::Version::default() as usize)] + Unsupported(crate::index::Version), + #[error("Ref delta objects are not supported as there is no way to look them up. Resolve them beforehand.")] + IteratorInvariantNoRefDelta, + #[error("The iterator failed to set a trailing hash over all prior pack entries in the last provided entry")] + IteratorInvariantTrailer, + #[error("Only u32::MAX objects can be stored in a pack, found {0}")] + IteratorInvariantTooManyObjects(usize), + #[error("{pack_offset} is not a valid offset for pack offset {distance}")] + IteratorInvariantBaseOffset { pack_offset: u64, distance: u64 }, + #[error(transparent)] + Tree(#[from] crate::cache::delta::Error), + #[error(transparent)] + TreeTraversal(#[from] crate::cache::delta::traverse::Error), +} diff --git a/vendor/gix-pack/src/index/write/mod.rs b/vendor/gix-pack/src/index/write/mod.rs new file mode 100644 index 000000000..c8fdaa271 --- /dev/null +++ b/vendor/gix-pack/src/index/write/mod.rs @@ -0,0 +1,263 @@ +use std::{convert::TryInto, io, sync::atomic::AtomicBool}; + +pub use error::Error; +use gix_features::progress::{self, Progress}; + +use crate::cache::delta::{traverse, Tree}; + +pub(crate) mod encode; +mod error; + +pub(crate) struct TreeEntry { + pub id: gix_hash::ObjectId, + pub crc32: u32, +} + +/// Information gathered while executing [`write_data_iter_to_stream()`][crate::index::File::write_data_iter_to_stream] +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] +pub struct Outcome { + /// The version of the verified index + pub index_version: crate::index::Version, + /// The verified checksum of the index + pub index_hash: gix_hash::ObjectId, + + /// The hash of the '.pack' file, also found in its trailing bytes + pub data_hash: gix_hash::ObjectId, + /// The number of objects that were verified, always the number of objects in the pack.
+ pub num_objects: u32, +} + +/// The progress ids used in [`write_data_iter_to_stream()`][crate::index::File::write_data_iter_to_stream()]. +/// +/// Use this information to selectively extract the progress of interest in case the parent application has custom visualizations. +#[derive(Debug, Copy, Clone)] +pub enum ProgressId { + /// Counts the number of objects that were indexed thus far. + IndexObjects, + /// The amount of bytes that were decompressed while decoding pack entries. + /// + /// This is done to determine entry boundaries. + DecompressedBytes, + /// The number of objects whose hashes were computed. + /// + /// This is done by decoding them, which typically involves decoding delta objects. + ResolveObjects, + /// The amount of bytes that were decoded in total, as the sum of all bytes to represent all resolved objects. + DecodedBytes, + /// The amount of bytes written to the index file. + IndexBytesWritten, +} + +impl From<ProgressId> for gix_features::progress::Id { + fn from(v: ProgressId) -> Self { + match v { + ProgressId::IndexObjects => *b"IWIO", + ProgressId::DecompressedBytes => *b"IWDB", + ProgressId::ResolveObjects => *b"IWRO", + ProgressId::DecodedBytes => *b"IWDB", + ProgressId::IndexBytesWritten => *b"IWBW", + } + } +}
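+ +// (editor's note, not part of the vendored source) Consumers rendering progress can match on the +// four-byte ids defined above, e.g.: +// let id: gix_features::progress::Id = ProgressId::IndexObjects.into(); +// assert_eq!(&id, b"IWIO"); + +/// Various ways of writing an index file from pack entries +impl crate::index::File { + /// Write information about `entries` as obtained from a pack data file into a pack index file via the `out` stream. + /// The resolver produced by `make_resolver` must resolve pack entries from the same pack data file that produced the + /// `entries` iterator. + /// + /// * `version` is the version of pack index to produce, use [`crate::index::Version::default()`] if in doubt. + /// * `thread_limit` is used for a parallel tree traversal for obtaining object hashes with optimal performance. + /// * `root_progress` is the top-level progress to stay informed about the progress of this potentially long-running + /// computation. + /// * `object_hash` defines what kind of object hash we write into the index file. + /// * `pack_version` is the version of the underlying pack for which `entries` are read. It's used to compute a pack hash + /// in case `entries` yields no objects at all. + /// + /// # Remarks + /// + /// * neither in-pack nor out-of-pack Ref Deltas are supported here, these must have been resolved beforehand. + /// * `make_resolver()` will only be called after the iterator stopped returning elements and produces a function that + /// provides all bytes belonging to a pack entry, writing them to the given mutable output `Vec`. + /// It should return `None` if the entry cannot be resolved from the pack that produced the `entries` iterator, causing + /// the write operation to fail.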
+ #[allow(clippy::too_many_arguments)] + pub fn write_data_iter_to_stream<F, F2>( + version: crate::index::Version, + make_resolver: F, + entries: impl Iterator<Item = Result<crate::data::input::Entry, crate::data::input::Error>>, + thread_limit: Option<usize>, + mut root_progress: impl Progress, + out: impl io::Write, + should_interrupt: &AtomicBool, + object_hash: gix_hash::Kind, + pack_version: crate::data::Version, + ) -> Result<Outcome, Error> + where + F: FnOnce() -> io::Result<F2>, + F2: for<'r> Fn(crate::data::EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone, + { + if version != crate::index::Version::default() { + return Err(Error::Unsupported(version)); + } + let mut num_objects: usize = 0; + let mut last_seen_trailer = None; + let (anticipated_num_objects, upper_bound) = entries.size_hint(); + let worst_case_num_objects_after_thin_pack_resolution = upper_bound.unwrap_or(anticipated_num_objects); + let mut tree = Tree::with_capacity(worst_case_num_objects_after_thin_pack_resolution)?; + let indexing_start = std::time::Instant::now(); + + root_progress.init(Some(4), progress::steps()); + let mut objects_progress = root_progress.add_child_with_id("indexing", ProgressId::IndexObjects.into()); + objects_progress.init(Some(anticipated_num_objects), progress::count("objects")); + let mut decompressed_progress = + root_progress.add_child_with_id("decompressing", ProgressId::DecompressedBytes.into()); + decompressed_progress.init(None, progress::bytes()); + let mut pack_entries_end: u64 = 0; + + for entry in entries { + let crate::data::input::Entry { + header, + pack_offset, + crc32, + header_size, + compressed: _, + compressed_size, + decompressed_size, + trailer, + } = entry?; + + decompressed_progress.inc_by(decompressed_size as usize); + + let entry_len = header_size as u64 + compressed_size; + pack_entries_end = pack_offset + entry_len; + + let crc32 = crc32.expect("crc32 to be computed by the iterator. Caller assures correct configuration."); + + use crate::data::entry::Header::*; + match header { + Tree | Blob | Commit | Tag => { + tree.add_root( + pack_offset, + TreeEntry { + id: object_hash.null(), + crc32, + }, + )?; + } + RefDelta { .. } => return Err(Error::IteratorInvariantNoRefDelta), + OfsDelta { base_distance } => { + let base_pack_offset = + crate::data::entry::Header::verified_base_pack_offset(pack_offset, base_distance).ok_or( + Error::IteratorInvariantBaseOffset { + pack_offset, + distance: base_distance, + }, + )?; + tree.add_child( + base_pack_offset, + pack_offset, + TreeEntry { + id: object_hash.null(), + crc32, + }, + )?; + } + }; + last_seen_trailer = trailer; + num_objects += 1; + objects_progress.inc(); + } + let num_objects: u32 = num_objects + .try_into() + .map_err(|_| Error::IteratorInvariantTooManyObjects(num_objects))?; + + objects_progress.show_throughput(indexing_start); + decompressed_progress.show_throughput(indexing_start); + drop(objects_progress); + drop(decompressed_progress); + + root_progress.inc(); + + let resolver = make_resolver()?; + let sorted_pack_offsets_by_oid = { + let traverse::Outcome { roots, children } = tree.traverse( + resolver, + pack_entries_end, + || (), + |data, + _progress, + traverse::Context { + entry, + decompressed: bytes, + .. 
+ }| { + modify_base(data, entry, bytes, version.hash()); + Ok::<_, Error>(()) + }, + traverse::Options { + object_progress: root_progress.add_child_with_id("Resolving", ProgressId::ResolveObjects.into()), + size_progress: root_progress.add_child_with_id("Decoding", ProgressId::DecodedBytes.into()), + thread_limit, + should_interrupt, + object_hash, + }, + )?; + root_progress.inc(); + + let mut items = roots; + items.extend(children); + { + let _progress = root_progress.add_child_with_id("sorting by id", gix_features::progress::UNKNOWN); + items.sort_by_key(|e| e.data.id); + } + + root_progress.inc(); + items + }; + + let pack_hash = match last_seen_trailer { + Some(ph) => ph, + None if num_objects == 0 => { + let header = crate::data::header::encode(pack_version, 0); + let mut hasher = gix_features::hash::hasher(object_hash); + hasher.update(&header); + gix_hash::ObjectId::from(hasher.digest()) + } + None => return Err(Error::IteratorInvariantTrailer), + }; + let index_hash = encode::write_to( + out, + sorted_pack_offsets_by_oid, + &pack_hash, + version, + root_progress.add_child_with_id("writing index file", ProgressId::IndexBytesWritten.into()), + )?; + root_progress.show_throughput_with( + indexing_start, + num_objects as usize, + progress::count("objects").expect("unit always set"), + progress::MessageLevel::Success, + ); + Ok(Outcome { + index_version: version, + index_hash, + data_hash: pack_hash, + num_objects, + }) + } +} + +fn modify_base(entry: &mut TreeEntry, pack_entry: &crate::data::Entry, decompressed: &[u8], hash: gix_hash::Kind) { + fn compute_hash(kind: gix_object::Kind, bytes: &[u8], object_hash: gix_hash::Kind) -> gix_hash::ObjectId { + let mut hasher = gix_features::hash::hasher(object_hash); + hasher.update(&gix_object::encode::loose_header(kind, bytes.len())); + hasher.update(bytes); + gix_hash::ObjectId::from(hasher.digest()) + } + + let object_kind = pack_entry.header.as_kind().expect("base object as source of iteration"); + let id = compute_hash(object_kind, decompressed, hash); + entry.id = id; +}
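+ +// (editor's sketch, not part of the vendored source) `compute_hash` in `modify_base` hashes base +// objects exactly like loose git objects: the header `"<kind> <size>\0"` followed by the raw payload. +// For instance, an empty blob under SHA-1 yields the well-known id e69de29bb2d1d6434b8b29ae775ad8c2e48c5391: +#[allow(dead_code)] +fn empty_blob_id_sketch() -> gix_hash::ObjectId { + let mut hasher = gix_features::hash::hasher(gix_hash::Kind::Sha1); + hasher.update(&gix_object::encode::loose_header(gix_object::Kind::Blob, 0)); + gix_hash::ObjectId::from(hasher.digest()) +}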