import os.path import re from typing import Any from typing import Dict from typing import List from typing import NamedTuple from typing import Optional from typing import Sequence from typing import Tuple import pre_commit.constants as C from pre_commit import git from pre_commit import output from pre_commit.clientlib import InvalidManifestError from pre_commit.clientlib import load_config from pre_commit.clientlib import load_manifest from pre_commit.clientlib import LOCAL from pre_commit.clientlib import META from pre_commit.commands.migrate_config import migrate_config from pre_commit.store import Store from pre_commit.util import CalledProcessError from pre_commit.util import cmd_output from pre_commit.util import cmd_output_b from pre_commit.util import tmpdir from pre_commit.util import yaml_dump from pre_commit.util import yaml_load class RevInfo(NamedTuple): repo: str rev: str frozen: Optional[str] @classmethod def from_config(cls, config: Dict[str, Any]) -> 'RevInfo': return cls(config['repo'], config['rev'], None) def update(self, tags_only: bool, freeze: bool) -> 'RevInfo': if tags_only: tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags', '--abbrev=0') else: tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags', '--exact') with tmpdir() as tmp: git.init_repo(tmp, self.repo) cmd_output_b('git', 'fetch', 'origin', 'HEAD', '--tags', cwd=tmp) try: rev = cmd_output(*tag_cmd, cwd=tmp)[1].strip() except CalledProcessError: cmd = ('git', 'rev-parse', 'FETCH_HEAD') rev = cmd_output(*cmd, cwd=tmp)[1].strip() frozen = None if freeze: exact = cmd_output('git', 'rev-parse', rev, cwd=tmp)[1].strip() if exact != rev: rev, frozen = exact, rev return self._replace(rev=rev, frozen=frozen) class RepositoryCannotBeUpdatedError(RuntimeError): pass def _check_hooks_still_exist_at_rev( repo_config: Dict[str, Any], info: RevInfo, store: Store, ) -> None: try: path = store.clone(repo_config['repo'], info.rev) manifest = load_manifest(os.path.join(path, C.MANIFEST_FILE)) except InvalidManifestError as e: raise RepositoryCannotBeUpdatedError(str(e)) # See if any of our hooks were deleted with the new commits hooks = {hook['id'] for hook in repo_config['hooks']} hooks_missing = hooks - {hook['id'] for hook in manifest} if hooks_missing: raise RepositoryCannotBeUpdatedError( f'Cannot update because the tip of HEAD is missing these hooks:\n' f'{", ".join(sorted(hooks_missing))}', ) REV_LINE_RE = re.compile(r'^(\s+)rev:(\s*)([^\s#]+)(.*)(\r?\n)$', re.DOTALL) def _original_lines( path: str, rev_infos: List[Optional[RevInfo]], retry: bool = False, ) -> Tuple[List[str], List[int]]: """detect `rev:` lines or reformat the file""" with open(path, newline='') as f: original = f.read() lines = original.splitlines(True) idxs = [i for i, line in enumerate(lines) if REV_LINE_RE.match(line)] if len(idxs) == len(rev_infos): return lines, idxs elif retry: raise AssertionError('could not find rev lines') else: with open(path, 'w') as f: f.write(yaml_dump(yaml_load(original))) return _original_lines(path, rev_infos, retry=True) def _write_new_config(path: str, rev_infos: List[Optional[RevInfo]]) -> None: lines, idxs = _original_lines(path, rev_infos) for idx, rev_info in zip(idxs, rev_infos): if rev_info is None: continue match = REV_LINE_RE.match(lines[idx]) assert match is not None new_rev_s = yaml_dump({'rev': rev_info.rev}) new_rev = new_rev_s.split(':', 1)[1].strip() if rev_info.frozen is not None: comment = f' # frozen: {rev_info.frozen}' elif match[4].strip().startswith('# frozen:'): comment = '' else: comment = match[4] lines[idx] = f'{match[1]}rev:{match[2]}{new_rev}{comment}{match[5]}' with open(path, 'w', newline='') as f: f.write(''.join(lines)) def autoupdate( config_file: str, store: Store, tags_only: bool, freeze: bool, repos: Sequence[str] = (), ) -> int: """Auto-update the pre-commit config to the latest versions of repos.""" migrate_config(config_file, quiet=True) retv = 0 rev_infos: List[Optional[RevInfo]] = [] changed = False config = load_config(config_file) for repo_config in config['repos']: if repo_config['repo'] in {LOCAL, META}: continue info = RevInfo.from_config(repo_config) if repos and info.repo not in repos: rev_infos.append(None) continue output.write(f'Updating {info.repo} ... ') new_info = info.update(tags_only=tags_only, freeze=freeze) try: _check_hooks_still_exist_at_rev(repo_config, new_info, store) except RepositoryCannotBeUpdatedError as error: output.write_line(error.args[0]) rev_infos.append(None) retv = 1 continue if new_info.rev != info.rev: changed = True if new_info.frozen: updated_to = f'{new_info.frozen} (frozen)' else: updated_to = new_info.rev msg = f'updating {info.rev} -> {updated_to}.' output.write_line(msg) rev_infos.append(new_info) else: output.write_line('already up to date.') rev_infos.append(None) if changed: _write_new_config(config_file, rev_infos) return retv