summaryrefslogtreecommitdiffstats
path: root/pre_commit/repository.py
blob: a9461ab63534610fc898cc8d5ff4b0fa684dbc27 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
from __future__ import annotations

import json
import logging
import os
from collections.abc import Sequence
from typing import Any

import pre_commit.constants as C
from pre_commit.all_languages import languages
from pre_commit.clientlib import load_manifest
from pre_commit.clientlib import LOCAL
from pre_commit.clientlib import META
from pre_commit.hook import Hook
from pre_commit.lang_base import environment_dir
from pre_commit.prefix import Prefix
from pre_commit.store import Store
from pre_commit.util import clean_path_on_failure
from pre_commit.util import rmtree


logger = logging.getLogger('pre_commit')


def _state_filename_v1(venv: str) -> str:
    """Return the path of the v1 install-state file inside ``venv``."""
    marker_name = '.install_state_v1'
    return os.path.join(venv, marker_name)


def _state_filename_v2(venv: str) -> str:
    """Return the path of the v2 install-state marker inside ``venv``."""
    marker_name = '.install_state_v2'
    return os.path.join(venv, marker_name)


def _state(additional_deps: Sequence[str]) -> object:
    """Build the install-state payload for the given dependencies.

    The payload is a dict with the single key ``additional_dependencies``
    whose value is ``additional_deps`` itself (not a copy).
    """
    return dict(additional_dependencies=additional_deps)


def _read_state(venv: str) -> object | None:
    """Load the v1 install state stored inside ``venv``.

    Returns the decoded JSON content of the v1 state file, or ``None`` when
    that file does not exist.
    """
    state_path = _state_filename_v1(venv)
    if os.path.exists(state_path):
        with open(state_path) as state_file:
            return json.load(state_file)
    return None


def _hook_installed(hook: Hook) -> bool:
    """Report whether the environment for ``hook`` is installed and healthy.

    Languages whose ``ENVIRONMENT_DIR`` is ``None`` are always considered
    installed.  Otherwise the environment must carry either the v2 state
    marker or a v1 state matching the hook's additional dependencies, and
    the language's health check must report no problem.
    """
    language = languages[hook.language]
    env_dir_name = language.ENVIRONMENT_DIR
    # No environment directory means there is nothing to install.
    if env_dir_name is None:
        return True

    env_path = environment_dir(
        hook.prefix, env_dir_name, hook.language_version,
    )

    # The v2 marker alone is sufficient; only without it is the recorded v1
    # state compared against the hook's additional dependencies.
    if not os.path.exists(_state_filename_v2(env_path)):
        if _read_state(env_path) != _state(hook.additional_dependencies):
            return False

    # A truthy health check result is treated as "not installed".
    return not language.health_check(hook.prefix, hook.language_version)


def _hook_install(hook: Hook) -> None:
    """Install the environment for ``hook`` and record the install state.

    Any existing environment directory is removed first.  The install, the
    health check and the state writing all run inside
    ``clean_path_on_failure`` for the environment directory.

    Raises:
        AssertionError: if the language has no ``ENVIRONMENT_DIR`` or if the
            health check reports a problem right after installing.
    """
    for message in (
            f'Installing environment for {hook.src}.',
            'Once installed this environment will be reused.',
            'This may take a few minutes...',
    ):
        logger.info(message)

    language = languages[hook.language]
    env_dir_name = language.ENVIRONMENT_DIR
    assert env_dir_name is not None

    env_path = environment_dir(
        hook.prefix, env_dir_name, hook.language_version,
    )

    # A previous run may have left a partial environment behind; start clean.
    if os.path.exists(env_path):
        rmtree(env_path)

    with clean_path_on_failure(env_path):
        language.install_environment(
            hook.prefix, hook.language_version, hook.additional_dependencies,
        )
        problem = language.health_check(hook.prefix, hook.language_version)
        if problem:
            raise AssertionError(
                f'BUG: expected environment for {hook.language} to be healthy '
                f'immediately after install, please open an issue describing '
                f'your environment\n\n'
                f'more info:\n\n{problem}',
            )

        # TODO: remove v1 state writing, no longer needed after pre-commit 3.0
        # The v1 state is written to a staging file first ...
        v1_path = _state_filename_v1(env_path)
        staging_path = f'{v1_path}staging'
        with open(staging_path, 'w') as staging_file:
            payload = json.dumps(_state(hook.additional_dependencies))
            staging_file.write(payload)
        # ... and then moved into place atomically to mark the install done.
        os.replace(staging_path, v1_path)

        # Create the (empty) v2 marker if it does not exist yet.
        with open(_state_filename_v2(env_path), 'a+'):
            pass


