diff options
Diffstat (limited to 'pre_commit/commands/autoupdate.py')
-rw-r--r-- | pre_commit/commands/autoupdate.py | 182 |
1 files changed, 182 insertions, 0 deletions
diff --git a/pre_commit/commands/autoupdate.py b/pre_commit/commands/autoupdate.py new file mode 100644 index 0000000..5a9a988 --- /dev/null +++ b/pre_commit/commands/autoupdate.py @@ -0,0 +1,182 @@ +import os.path +import re +from typing import Any +from typing import Dict +from typing import List +from typing import NamedTuple +from typing import Optional +from typing import Sequence +from typing import Tuple + +import pre_commit.constants as C +from pre_commit import git +from pre_commit import output +from pre_commit.clientlib import InvalidManifestError +from pre_commit.clientlib import load_config +from pre_commit.clientlib import load_manifest +from pre_commit.clientlib import LOCAL +from pre_commit.clientlib import META +from pre_commit.commands.migrate_config import migrate_config +from pre_commit.store import Store +from pre_commit.util import CalledProcessError +from pre_commit.util import cmd_output +from pre_commit.util import cmd_output_b +from pre_commit.util import tmpdir +from pre_commit.util import yaml_dump +from pre_commit.util import yaml_load + + +class RevInfo(NamedTuple): + repo: str + rev: str + frozen: Optional[str] + + @classmethod + def from_config(cls, config: Dict[str, Any]) -> 'RevInfo': + return cls(config['repo'], config['rev'], None) + + def update(self, tags_only: bool, freeze: bool) -> 'RevInfo': + if tags_only: + tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags', '--abbrev=0') + else: + tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags', '--exact') + + with tmpdir() as tmp: + git.init_repo(tmp, self.repo) + cmd_output_b('git', 'fetch', 'origin', 'HEAD', '--tags', cwd=tmp) + + try: + rev = cmd_output(*tag_cmd, cwd=tmp)[1].strip() + except CalledProcessError: + cmd = ('git', 'rev-parse', 'FETCH_HEAD') + rev = cmd_output(*cmd, cwd=tmp)[1].strip() + + frozen = None + if freeze: + exact = cmd_output('git', 'rev-parse', rev, cwd=tmp)[1].strip() + if exact != rev: + rev, frozen = exact, rev + return self._replace(rev=rev, frozen=frozen) + + +class RepositoryCannotBeUpdatedError(RuntimeError): + pass + + +def _check_hooks_still_exist_at_rev( + repo_config: Dict[str, Any], + info: RevInfo, + store: Store, +) -> None: + try: + path = store.clone(repo_config['repo'], info.rev) + manifest = load_manifest(os.path.join(path, C.MANIFEST_FILE)) + except InvalidManifestError as e: + raise RepositoryCannotBeUpdatedError(str(e)) + + # See if any of our hooks were deleted with the new commits + hooks = {hook['id'] for hook in repo_config['hooks']} + hooks_missing = hooks - {hook['id'] for hook in manifest} + if hooks_missing: + raise RepositoryCannotBeUpdatedError( + f'Cannot update because the tip of HEAD is missing these hooks:\n' + f'{", ".join(sorted(hooks_missing))}', + ) + + +REV_LINE_RE = re.compile(r'^(\s+)rev:(\s*)([^\s#]+)(.*)(\r?\n)$', re.DOTALL) + + +def _original_lines( + path: str, + rev_infos: List[Optional[RevInfo]], + retry: bool = False, +) -> Tuple[List[str], List[int]]: + """detect `rev:` lines or reformat the file""" + with open(path) as f: + original = f.read() + + lines = original.splitlines(True) + idxs = [i for i, line in enumerate(lines) if REV_LINE_RE.match(line)] + if len(idxs) == len(rev_infos): + return lines, idxs + elif retry: + raise AssertionError('could not find rev lines') + else: + with open(path, 'w') as f: + f.write(yaml_dump(yaml_load(original))) + return _original_lines(path, rev_infos, retry=True) + + +def _write_new_config(path: str, rev_infos: List[Optional[RevInfo]]) -> None: + lines, idxs = _original_lines(path, rev_infos) + + for idx, rev_info in zip(idxs, rev_infos): + if rev_info is None: + continue + match = REV_LINE_RE.match(lines[idx]) + assert match is not None + new_rev_s = yaml_dump({'rev': rev_info.rev}) + new_rev = new_rev_s.split(':', 1)[1].strip() + if rev_info.frozen is not None: + comment = f' # frozen: {rev_info.frozen}' + elif match[4].strip().startswith('# frozen:'): + comment = '' + else: + comment = match[4] + lines[idx] = f'{match[1]}rev:{match[2]}{new_rev}{comment}{match[5]}' + + with open(path, 'w') as f: + f.write(''.join(lines)) + + +def autoupdate( + config_file: str, + store: Store, + tags_only: bool, + freeze: bool, + repos: Sequence[str] = (), +) -> int: + """Auto-update the pre-commit config to the latest versions of repos.""" + migrate_config(config_file, quiet=True) + retv = 0 + rev_infos: List[Optional[RevInfo]] = [] + changed = False + + config = load_config(config_file) + for repo_config in config['repos']: + if repo_config['repo'] in {LOCAL, META}: + continue + + info = RevInfo.from_config(repo_config) + if repos and info.repo not in repos: + rev_infos.append(None) + continue + + output.write(f'Updating {info.repo} ... ') + new_info = info.update(tags_only=tags_only, freeze=freeze) + try: + _check_hooks_still_exist_at_rev(repo_config, new_info, store) + except RepositoryCannotBeUpdatedError as error: + output.write_line(error.args[0]) + rev_infos.append(None) + retv = 1 + continue + + if new_info.rev != info.rev: + changed = True + if new_info.frozen: + updated_to = f'{new_info.frozen} (frozen)' + else: + updated_to = new_info.rev + msg = f'updating {info.rev} -> {updated_to}.' + output.write_line(msg) + rev_infos.append(new_info) + else: + output.write_line('already up to date.') + rev_infos.append(None) + + if changed: + _write_new_config(config_file, rev_infos) + + return retv |