1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
"""Module containing cached JSON schemas."""
from __future__ import annotations
import json
import logging
import os
import time
import urllib.request
from collections import defaultdict
from functools import lru_cache
from pathlib import Path
from typing import Any
from urllib.request import Request
import jsonschema
import yaml
from jsonschema.exceptions import ValidationError
from ansiblelint.file_utils import Lintable
from ansiblelint.loaders import yaml_load_safe
# Logger used throughout this module; it is named after the enclosing
# package (``__package__``) rather than after this individual module.
_logger = logging.getLogger(__package__)
class SchemaCacheDict(defaultdict):  # type: ignore
    """Dictionary of schemas that loads missing entries on demand."""

    def __missing__(self, key: str) -> Any:
        """Fetch the schema for ``key`` on first access and remember it."""
        # Store the freshly loaded schema so later lookups are plain dict hits.
        schema = self[key] = get_schema(key)
        return schema
# Lazily filled cache of loaded schemas, keyed by file kind.
_schema_cache = SchemaCacheDict()

# Index mapping each file kind to the description of its JSON schema.
# See https://www.schemastore.org/json/
store_file = Path(f"{__file__}/../__store__.json").resolve()
with store_file.open(encoding="utf-8") as json_file:
    JSON_SCHEMAS = json.loads(json_file.read())
@lru_cache(maxsize=None)
def get_schema(kind: str) -> Any:
    """Load the ``<kind>.json`` schema stored in this module's directory.

    The parsed result is memoized, so each schema file is read only once
    until the cache is cleared with ``get_schema.cache_clear()``.
    """
    location = f"{os.path.dirname(__file__)}/{kind}.json"
    with open(location, encoding="utf-8") as handle:
        schema = json.load(handle)
    return schema
def validate_file_schema(file: Lintable) -> list[str]:
    """Check a lintable against the JSON schema registered for its kind.

    Returns a list of error messages: empty when the content validates, and
    a single message when the kind has no schema, the YAML cannot be
    constructed, or schema validation fails.
    """
    if file.kind not in JSON_SCHEMAS:
        return [f"Unable to find JSON Schema '{file.kind}' for '{file.path}' file."]

    try:
        # Round-trip through JSON so the YAML document only contains JSON
        # types (mapping keys, in particular, are converted to strings).
        document = json.loads(json.dumps(yaml_load_safe(file.content)))
        jsonschema.validate(instance=document, schema=_schema_cache[file.kind])
    except yaml.constructor.ConstructorError as exc:
        return [f"Failed to load YAML file '{file.path}': {exc.problem}"]
    except ValidationError as exc:
        return [exc.message]

    return []
# pylint: disable=too-many-branches
def refresh_schemas(min_age_seconds: int = 3600 * 24) -> int:
    """Refresh JSON schemas by downloading latest versions.

    Every entry of ``JSON_SCHEMAS`` is fetched again from its ``url``. When an
    ``etag`` is already recorded for the entry, it is sent as ``If-None-Match``
    so that an unchanged schema answers with HTTP 304 and is skipped.

    :param min_age_seconds: Minimum age (in seconds) of the store file before
        a refresh is attempted.
    :returns: ``0`` when the store file is younger than ``min_age_seconds``,
        ``-1`` when the store file is not writable, otherwise the number of
        schemas whose etag changed, or ``1`` when none changed (in which case
        the store file is only touched).
    :raises RuntimeError: If a schema URL contains ``#``.
    """
    age = int(time.time() - store_file.stat().st_mtime)

    # never check for updated schemas more often than min_age_seconds
    if min_age_seconds > age:
        return 0
    if not os.access(store_file, os.W_OK):  # pragma: no cover
        _logger.debug(
            "Skipping schema update due to lack of writing rights on %s", store_file
        )
        return -1
    _logger.debug("Checking for updated schemas...")

    # Directory holding the schema files. Derived from __file__ directly:
    # os.path.relpath() depends on the current working directory and raises
    # ValueError on Windows when it is on another drive.
    schema_dir = Path(__file__).parent

    changed = 0
    for kind, data in JSON_SCHEMAS.items():
        url = data["url"]
        if "#" in url:
            raise RuntimeError(
                f"Schema URLs cannot contain # due to python-jsonschema limitation: {url}"
            )
        path = schema_dir / f"{kind}.json"
        _logger.debug("Refreshing %s schema ...", kind)
        request = Request(url)
        known_etag = data.get("etag", "")
        if known_etag:
            # conditional request: the server answers 304 if nothing changed
            request.add_header("If-None-Match", f'"{known_etag}"')
        try:
            with urllib.request.urlopen(request, timeout=10) as response:
                if response.status == 200:
                    content = response.read().decode("utf-8").rstrip()
                    # A response without an ETag header yields None here;
                    # treat it as an empty etag instead of crashing.
                    etag = (response.headers["etag"] or "").strip('"')
                    if etag != known_etag:
                        JSON_SCHEMAS[kind]["etag"] = etag
                        changed += 1
                    with open(path, "w", encoding="utf-8") as f_out:
                        _logger.info("Schema %s was updated", kind)
                        f_out.write(content)
                        f_out.write("\n")  # prettier/editors
                        # push buffered text to the OS before syncing to disk
                        f_out.flush()
                        os.fsync(f_out.fileno())
                        # unload possibly loaded schema
                        if kind in _schema_cache:  # pragma: no cover
                            del _schema_cache[kind]
        except (ConnectionError, OSError) as exc:
            if (
                isinstance(exc, urllib.error.HTTPError)
                and getattr(exc, "code", None) == 304
            ):
                _logger.debug("Schema %s is not modified", url)
                continue
            # In case of networking issues, we just stop and use last-known good
            _logger.debug("Skipped schema refresh due to unexpected exception: %s", exc)
            break

    if changed:  # pragma: no cover
        with open(store_file, "w", encoding="utf-8") as f_out:
            # formatting should match our .prettierrc.yaml
            json.dump(JSON_SCHEMAS, f_out, indent=2, sort_keys=True)
            f_out.write("\n")  # prettier and editors in general
        # clear schema cache
        get_schema.cache_clear()
    else:
        # Nothing changed: bump the store file's mtime so the next refresh is
        # delayed again; callers still receive a non-zero result.
        store_file.touch()
        changed = 1
    return changed
|