Diffstat
-rw-r--r--  src/ansiblelint/schemas/main.py  141
1 file changed, 141 insertions, 0 deletions
diff --git a/src/ansiblelint/schemas/main.py b/src/ansiblelint/schemas/main.py
new file mode 100644
index 0000000..5a96ce9
--- /dev/null
+++ b/src/ansiblelint/schemas/main.py
@@ -0,0 +1,141 @@
+"""Module containing cached JSON schemas."""
+from __future__ import annotations
+
+import json
+import logging
+import os
+import time
+import urllib.request
+from collections import defaultdict
+from functools import lru_cache
+from pathlib import Path
+from typing import Any
+from urllib.request import Request
+
+import jsonschema
+import yaml
+from jsonschema.exceptions import ValidationError
+
+from ansiblelint.file_utils import Lintable
+from ansiblelint.loaders import yaml_load_safe
+
+_logger = logging.getLogger(__package__)
+
+
+class SchemaCacheDict(defaultdict):  # type: ignore
+    """Caching schema store."""
+
+    def __missing__(self, key: str) -> Any:
+        """Load schema on its first use."""
+        value = get_schema(key)
+        self[key] = value
+        return value
+
+
+_schema_cache = SchemaCacheDict()
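+# Illustrative note: a first lookup such as _schema_cache["playbook"] falls
+# through to __missing__, which loads the schema via get_schema() and caches
+# it; later lookups for the same kind return the stored dictionary directly.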
+
+
+# Maps kinds to JSON schemas
+# See https://www.schemastore.org/json/
+store_file = Path(f"{__file__}/../__store__.json").resolve()
+with open(store_file, encoding="utf-8") as json_file:
+    JSON_SCHEMAS = json.load(json_file)
+
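+# Note on the store shape (inferred from the lookups below, illustrative):
+# each entry maps a file kind to its schema metadata, roughly
+#   {"<kind>": {"url": "https://...", "etag": "<optional cached etag>"}}
+# where "url" is required and "etag" is filled in by refresh_schemas().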
+
+@lru_cache(maxsize=None)
+def get_schema(kind: str) -> Any:
+ """Return the schema for the given kind."""
+ schema_file = os.path.dirname(__file__) + "/" + kind + ".json"
+ with open(schema_file, encoding="utf-8") as f:
+ return json.load(f)
+
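+# Layout assumed by get_schema() above: one <kind>.json file shipped next to
+# this module, e.g. playbook.json for kind "playbook" (example kind only);
+# lru_cache ensures each schema file is parsed at most once per process.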
+
+def validate_file_schema(file: Lintable) -> list[str]:
+ """Return list of JSON validation errors found."""
+ if file.kind not in JSON_SCHEMAS:
+ return [f"Unable to find JSON Schema '{file.kind}' for '{file.path}' file."]
+ try:
+ # convert yaml to json (keys are converted to strings)
+ yaml_data = yaml_load_safe(file.content)
+ json_data = json.loads(json.dumps(yaml_data))
+ # file.data = json_data
+ jsonschema.validate(
+ instance=json_data,
+ schema=_schema_cache[file.kind],
+ )
+ except yaml.constructor.ConstructorError as exc:
+ return [f"Failed to load YAML file '{file.path}': {exc.problem}"]
+ except ValidationError as exc:
+ return [exc.message]
+ return []
+
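+# Usage sketch (illustrative, file name is hypothetical):
+#   errors = validate_file_schema(Lintable("site.yml"))
+#   if errors:
+#       print(errors[0])
+# An empty list means the YAML content matched the schema for the file's kind.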
+
+# pylint: disable=too-many-branches
+def refresh_schemas(min_age_seconds: int = 3600 * 24) -> int:
+ """Refresh JSON schemas by downloading latest versions.
+
+ Returns number of changed schemas.
+ """
+ age = int(time.time() - store_file.stat().st_mtime)
+
+ # never check for updated schemas more than once a day
+ if min_age_seconds > age:
+ return 0
+ if not os.access(store_file, os.W_OK): # pragma: no cover
+ _logger.debug(
+ "Skipping schema update due to lack of writing rights on %s", store_file
+ )
+ return -1
+ _logger.debug("Checking for updated schemas...")
+
+ changed = 0
+ for kind, data in JSON_SCHEMAS.items():
+ url = data["url"]
+ if "#" in url:
+ raise RuntimeError(
+ f"Schema URLs cannot contain # due to python-jsonschema limitation: {url}"
+ )
+ path = Path(f"{os.path.relpath(os.path.dirname(__file__))}/{kind}.json")
+ _logger.debug("Refreshing %s schema ...", kind)
+ request = Request(url)
+ etag = data.get("etag", "")
+ if etag:
+ request.add_header("If-None-Match", f'"{data.get("etag")}"')
+ try:
+ with urllib.request.urlopen(request, timeout=10) as response:
+ if response.status == 200:
+ content = response.read().decode("utf-8").rstrip()
+ etag = response.headers["etag"].strip('"')
+ if etag != data.get("etag", ""):
+ JSON_SCHEMAS[kind]["etag"] = etag
+ changed += 1
+ with open(f"{path}", "w", encoding="utf-8") as f_out:
+ _logger.info("Schema %s was updated", kind)
+ f_out.write(content)
+ f_out.write("\n") # prettier/editors
+ f_out.truncate()
+ os.fsync(f_out.fileno())
+ # unload possibly loaded schema
+ if kind in _schema_cache: # pragma: no cover
+ del _schema_cache[kind]
+ except (ConnectionError, OSError) as exc:
+ if (
+ isinstance(exc, urllib.error.HTTPError)
+ and getattr(exc, "code", None) == 304
+ ):
+ _logger.debug("Schema %s is not modified", url)
+ continue
+ # In case of networking issues, we just stop and use last-known good
+ _logger.debug("Skipped schema refresh due to unexpected exception: %s", exc)
+ break
+ if changed: # pragma: no cover
+ with open(store_file, "w", encoding="utf-8") as f_out:
+ # formatting should match our .prettierrc.yaml
+ json.dump(JSON_SCHEMAS, f_out, indent=2, sort_keys=True)
+ f_out.write("\n") # prettier and editors in general
+ # clear schema cache
+ get_schema.cache_clear()
+ else:
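+        # update the store file's mtime so the age check at the top defers
+        # the next download attempt for another min_age_seconds window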
+        store_file.touch()
+        changed = 1
+    return changed
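+
+# Invocation sketch (illustrative): refresh_schemas(min_age_seconds=0) forces
+# an immediate check, the default re-checks at most once per day, and a -1
+# return indicates the store file was not writable.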