1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
use std::{
sync::atomic::{AtomicBool, Ordering},
time::Instant,
};
use gix_features::{
parallel,
progress::Progress,
threading::{lock, Mutable, OwnShared},
};
use crate::{data, index::traverse};
/// Adds the numeric counters of `rhs` onto `lhs` in place.
///
/// Only `num_deltas`, `decompressed_size`, `compressed_size` and `object_size` are summed;
/// every other field of `lhs` is left as it was.
fn add_decode_result(lhs: &mut data::decode::entry::Outcome, rhs: data::decode::entry::Outcome) {
    // Pull out just the counters we accumulate; remaining fields of `rhs` are ignored.
    let data::decode::entry::Outcome {
        num_deltas,
        decompressed_size,
        compressed_size,
        object_size,
        ..
    } = rhs;

    lhs.object_size += object_size;
    lhs.compressed_size += compressed_size;
    lhs.decompressed_size += decompressed_size;
    lhs.num_deltas += num_deltas;
}
/// Divides the numeric counters of `lhs` by `div` in place, turning accumulated sums into
/// (truncated) averages.
///
/// A `div` of zero leaves `lhs` untouched so that an empty traversal does not cause a
/// division by zero.
fn div_decode_result(lhs: &mut data::decode::entry::Outcome, div: usize) {
    if div == 0 {
        return;
    }
    let divisor = div as u64;

    // Exact integer division. Going through `f32` (24-bit mantissa) rounds both the summed
    // chain length and the divisor once they exceed ~16.7M, which can make the truncated
    // average off by one. The quotient can never exceed the `u32` dividend because
    // `divisor >= 1`, so narrowing back to `u32` is lossless.
    lhs.num_deltas = (u64::from(lhs.num_deltas) / divisor) as u32;
    lhs.decompressed_size /= divisor;
    lhs.compressed_size /= div;
    lhs.object_size /= divisor;
}
/// Collects the per-chunk decode outcomes of a traversal into a single
/// `traverse::Statistics` value while reporting progress.
///
/// `P` is the progress implementation and `E` is the error type carried by `traverse::Error<E>`.
pub struct Reducer<'a, P, E> {
// Shared, lockable progress handle; used to emit info messages and to publish the number of entries seen.
progress: OwnShared<Mutable<P>>,
// Consulted in `feed()` to decide whether a pack decode error is fatal or merely logged.
check: traverse::SafetyCheck,
// Time at which the reducer was created; `finalize()` measures the elapsed time from here.
then: Instant,
// Running count of all entries received through `feed()`.
entries_seen: usize,
// Statistics accumulated so far; handed out by `finalize()`.
stats: traverse::Statistics,
// Flag polled at the end of every `feed()` call; when set, `feed()` returns an `Interrupted` error.
should_interrupt: &'a AtomicBool,
// `E` is only used in the associated types of the `parallel::Reduce` implementation, so no value of it is stored.
_error: std::marker::PhantomData<E>,
}
impl<'a, P, E> Reducer<'a, P, E>
where
    P: Progress,
{
    /// Creates a reducer that has seen no entries yet.
    ///
    /// * `progress` - shared progress handle used for messages and counters.
    /// * `pack_data_len_in_bytes` - recorded as `pack_size` in the resulting statistics.
    /// * `check` - determines whether pack decode errors abort the reduction.
    /// * `should_interrupt` - flag that, once set, makes `feed()` fail with an interruption error.
    ///
    /// The start time used for the throughput report is taken when this function runs.
    pub fn from_progress(
        progress: OwnShared<Mutable<P>>,
        pack_data_len_in_bytes: usize,
        check: traverse::SafetyCheck,
        should_interrupt: &'a AtomicBool,
    ) -> Self {
        Self {
            // All statistics start at their defaults except for the known pack size.
            stats: traverse::Statistics {
                pack_size: pack_data_len_in_bytes as u64,
                ..Default::default()
            },
            progress,
            check,
            should_interrupt,
            entries_seen: 0,
            then: Instant::now(),
            _error: std::marker::PhantomData,
        }
    }
}
// Reduction of per-chunk decode outcomes into one `traverse::Statistics` value.
impl<'a, P, E> parallel::Reduce for Reducer<'a, P, E>
where
    P: Progress,
    E: std::error::Error + Send + Sync + 'static,
{
    type Input = Result<Vec<data::decode::entry::Outcome>, traverse::Error<E>>;
    type FeedProduce = ();
    type Output = traverse::Statistics;
    type Error = traverse::Error<E>;

    /// Incorporates one chunk of decode outcomes into the running statistics.
    ///
    /// A `PackDecode` error is logged and skipped when the safety check does not treat decode
    /// errors as fatal; every other error is returned as is. After a chunk was processed, the
    /// progress counter is updated and the interrupt flag is checked, yielding
    /// `Error::Interrupted` if it is set.
    fn feed(&mut self, input: Self::Input) -> Result<(), Self::Error> {
        let outcomes = match input {
            Ok(outcomes) => outcomes,
            Err(err @ traverse::Error::PackDecode { .. }) if !self.check.fatal_decode_error() => {
                lock(&self.progress).info(format!("Ignoring decode error: {err}"));
                return Ok(());
            }
            Err(err) => return Err(err),
        };

        self.entries_seen += outcomes.len();

        // Accumulator for this chunk; only its numeric counters are summed, so the kind it is
        // created with is never read here.
        let mut chunk_total = data::decode::entry::Outcome::default_from_kind(gix_object::Kind::Tree);
        for outcome in outcomes {
            let stats = &mut self.stats;
            *stats.objects_per_chain_length.entry(outcome.num_deltas).or_insert(0) += 1;
            stats.total_decompressed_entries_size += outcome.decompressed_size;
            stats.total_compressed_entries_size += outcome.compressed_size as u64;
            stats.total_object_size += outcome.object_size;
            match outcome.kind {
                gix_object::Kind::Commit => stats.num_commits += 1,
                gix_object::Kind::Tree => stats.num_trees += 1,
                gix_object::Kind::Blob => stats.num_blobs += 1,
                gix_object::Kind::Tag => stats.num_tags += 1,
            }
            add_decode_result(&mut chunk_total, outcome);
        }

        // `average` holds a plain sum until `finalize()` divides it by the entry count.
        add_decode_result(&mut self.stats.average, chunk_total);
        lock(&self.progress).set(self.entries_seen);

        if self.should_interrupt.load(Ordering::SeqCst) {
            Err(Self::Error::Interrupted)
        } else {
            Ok(())
        }
    }

    /// Turns the accumulated sums into averages, reports the achieved throughput and returns
    /// the final statistics.
    fn finalize(mut self) -> Result<Self::Output, Self::Error> {
        let entry_count = self.entries_seen;
        div_decode_result(&mut self.stats.average, entry_count);

        let elapsed_s = self.then.elapsed().as_secs_f32();
        let objects_per_second = (entry_count as f32 / elapsed_s) as u32;
        // Approximate byte throughput: average object size times objects handled per second.
        let bytes_per_second =
            gix_features::progress::bytesize::ByteSize(self.stats.average.object_size * objects_per_second as u64);

        let message = format!(
            "of {} objects done in {:.2}s ({} objects/s, ~{}/s)",
            entry_count, elapsed_s, objects_per_second, bytes_per_second
        );
        lock(&self.progress).info(message);

        Ok(self.stats)
    }
}
|