summaryrefslogtreecommitdiffstats
path: root/third_party/python/fluent.migrate/fluent/migrate/blame.py
blob: 7ea505edaf54ff069520078171e062b62d122167 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from __future__ import annotations
from typing import Dict, Iterable, Tuple, TypedDict, cast

import argparse
import json
from os.path import join

from compare_locales.parser import Junk, getParser
from compare_locales.parser.fluent import FluentEntity

from .repo_client import RepoClient

# Nested mapping produced by Blame.handleFile: the outer key is the file path,
# the inner key is the message key (or "key.attribute"), and the value is a
# (user id, timestamp) pair. The user id is the author's index in Blame.users.
BlameData = Dict[str, Dict[str, Tuple[int, float]]]
"File path -> message key -> [userid, timestamp]"


class BlameResult(TypedDict):
    """Shape of the dict returned by ``Blame.attribution``.

    ``authors`` lists the author names; the integer stored in each ``blame``
    entry is an index into that list.
    """

    authors: list[str]
    blame: BlameData


class Blame:
    """Attribute each message in a set of localization files to an author.

    Line-level blame is fetched from the repository client, the file is
    parsed into entities, and every message value (plus every attribute
    value of a Fluent entity) is attributed to the blame entry with the
    highest timestamp among the lines that value spans.

    Results accumulate on the instance in ``users`` and ``blame``.
    """

    def __init__(self, client: RepoClient):
        """Store the repository client and start with empty results."""
        self.client = client
        # Author names; an author's position here is the id used in self.blame.
        self.users: list[str] = []
        self.blame: BlameData = {}

    def attribution(self, file_paths: Iterable[str]) -> BlameResult:
        """Blame every file in *file_paths* and return the collected data.

        The returned dict references ``self.users`` and ``self.blame``
        directly (no copies), so it also reflects earlier calls.
        """
        for path in file_paths:
            self.handleFile(path, self.client.blame(path))
        return {"authors": self.users, "blame": self.blame}

    def handleFile(self, path: str, file_blame: list[Tuple[str, int]]) -> None:
        """Record the most recent author of every message value in *path*.

        Args:
            path: File path, joined onto ``self.client.root`` for reading and
                used as the key in ``self.blame``.
            file_blame: ``(user, timestamp)`` pairs, indexed as one entry per
                line of the file.

        Paths for which ``getParser`` raises ``UserWarning`` are skipped and
        leave ``self.blame`` untouched.

        Raises:
            ValueError: if *file_blame* has no entries for the lines spanned
                by a value (``max`` of an empty slice).
        """
        try:
            parser = getParser(path)
        except UserWarning:
            return

        per_key: Dict[str, Tuple[int, float]] = {}
        self.blame[path] = per_key

        self.readFile(parser, path)
        for entity in parser.parse():
            if isinstance(entity, Junk):
                continue
            for key, (val_start, val_end) in self._value_spans(entity):
                # linecol(...)[0] is treated as a 1-based line number: shift
                # the start down for slicing, keep the end as the exclusive
                # bound so the value's last line is included.
                first_line = entity.ctx.linecol(val_start)[0] - 1
                end_line = entity.ctx.linecol(val_end)[0]
                user, timestamp = max(
                    file_blame[first_line:end_line], key=lambda line: line[1]
                )
                per_key[key] = (self._user_id(user), timestamp)

    def readFile(self, parser, path: str) -> None:
        """Load *path*, resolved against the client's root, into *parser*."""
        parser.readFile(join(self.client.root, path))

    @staticmethod
    def _value_spans(entity) -> list[tuple[str, tuple[int, int]]]:
        """Return ``(key, (start, end))`` for each value span of *entity*."""
        spans: list[tuple[str, tuple[int, int]]] = []
        # An entity without a value span contributes only its attributes.
        if entity.val_span:
            spans.append((entity.key, entity.val_span))
        if isinstance(entity, FluentEntity):
            spans.extend(
                (f"{entity.key}.{attr.key}", cast("tuple[int, int]", attr.val_span))
                for attr in entity.attributes
            )
        return spans

    def _user_id(self, user: str) -> int:
        """Return the index of *user* in ``self.users``, appending if new."""
        try:
            return self.users.index(user)
        except ValueError:
            self.users.append(user)
            return len(self.users) - 1


if __name__ == "__main__":
    # Command line: REPO_PATH FILE_PATH [FILE_PATH ...]
    # Prints the attribution for the given files as indented JSON.
    cli = argparse.ArgumentParser()
    cli.add_argument("repo_path")
    cli.add_argument("file_path", nargs="+")
    options = cli.parse_args()

    result = Blame(RepoClient(options.repo_path)).attribution(options.file_path)
    print(json.dumps(result, indent=4, separators=(",", ": ")))