summaryrefslogtreecommitdiffstats
path: root/third_party/python/fluent.migrate/fluent/migrate/repo_client.py
blob: 4236bc42862e7b7801498d7c7bf3aa8c71d5a1c8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from __future__ import annotations
from typing import Tuple

import json
from subprocess import run

from os.path import isdir, join

import hglib


def git(root: str, *args: str) -> str:
    """
    Run command-line git with `args`, using `root` as the working directory.

    Returns everything the command wrote to stdout, decoded as UTF-8.
    Raises an Exception when git exits with a non-0 return code; the message
    is git's stderr output, or a generic fallback if stderr is empty.
    Errors raised while launching the process are not caught here.
    """
    command = ["git", *args]
    result = run(command, capture_output=True, cwd=root, encoding="utf-8")
    if result.returncode == 0:
        return result.stdout
    raise Exception(result.stderr or f"git command failed: {args}")


class RepoClient:
    """
    Thin client over the Mercurial or git repository rooted at `root`.

    A directory containing `.hg` is driven through an hglib client; a
    directory containing `.git` is driven through the command-line `git`
    wrapper. Any other directory is rejected with an Exception.

    Instances may be used as context managers; leaving the `with` block
    calls `close()`.
    """

    def __init__(self, root: str):
        """
        Open the repository at `root`.

        Raises an Exception if `root` holds neither a `.hg` nor a `.git`
        directory, or if git does not report `root` as a work tree.
        """
        self.root = root
        if isdir(join(root, ".hg")):
            self.hgclient = hglib.open(root, "utf-8")
        elif isdir(join(root, ".git")):
            # A `None` client is what marks this repository as git
            # for every other method.
            self.hgclient = None
            stdout = git(self.root, "rev-parse", "--is-inside-work-tree")
            if stdout != "true\n":
                raise Exception("git rev-parse failed")
        else:
            raise Exception(f"Unsupported repository: {root}")

    def __enter__(self) -> "RepoClient":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()

    def close(self):
        "Close the hglib client, if there is one. A no-op for git."
        if self.hgclient is not None:
            self.hgclient.close()

    def blame(self, file: str) -> list[Tuple[str, int]]:
        """
        Return a list of (author, time) tuples for each line in `file`.

        For git, the author is formatted as "Name <email>" and the time is
        the integer `author-time` value reported by `git blame`.
        For Mercurial, the values come from the `user` and `date` fields of
        `hg annotate` JSON output.
        """
        if self.hgclient is not None:
            args = hglib.util.cmdbuilder(
                b"annotate",
                file.encode("latin-1"),
                template="json",
                date=True,
                user=True,
                cwd=self.root,
            )
            blame_json = self.hgclient.rawcommand(args)
            return [
                (line["user"], int(line["date"][0]))
                for line in json.loads(blame_json)[0]["lines"]
            ]
        stdout = git(self.root, "blame", "--porcelain", file)
        return self._parse_git_blame(stdout)

    @staticmethod
    def _parse_git_blame(porcelain: str) -> list[Tuple[str, int]]:
        """
        Convert `git blame --porcelain` output into (author, time) tuples.

        The porcelain format prints a commit's author details only the first
        time that commit shows up; later lines blamed on the same commit
        carry just the header line. The details are therefore remembered per
        commit id, so such lines are not attributed to whichever commit
        happened to precede them in the output.
        """
        hex_digits = set("0123456789abcdef")
        # commit id -> [user, time], filled in as the details are seen
        commits: dict[str, list] = {}
        current: list = ["", 0]
        lines: list[Tuple[str, int]] = []
        # Split on "\n" only: str.splitlines() would also break on form feeds
        # and Unicode line separators that may occur inside file content.
        for line in porcelain.split("\n"):
            if line.startswith("\t"):
                # A tab-prefixed line is the file content and ends one entry.
                lines.append((current[0], current[1]))
            elif line.startswith("author "):
                current[0] = line[7:]
            elif line.startswith("author-mail "):
                current[0] += line[11:]  # includes leading space
            elif line.startswith("author-time "):
                current[1] = int(line[12:])
            else:
                # Entry header: "<commit id> <orig line> <final line> [...]"
                commit_id, sep, _ = line.partition(" ")
                if (
                    sep
                    and len(commit_id) in (40, 64)
                    and hex_digits.issuperset(commit_id)
                ):
                    current = commits.setdefault(commit_id, ["", 0])
        return lines

    def commit(self, message: str, author: str):
        "Add and commit all work tree files"
        if self.hgclient is not None:
            self.hgclient.commit(message, user=author.encode("utf-8"), addremove=True)
        else:
            git(self.root, "add", ".")
            git(self.root, "commit", f"--author={author}", f"--message={message}")

    def head(self) -> str:
        "Identifier for the most recent commit"
        if self.hgclient is not None:
            return self.hgclient.tip().node.decode("utf-8")
        return git(self.root, "rev-parse", "HEAD").strip()

    def log(self, from_commit: str, to_commit: str) -> list[str]:
        """
        Return the messages of the commits selected by `from_commit` and
        `to_commit`.

        Mercurial uses the revset `to_commit % from_commit` and returns each
        changeset's description; git uses the range `from_commit..to_commit`
        and returns each commit's subject line (`%s`).
        """
        if self.hgclient is not None:
            return [
                rev.desc.decode("utf-8")
                for rev in self.hgclient.log(f"{to_commit} % {from_commit}")
            ]
        subjects = git(
            self.root,
            "log",
            "--pretty=format:%s",
            f"{from_commit}..{to_commit}",
        )
        return subjects.strip().splitlines()