summaryrefslogtreecommitdiffstats
path: root/vendor/gix-pack/src/index/traverse/reduce.rs
blob: e05341242f5ec7e837d633033f27d35064688144 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use std::{
    sync::atomic::{AtomicBool, Ordering},
    time::Instant,
};

use gix_features::{
    parallel,
    progress::Progress,
    threading::{lock, Mutable, OwnShared},
};

use crate::{data, index::traverse};

/// Adds the counters of `rhs` onto the running totals kept in `lhs`.
///
/// Only `num_deltas`, `decompressed_size`, `compressed_size` and `object_size` are summed;
/// every other field of `lhs` is left as it was.
fn add_decode_result(lhs: &mut data::decode::entry::Outcome, rhs: data::decode::entry::Outcome) {
    // Pull the summed fields out of `rhs` once; anything else it carries is ignored here.
    let data::decode::entry::Outcome {
        num_deltas,
        decompressed_size,
        compressed_size,
        object_size,
        ..
    } = rhs;

    lhs.num_deltas += num_deltas;
    lhs.decompressed_size += decompressed_size;
    lhs.compressed_size += compressed_size;
    lhs.object_size += object_size;
}

/// Turns the summed counters in `lhs` into per-entry averages by dividing each by `div`.
///
/// A `div` of zero leaves `lhs` untouched, so no division by zero can happen.
fn div_decode_result(lhs: &mut data::decode::entry::Outcome, div: usize) {
    if div == 0 {
        return;
    }

    let divisor = div as u64;
    // The delta count is averaged in `f32` and converted back with an `as` cast,
    // which drops the fractional part.
    lhs.num_deltas = (lhs.num_deltas as f32 / div as f32) as u32;
    lhs.decompressed_size /= divisor;
    lhs.compressed_size /= div;
    lhs.object_size /= divisor;
}

/// Collects per-entry decode outcomes into [`traverse::Statistics`] while reporting progress.
///
/// `P` is the progress implementation and `E` is the error type carried by `traverse::Error<E>`.
pub struct Reducer<'a, P, E> {
    /// Shared progress handle; it is locked each time a message or a counter update is sent.
    progress: OwnShared<Mutable<P>>,
    /// Consulted to decide whether a pack decode error aborts the reduction or is only logged.
    check: traverse::SafetyCheck,
    /// The instant this reducer was created, used to compute the elapsed time when finalizing.
    then: Instant,
    /// Number of decode outcomes fed so far.
    entries_seen: usize,
    /// The statistics accumulated so far, returned when finalizing.
    stats: traverse::Statistics,
    /// Checked after each fed chunk; when set, feeding fails with an `Interrupted` error.
    should_interrupt: &'a AtomicBool,
    /// Ties the otherwise unused error type parameter `E` to this struct.
    _error: std::marker::PhantomData<E>,
}

impl<'a, P, E> Reducer<'a, P, E>
where
    P: Progress,
{
    /// Creates a reducer that reports to `progress` and starts measuring time immediately.
    ///
    /// * `pack_data_len_in_bytes` is recorded as the `pack_size` of the resulting statistics;
    ///   all other statistics start at their defaults.
    /// * `check` determines how pack decode errors are treated when results are fed.
    /// * `should_interrupt` is polled while feeding to allow aborting the operation.
    pub fn from_progress(
        progress: OwnShared<Mutable<P>>,
        pack_data_len_in_bytes: usize,
        check: traverse::SafetyCheck,
        should_interrupt: &'a AtomicBool,
    ) -> Self {
        Self {
            // Fields are evaluated in the order written, so the statistics exist before the clock is read.
            stats: traverse::Statistics {
                pack_size: pack_data_len_in_bytes as u64,
                ..Default::default()
            },
            then: Instant::now(),
            entries_seen: 0,
            progress,
            check,
            should_interrupt,
            _error: std::marker::PhantomData,
        }
    }
}

impl<'a, P, E> parallel::Reduce for Reducer<'a, P, E>
where
    P: Progress,
    E: std::error::Error + Send + Sync + 'static,
{
    type Input = Result<Vec<data::decode::entry::Outcome>, traverse::Error<E>>;
    type FeedProduce = ();
    type Output = traverse::Statistics;
    type Error = traverse::Error<E>;

    /// Folds one chunk of decode outcomes into the running statistics.
    ///
    /// A `PackDecode` error is only logged and skipped if the safety check does not consider
    /// decode errors fatal; every other error is returned to the caller unchanged.
    /// After a chunk was accounted for, the progress counter is updated and the interrupt flag
    /// is checked, yielding `Interrupted` if it is set.
    fn feed(&mut self, input: Self::Input) -> Result<(), Self::Error> {
        let outcomes = match input {
            Ok(outcomes) => outcomes,
            Err(err @ traverse::Error::PackDecode { .. }) if !self.check.fatal_decode_error() => {
                lock(&self.progress).info(format!("Ignoring decode error: {err}"));
                return Ok(());
            }
            Err(err) => return Err(err),
        };
        self.entries_seen += outcomes.len();

        // The kind of the seed value is irrelevant as only the numeric counters are summed into it.
        let mut chunk_total = data::decode::entry::Outcome::default_from_kind(gix_object::Kind::Tree);
        for outcome in outcomes {
            *self
                .stats
                .objects_per_chain_length
                .entry(outcome.num_deltas)
                .or_default() += 1;
            self.stats.total_decompressed_entries_size += outcome.decompressed_size;
            self.stats.total_compressed_entries_size += outcome.compressed_size as u64;
            self.stats.total_object_size += outcome.object_size;
            match outcome.kind {
                gix_object::Kind::Commit => self.stats.num_commits += 1,
                gix_object::Kind::Tree => self.stats.num_trees += 1,
                gix_object::Kind::Blob => self.stats.num_blobs += 1,
                gix_object::Kind::Tag => self.stats.num_tags += 1,
            }
            add_decode_result(&mut chunk_total, outcome);
        }

        // `average` holds plain sums until `finalize()` divides them by the entry count.
        add_decode_result(&mut self.stats.average, chunk_total);
        lock(&self.progress).set(self.entries_seen);

        if self.should_interrupt.load(Ordering::SeqCst) {
            Err(Self::Error::Interrupted)
        } else {
            Ok(())
        }
    }

    /// Converts the accumulated sums into averages, logs a throughput summary and returns the
    /// final statistics.
    fn finalize(mut self) -> Result<Self::Output, Self::Error> {
        div_decode_result(&mut self.stats.average, self.entries_seen);

        let elapsed_s = self.then.elapsed().as_secs_f32();
        let objects_per_second = (self.entries_seen as f32 / elapsed_s) as u32;
        // Estimated byte throughput: average object size times the object rate.
        let bytes_per_second =
            gix_features::progress::bytesize::ByteSize(self.stats.average.object_size * objects_per_second as u64);

        lock(&self.progress).info(format!(
            "of {} objects done in {:.2}s ({} objects/s, ~{}/s)",
            self.entries_seen, elapsed_s, objects_per_second, bytes_per_second
        ));
        Ok(self.stats)
    }
}