use std::io::Write; use gix_features::hash; use crate::data::output; /// The error returned by `next()` in the [`FromEntriesIter`] iterator. #[allow(missing_docs)] #[derive(Debug, thiserror::Error)] pub enum Error where E: std::error::Error + 'static, { #[error(transparent)] Io(#[from] std::io::Error), #[error(transparent)] Input(E), } /// An implementation of [`Iterator`] to write [encoded entries][output::Entry] to an inner implementation each time /// `next()` is called. pub struct FromEntriesIter { /// An iterator for input [`output::Entry`] instances pub input: I, /// A way of writing encoded bytes. output: hash::Write, /// Our trailing hash when done writing all input entries trailer: Option, /// The amount of objects in the iteration and the version of the packfile to be written. /// Will be `None` to signal the header was written already. header_info: Option<(crate::data::Version, u32)>, /// The pack data version with which pack entries should be written. entry_version: crate::data::Version, /// The amount of written bytes thus far written: u64, /// Required to quickly find offsets by object IDs, as future objects may refer to those in the past to become a delta offset base. /// It stores the pack offsets at which objects begin. /// Additionally we store if an object was invalid, and if so we will not write it nor will we allow delta objects to it. pack_offsets_and_validity: Vec<(u64, bool)>, /// If we are done, no additional writes will occur is_done: bool, } impl FromEntriesIter where I: Iterator, E>>, W: std::io::Write, E: std::error::Error + 'static, { /// Create a new instance reading [entries][output::Entry] from an `input` iterator and write pack data bytes to /// `output` writer, resembling a pack of `version` with exactly `num_entries` amount of objects contained in it. /// `object_hash` is the kind of hash to use for the pack checksum and maybe other places, depending on the version. /// /// The input chunks are expected to be sorted already. You can use the [`InOrderIter`][gix_features::parallel::InOrderIter] to assure /// this happens on the fly holding entire chunks in memory as long as needed for them to be dispensed in order. /// /// # Panics /// /// Not all combinations of `object_hash` and `version` are supported currently triggering assertion errors. pub fn new( input: I, output: W, num_entries: u32, version: crate::data::Version, object_hash: gix_hash::Kind, ) -> Self { assert!( matches!(version, crate::data::Version::V2), "currently only pack version 2 can be written", ); FromEntriesIter { input, output: hash::Write::new(output, object_hash), trailer: None, entry_version: version, pack_offsets_and_validity: Vec::with_capacity(num_entries as usize), written: 0, header_info: Some((version, num_entries)), is_done: false, } } /// Consume this instance and return the `output` implementation. /// /// _Note_ that the `input` iterator can be moved out of this instance beforehand. pub fn into_write(self) -> W { self.output.inner } /// Returns the trailing hash over all written entries once done. /// It's `None` if we are not yet done writing. pub fn digest(&self) -> Option { self.trailer } fn next_inner(&mut self) -> Result> { let previous_written = self.written; if let Some((version, num_entries)) = self.header_info.take() { let header_bytes = crate::data::header::encode(version, num_entries); self.output.write_all(&header_bytes[..])?; self.written += header_bytes.len() as u64; } match self.input.next() { Some(entries) => { for entry in entries.map_err(Error::Input)? { if entry.is_invalid() { self.pack_offsets_and_validity.push((0, false)); continue; }; self.pack_offsets_and_validity.push((self.written, true)); let header = entry.to_entry_header(self.entry_version, |index| { let (base_offset, is_valid_object) = self.pack_offsets_and_validity[index]; if !is_valid_object { unreachable!("if you see this the object database is correct as a delta refers to a non-existing object") } self.written - base_offset }); self.written += header.write_to(entry.decompressed_size as u64, &mut self.output)? as u64; self.written += std::io::copy(&mut &*entry.compressed_data, &mut self.output)?; } } None => { let digest = self.output.hash.clone().digest(); self.output.inner.write_all(&digest[..])?; self.written += digest.len() as u64; self.output.inner.flush()?; self.is_done = true; self.trailer = Some(gix_hash::ObjectId::from(digest)); } }; Ok(self.written - previous_written) } } impl Iterator for FromEntriesIter where I: Iterator, E>>, W: std::io::Write, E: std::error::Error + 'static, { /// The amount of bytes written to `out` if `Ok` or the error `E` received from the input. type Item = Result>; fn next(&mut self) -> Option { if self.is_done { return None; } Some(match self.next_inner() { Err(err) => { self.is_done = true; Err(err) } Ok(written) => Ok(written), }) } }