summaryrefslogtreecommitdiffstats
path: root/pre_commit/commands/autoupdate.py
diff options
context:
space:
mode:
Diffstat (limited to 'pre_commit/commands/autoupdate.py')
-rw-r--r--pre_commit/commands/autoupdate.py182
1 files changed, 182 insertions, 0 deletions
diff --git a/pre_commit/commands/autoupdate.py b/pre_commit/commands/autoupdate.py
new file mode 100644
index 0000000..5a9a988
--- /dev/null
+++ b/pre_commit/commands/autoupdate.py
@@ -0,0 +1,182 @@
+import os.path
+import re
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
+import pre_commit.constants as C
+from pre_commit import git
+from pre_commit import output
+from pre_commit.clientlib import InvalidManifestError
+from pre_commit.clientlib import load_config
+from pre_commit.clientlib import load_manifest
+from pre_commit.clientlib import LOCAL
+from pre_commit.clientlib import META
+from pre_commit.commands.migrate_config import migrate_config
+from pre_commit.store import Store
+from pre_commit.util import CalledProcessError
+from pre_commit.util import cmd_output
+from pre_commit.util import cmd_output_b
+from pre_commit.util import tmpdir
+from pre_commit.util import yaml_dump
+from pre_commit.util import yaml_load
+
+
+class RevInfo(NamedTuple):
+ repo: str
+ rev: str
+ frozen: Optional[str]
+
+ @classmethod
+ def from_config(cls, config: Dict[str, Any]) -> 'RevInfo':
+ return cls(config['repo'], config['rev'], None)
+
+ def update(self, tags_only: bool, freeze: bool) -> 'RevInfo':
+ if tags_only:
+ tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags', '--abbrev=0')
+ else:
+ tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags', '--exact')
+
+ with tmpdir() as tmp:
+ git.init_repo(tmp, self.repo)
+ cmd_output_b('git', 'fetch', 'origin', 'HEAD', '--tags', cwd=tmp)
+
+ try:
+ rev = cmd_output(*tag_cmd, cwd=tmp)[1].strip()
+ except CalledProcessError:
+ cmd = ('git', 'rev-parse', 'FETCH_HEAD')
+ rev = cmd_output(*cmd, cwd=tmp)[1].strip()
+
+ frozen = None
+ if freeze:
+ exact = cmd_output('git', 'rev-parse', rev, cwd=tmp)[1].strip()
+ if exact != rev:
+ rev, frozen = exact, rev
+ return self._replace(rev=rev, frozen=frozen)
+
+
+class RepositoryCannotBeUpdatedError(RuntimeError):
+ pass
+
+
+def _check_hooks_still_exist_at_rev(
+ repo_config: Dict[str, Any],
+ info: RevInfo,
+ store: Store,
+) -> None:
+ try:
+ path = store.clone(repo_config['repo'], info.rev)
+ manifest = load_manifest(os.path.join(path, C.MANIFEST_FILE))
+ except InvalidManifestError as e:
+ raise RepositoryCannotBeUpdatedError(str(e))
+
+ # See if any of our hooks were deleted with the new commits
+ hooks = {hook['id'] for hook in repo_config['hooks']}
+ hooks_missing = hooks - {hook['id'] for hook in manifest}
+ if hooks_missing:
+ raise RepositoryCannotBeUpdatedError(
+ f'Cannot update because the tip of HEAD is missing these hooks:\n'
+ f'{", ".join(sorted(hooks_missing))}',
+ )
+
+
+REV_LINE_RE = re.compile(r'^(\s+)rev:(\s*)([^\s#]+)(.*)(\r?\n)$', re.DOTALL)
+
+
+def _original_lines(
+ path: str,
+ rev_infos: List[Optional[RevInfo]],
+ retry: bool = False,
+) -> Tuple[List[str], List[int]]:
+ """detect `rev:` lines or reformat the file"""
+ with open(path) as f:
+ original = f.read()
+
+ lines = original.splitlines(True)
+ idxs = [i for i, line in enumerate(lines) if REV_LINE_RE.match(line)]
+ if len(idxs) == len(rev_infos):
+ return lines, idxs
+ elif retry:
+ raise AssertionError('could not find rev lines')
+ else:
+ with open(path, 'w') as f:
+ f.write(yaml_dump(yaml_load(original)))
+ return _original_lines(path, rev_infos, retry=True)
+
+
+def _write_new_config(path: str, rev_infos: List[Optional[RevInfo]]) -> None:
+ lines, idxs = _original_lines(path, rev_infos)
+
+ for idx, rev_info in zip(idxs, rev_infos):
+ if rev_info is None:
+ continue
+ match = REV_LINE_RE.match(lines[idx])
+ assert match is not None
+ new_rev_s = yaml_dump({'rev': rev_info.rev})
+ new_rev = new_rev_s.split(':', 1)[1].strip()
+ if rev_info.frozen is not None:
+ comment = f' # frozen: {rev_info.frozen}'
+ elif match[4].strip().startswith('# frozen:'):
+ comment = ''
+ else:
+ comment = match[4]
+ lines[idx] = f'{match[1]}rev:{match[2]}{new_rev}{comment}{match[5]}'
+
+ with open(path, 'w') as f:
+ f.write(''.join(lines))
+
+
+def autoupdate(
+ config_file: str,
+ store: Store,
+ tags_only: bool,
+ freeze: bool,
+ repos: Sequence[str] = (),
+) -> int:
+ """Auto-update the pre-commit config to the latest versions of repos."""
+ migrate_config(config_file, quiet=True)
+ retv = 0
+ rev_infos: List[Optional[RevInfo]] = []
+ changed = False
+
+ config = load_config(config_file)
+ for repo_config in config['repos']:
+ if repo_config['repo'] in {LOCAL, META}:
+ continue
+
+ info = RevInfo.from_config(repo_config)
+ if repos and info.repo not in repos:
+ rev_infos.append(None)
+ continue
+
+ output.write(f'Updating {info.repo} ... ')
+ new_info = info.update(tags_only=tags_only, freeze=freeze)
+ try:
+ _check_hooks_still_exist_at_rev(repo_config, new_info, store)
+ except RepositoryCannotBeUpdatedError as error:
+ output.write_line(error.args[0])
+ rev_infos.append(None)
+ retv = 1
+ continue
+
+ if new_info.rev != info.rev:
+ changed = True
+ if new_info.frozen:
+ updated_to = f'{new_info.frozen} (frozen)'
+ else:
+ updated_to = new_info.rev
+ msg = f'updating {info.rev} -> {updated_to}.'
+ output.write_line(msg)
+ rev_infos.append(new_info)
+ else:
+ output.write_line('already up to date.')
+ rev_infos.append(None)
+
+ if changed:
+ _write_new_config(config_file, rev_infos)
+
+ return retv