def _hook(
        *hook_dicts: dict[str, Any],
        root_config: dict[str, Any],
) -> dict[str, Any]:
    """Merge hook dicts into one and fill in defaults from ``root_config``.

    The dicts are layered left to right, so keys in later dicts override
    those in earlier ones.  The merge happens on a shallow copy of the first
    dict; the dicts passed in are not reassigned.

    A ``language_version`` equal to ``C.DEFAULT`` is resolved first from
    ``root_config['default_language_version']`` and, if that is still
    ``C.DEFAULT``, from the language's ``get_default_version()``.  Falsy
    ``stages`` are replaced by ``root_config['default_stages']``.

    Raises:
        SystemExit: with status 1, after logging an error, when the language
            has no ``ENVIRONMENT_DIR`` but the hook specifies a
            ``language_version`` or ``additional_dependencies``.
    """
    ret, rest = dict(hook_dicts[0]), hook_dicts[1:]
    for dct in rest:
        ret.update(dct)

    lang = ret['language']
    if ret['language_version'] == C.DEFAULT:
        ret['language_version'] = root_config['default_language_version'][lang]
    if ret['language_version'] == C.DEFAULT:
        ret['language_version'] = languages[lang].get_default_version()

    if not ret['stages']:
        ret['stages'] = root_config['default_stages']

    if languages[lang].ENVIRONMENT_DIR is None:
        if ret['language_version'] != C.DEFAULT:
            logger.error(
                f'The hook `{ret["id"]}` specifies `language_version` but is '
                f'using language `{lang}` which does not install an '
                f'environment.  '
                f'Perhaps you meant to use a specific language?',
            )
            # `exit` is injected by the `site` module for interactive use
            # and may be missing (e.g. `python -S`); raise directly instead.
            raise SystemExit(1)
        if ret['additional_dependencies']:
            logger.error(
                f'The hook `{ret["id"]}` specifies `additional_dependencies` '
                f'but is using language `{lang}` which does not install an '
                f'environment.  '
                f'Perhaps you meant to use a specific language?',
            )
            raise SystemExit(1)

    return ret


def _non_cloned_repository_hooks(
        repo_config: dict[str, Any],
        store: Store,
        root_config: dict[str, Any],
) -> tuple[Hook, ...]:
    """Create the hooks of a repo entry whose hooks come from the config.

    Each entry of ``repo_config['hooks']`` is used directly as the hook
    definition; nothing is read from a manifest here.
    """
    def _prefix(language_name: str, deps: Sequence[str]) -> Prefix:
        # pygrep / script / system / docker_image do not have
        # environments so they work out of the current directory
        if languages[language_name].ENVIRONMENT_DIR is None:
            root = os.getcwd()
        else:
            root = store.make_local(deps)
        return Prefix(root)

    created = []
    for hook_dct in repo_config['hooks']:
        src = repo_config['repo']
        # The prefix is resolved before the hook dict, as in a positional
        # call, so any store work happens before `_hook` can bail out.
        prefix = _prefix(
            hook_dct['language'], hook_dct['additional_dependencies'],
        )
        resolved = _hook(hook_dct, root_config=root_config)
        created.append(Hook.create(src, prefix, resolved))
    return tuple(created)


def _cloned_repository_hooks(
        repo_config: dict[str, Any],
        store: Store,
        root_config: dict[str, Any],
) -> tuple[Hook, ...]:
    """Create the hooks of a repo entry backed by a repository manifest.

    The manifest is loaded from the path returned by
    ``store.clone(repo, rev)``.  Every configured hook id must be present in
    that manifest; the manifest entry is then overlaid with the configured
    hook dict via ``_hook``.  Each resulting hook gets a prefix from
    ``store.clone(repo, rev, additional_dependencies)``.

    Raises:
        SystemExit: with status 1, after logging an error, when a configured
            hook id is not present in the manifest.
    """
    repo, rev = repo_config['repo'], repo_config['rev']
    manifest_path = os.path.join(store.clone(repo, rev), C.MANIFEST_FILE)
    by_id = {hook['id']: hook for hook in load_manifest(manifest_path)}

    # Validate every id up front so nothing is resolved for a bad config.
    for hook in repo_config['hooks']:
        if hook['id'] not in by_id:
            logger.error(
                f'`{hook["id"]}` is not present in repository {repo}.  '
                f'Typo? Perhaps it is introduced in a newer version?  '
                f'Often `pre-commit autoupdate` fixes this.',
            )
            # `exit` is injected by the `site` module for interactive use
            # and may be missing (e.g. `python -S`); raise directly instead.
            raise SystemExit(1)

    hook_dcts = [
        _hook(by_id[hook['id']], hook, root_config=root_config)
        for hook in repo_config['hooks']
    ]
    return tuple(
        Hook.create(
            repo_config['repo'],
            Prefix(store.clone(repo, rev, hook['additional_dependencies'])),
            hook,
        )
        for hook in hook_dcts
    )


def _repository_hooks(
        repo_config: dict[str, Any],
        store: Store,
        root_config: dict[str, Any],
) -> tuple[Hook, ...]:
    """Create the hooks for one repo entry of the configuration.

    ``LOCAL`` and ``META`` repos are handled without cloning; every other
    repo goes through the cloned-repository path.
    """
    needs_clone = repo_config['repo'] not in {LOCAL, META}
    if needs_clone:
        build = _cloned_repository_hooks
    else:
        build = _non_cloned_repository_hooks
    return build(repo_config, store, root_config)


def install_hook_envs(hooks: Sequence[Hook], store: Store) -> None:
    """Install the environments of all hooks that are not installed yet.

    Hooks sharing an ``install_key`` are only considered once.  The store's
    exclusive lock is taken only when at least one hook needs installing.
    """
    def _need_installed() -> list[Hook]:
        visited: set[tuple[Prefix, str, str, tuple[str, ...]]] = set()
        pending = []
        for candidate in hooks:
            key = candidate.install_key
            if key in visited:
                continue
            visited.add(key)
            if not _hook_installed(candidate):
                pending.append(candidate)
        return pending

    if _need_installed():
        with store.exclusive_lock():
            # Another process may have already completed this work, so the
            # list is recomputed once the lock is held.
            for pending_hook in _need_installed():
                _hook_install(pending_hook)


def all_hooks(root_config: dict[str, Any], store: Store) -> tuple[Hook, ...]:
    """Return the hooks of every repo in ``root_config['repos']``, in order."""
    collected: list[Hook] = []
    for repo_config in root_config['repos']:
        collected.extend(_repository_hooks(repo_config, store, root_config))
    return tuple(collected)