summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/schemas/main.py
blob: 5a96ce91d44e228d9f96c51b2c26433738b30c73 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""Module containing cached JSON schemas."""
from __future__ import annotations

import json
import logging
import os
import time
import urllib.request
from collections import defaultdict
from functools import lru_cache
from pathlib import Path
from typing import Any
from urllib.request import Request

import jsonschema
import yaml
from jsonschema.exceptions import ValidationError

from ansiblelint.file_utils import Lintable
from ansiblelint.loaders import yaml_load_safe

# Logger named after the enclosing package (``__package__``), not this module.
_logger = logging.getLogger(__package__)


class SchemaCacheDict(defaultdict):  # type: ignore
    """Mapping of schema kind to parsed schema that populates itself lazily."""

    def __missing__(self, key: str) -> Any:
        """Fetch the schema for an unseen kind, remember it and return it."""
        # Chained assignment: store the freshly loaded schema under ``key``
        # so later lookups are served straight from the dictionary.
        schema = self[key] = get_schema(key)
        return schema


# Module-level schema store; a missing kind is loaded through get_schema()
# on first lookup (see SchemaCacheDict.__missing__).
_schema_cache = SchemaCacheDict()


# Maps kinds to JSON schema metadata (refresh_schemas() reads the "url" and
# "etag" entries of each kind). Loaded once, at import time, from the
# __store__.json file located in the same directory as this module.
# See https://www.schemastore.org/json/
store_file = Path(f"{__file__}/../__store__.json").resolve()
with open(store_file, encoding="utf-8") as json_file:
    JSON_SCHEMAS = json.load(json_file)


@lru_cache(maxsize=None)
def get_schema(kind: str) -> Any:
    """Load and parse the ``<kind>.json`` schema stored beside this module.

    Results are memoized per ``kind``, so a schema file is read again only
    after the cache has been cleared.
    """
    location = f"{os.path.dirname(__file__)}/{kind}.json"
    with open(location, encoding="utf-8") as handle:
        schema = json.load(handle)
    return schema


def validate_file_schema(file: Lintable) -> list[str]:
    """Validate a lintable against the JSON schema registered for its kind.

    Returns a list of error messages: empty when the content validates,
    otherwise a single entry describing the unknown kind, the YAML
    construction failure or the first schema violation.
    """
    if file.kind not in JSON_SCHEMAS:
        return [f"Unable to find JSON Schema '{file.kind}' for '{file.path}' file."]

    errors: list[str] = []
    try:
        parsed = yaml_load_safe(file.content)
        # Round-trip through JSON so that every mapping key becomes a string.
        instance = json.loads(json.dumps(parsed))
        schema = _schema_cache[file.kind]
        jsonschema.validate(instance=instance, schema=schema)
    except yaml.constructor.ConstructorError as exc:
        errors.append(f"Failed to load YAML file '{file.path}': {exc.problem}")
    except ValidationError as exc:
        errors.append(exc.message)
    return errors


# pylint: disable=too-many-branches
def refresh_schemas(min_age_seconds: int = 3600 * 24) -> int:
    """Refresh JSON schemas by downloading latest versions.

    Every kind listed in ``JSON_SCHEMAS`` is requested from its ``url``,
    sending the stored ``etag`` (when there is one) as ``If-None-Match``.
    A ``200`` response overwrites the local ``<kind>.json`` file, a ``304``
    response leaves it untouched, and any other ``OSError`` stops the refresh
    so the files already on disk keep being used.

    :param min_age_seconds: Minimum age, in seconds, the store file must have
        before a refresh is attempted.
    :returns: ``0`` when the store file is younger than ``min_age_seconds``;
        ``-1`` when the store file is not writable; otherwise the number of
        schemas whose etag changed. When no etag changed, the store file is
        only touched and ``1`` is returned.
    :raises RuntimeError: If a schema URL contains ``#``.
    """
    age = int(time.time() - store_file.stat().st_mtime)

    # Skip the refresh while the store file is younger than the requested
    # minimum age (one day by default).
    if min_age_seconds > age:
        return 0
    if not os.access(store_file, os.W_OK):  # pragma: no cover
        _logger.debug(
            "Skipping schema update due to lack of writing rights on %s", store_file
        )
        return -1
    _logger.debug("Checking for updated schemas...")

    # Directory get_schema() reads from. Built from __file__ directly instead
    # of os.path.relpath(), which depends on the current working directory and
    # raises ValueError on Windows when it lives on another drive.
    schemas_dir = Path(__file__).parent

    changed = 0
    for kind, data in JSON_SCHEMAS.items():
        url = data["url"]
        if "#" in url:
            raise RuntimeError(
                f"Schema URLs cannot contain # due to python-jsonschema limitation: {url}"
            )
        path = schemas_dir / f"{kind}.json"
        _logger.debug("Refreshing %s schema ...", kind)
        request = Request(url)
        stored_etag = data.get("etag", "")
        if stored_etag:
            request.add_header("If-None-Match", f'"{stored_etag}"')
        try:
            with urllib.request.urlopen(request, timeout=10) as response:
                if response.status == 200:
                    content = response.read().decode("utf-8").rstrip()
                    # A missing ETag header yields None; treat it as "no etag"
                    # instead of failing on None.strip().
                    etag = (response.headers.get("etag") or "").strip('"')
                    if etag != stored_etag:
                        # ``data`` is the JSON_SCHEMAS[kind] entry itself.
                        data["etag"] = etag
                        changed += 1
                    with open(path, "w", encoding="utf-8") as f_out:
                        _logger.info("Schema %s was updated", kind)
                        f_out.write(content)
                        f_out.write("\n")  # prettier/editors
                        # Push Python's buffer to the OS before asking the OS
                        # to commit the file to disk.
                        f_out.flush()
                        os.fsync(f_out.fileno())
                    # Unload the possibly loaded schema. get_schema() keeps its
                    # own lru_cache, so clear that too; otherwise the next
                    # lookup would bring back the stale schema.
                    _schema_cache.pop(kind, None)
                    get_schema.cache_clear()
        except OSError as exc:  # covers ConnectionError and urllib errors
            if isinstance(exc, urllib.error.HTTPError) and exc.code == 304:
                _logger.debug("Schema %s is not modified", url)
                continue
            # In case of networking issues, we just stop and use last-known good
            _logger.debug("Skipped schema refresh due to unexpected exception: %s", exc)
            break
    if changed:  # pragma: no cover
        with open(store_file, "w", encoding="utf-8") as f_out:
            # formatting should match our .prettierrc.yaml
            json.dump(JSON_SCHEMAS, f_out, indent=2, sort_keys=True)
            f_out.write("\n")  # prettier and editors in general
        # clear schema cache
        get_schema.cache_clear()
    else:
        # Bump the store file's mtime so the age check above postpones the
        # next attempt.
        store_file.touch()
        # NOTE(review): 1 is returned although no etag changed; kept as-is
        # for backward compatibility with existing callers.
        changed = 1
    return changed