summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/rules/schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ansiblelint/rules/schema.py')
-rw-r--r--src/ansiblelint/rules/schema.py371
1 files changed, 371 insertions, 0 deletions
diff --git a/src/ansiblelint/rules/schema.py b/src/ansiblelint/rules/schema.py
new file mode 100644
index 0000000..32ff2eb
--- /dev/null
+++ b/src/ansiblelint/rules/schema.py
@@ -0,0 +1,371 @@
+"""Rule definition for JSON Schema Validations."""
+from __future__ import annotations
+
+import logging
+import sys
+from typing import TYPE_CHECKING, Any
+
+from ansiblelint.errors import MatchError
+from ansiblelint.file_utils import Lintable
+from ansiblelint.rules import AnsibleLintRule
+from ansiblelint.schemas.__main__ import JSON_SCHEMAS
+from ansiblelint.schemas.main import validate_file_schema
+from ansiblelint.text import has_jinja
+
+if TYPE_CHECKING:
+ from ansiblelint.utils import Task
+
+
+_logger = logging.getLogger(__name__)
+
+
+DESCRIPTION_MD = """ Returned errors will not include exact line numbers, but they will mention
+the schema name being used as a tag, like ``schema[playbook]``,
+``schema[tasks]``.
+
+This rule is not skippable and stops further processing of the file.
+
+If incorrect schema was picked, you might want to either:
+
+* move the file to standard location, so its file is detected correctly.
+* use ``kinds:`` option in linter config to help it pick correct file type.
+"""
+
+pre_checks = {
+ "task": {
+ "with_flattened": {
+ "msg": "with_flattened was moved to with_community.general.flattened in 2.10",
+ "tag": "moves",
+ },
+ "with_filetree": {
+ "msg": "with_filetree was moved to with_community.general.filetree in 2.10",
+ "tag": "moves",
+ },
+ "with_cartesian": {
+ "msg": "with_cartesian was moved to with_community.general.flattened in 2.10",
+ "tag": "moves",
+ },
+ },
+}
+
+
+class ValidateSchemaRule(AnsibleLintRule):
+ """Perform JSON Schema Validation for known lintable kinds."""
+
+ description = DESCRIPTION_MD
+
+ id = "schema"
+ severity = "VERY_HIGH"
+ tags = ["core"]
+ version_added = "v6.1.0"
+ _ids = {
+ "schema[ansible-lint-config]": "",
+ "schema[ansible-navigator-config]": "",
+ "schema[changelog]": "",
+ "schema[execution-environment]": "",
+ "schema[galaxy]": "",
+ "schema[inventory]": "",
+ "schema[meta]": "",
+ "schema[meta-runtime]": "",
+ "schema[molecule]": "",
+ "schema[playbook]": "",
+ "schema[requirements]": "",
+ "schema[role-arg-spec]": "",
+ "schema[rulebook]": "",
+ "schema[tasks]": "",
+ "schema[vars]": "",
+ }
+ _field_checks: dict[str, list[str]] = {}
+
+ @property
+ def field_checks(self) -> dict[str, list[str]]:
+ """Lazy property for returning field checks."""
+ if not self._collection:
+ msg = "Rule was not registered to a RuleCollection."
+ raise RuntimeError(msg)
+ if not self._field_checks:
+ self._field_checks = {
+ "become_method": sorted(
+ self._collection.app.runtime.plugins.become.keys(),
+ ),
+ }
+ return self._field_checks
+
+ def matchplay(self, file: Lintable, data: dict[str, Any]) -> list[MatchError]:
+ """Return matches found for a specific playbook."""
+ results: list[MatchError] = []
+ if not data or file.kind not in ("tasks", "handlers", "playbook"):
+ return results
+ # check at play level
+ results.extend(self._get_field_matches(file=file, data=data))
+ return results
+
+ def _get_field_matches(
+ self,
+ file: Lintable,
+ data: dict[str, Any],
+ ) -> list[MatchError]:
+ """Retrieve all matches related to fields for the given data block."""
+ results = []
+ for key, values in self.field_checks.items():
+ if key in data:
+ plugin_value = data[key]
+ if not has_jinja(plugin_value) and plugin_value not in values:
+ msg = f"'{key}' must be one of the currently available values: {', '.join(values)}"
+ results.append(
+ MatchError(
+ message=msg,
+ lineno=data.get("__line__", 1),
+ lintable=file,
+ rule=ValidateSchemaRule(),
+ details=ValidateSchemaRule.description,
+ tag=f"schema[{file.kind}]",
+ ),
+ )
+ return results
+
+ def matchtask(
+ self,
+ task: Task,
+ file: Lintable | None = None,
+ ) -> bool | str | MatchError | list[MatchError]:
+ results = []
+ if not file:
+ file = Lintable("", kind="tasks")
+ results.extend(self._get_field_matches(file=file, data=task.raw_task))
+ for key in pre_checks["task"]:
+ if key in task.raw_task:
+ msg = pre_checks["task"][key]["msg"]
+ tag = pre_checks["task"][key]["tag"]
+ results.append(
+ MatchError(
+ message=msg,
+ lintable=file,
+ rule=ValidateSchemaRule(),
+ details=ValidateSchemaRule.description,
+ tag=f"schema[{tag}]",
+ ),
+ )
+ return results
+
+ def matchyaml(self, file: Lintable) -> list[MatchError]:
+ """Return JSON validation errors found as a list of MatchError(s)."""
+ result: list[MatchError] = []
+ if file.kind not in JSON_SCHEMAS:
+ return result
+
+ errors = validate_file_schema(file)
+ if errors:
+ if errors[0].startswith("Failed to load YAML file"):
+ _logger.debug(
+ "Ignored failure to load %s for schema validation, as !vault may cause it.",
+ file,
+ )
+ return []
+
+ result.append(
+ MatchError(
+ message=errors[0],
+ lintable=file,
+ rule=ValidateSchemaRule(),
+ details=ValidateSchemaRule.description,
+ tag=f"schema[{file.kind}]",
+ ),
+ )
+
+ if not result:
+ result = super().matchyaml(file)
+ return result
+
+
+# testing code to be loaded only with pytest or when executed the rule file
+if "pytest" in sys.modules:
+ import pytest
+
+ # pylint: disable=ungrouped-imports
+ from ansiblelint.config import options
+ from ansiblelint.rules import RulesCollection
+ from ansiblelint.runner import Runner
+
+ @pytest.mark.parametrize(
+ ("file", "expected_kind", "expected"),
+ (
+ pytest.param(
+ "examples/collection/galaxy.yml",
+ "galaxy",
+ ["'GPL' is not one of"],
+ id="galaxy",
+ ),
+ pytest.param(
+ "examples/roles/invalid_requirements_schema/meta/requirements.yml",
+ "requirements",
+ ["{'foo': 'bar'} is not valid under any of the given schemas"],
+ id="requirements",
+ ),
+ pytest.param(
+ "examples/roles/invalid_meta_schema/meta/main.yml",
+ "meta",
+ ["False is not of type 'string'"],
+ id="meta",
+ ),
+ pytest.param(
+ "examples/playbooks/vars/invalid_vars_schema.yml",
+ "vars",
+ ["'123' does not match any of the regexes"],
+ id="vars",
+ ),
+ pytest.param(
+ "examples/execution-environment.yml",
+ "execution-environment",
+ [],
+ id="execution-environment",
+ ),
+ pytest.param(
+ "examples/ee_broken/execution-environment.yml",
+ "execution-environment",
+ ["{'foo': 'bar'} is not valid under any of the given schemas"],
+ id="execution-environment-broken",
+ ),
+ ("examples/meta/runtime.yml", "meta-runtime", []),
+ pytest.param(
+ "examples/broken_collection_meta_runtime/meta/runtime.yml",
+ "meta-runtime",
+ ["Additional properties are not allowed ('foo' was unexpected)"],
+ id="meta-runtime-broken",
+ ),
+ pytest.param(
+ "examples/inventory/production.yml",
+ "inventory",
+ [],
+ id="inventory",
+ ),
+ pytest.param(
+ "examples/inventory/broken_dev_inventory.yml",
+ "inventory",
+ ["Additional properties are not allowed ('foo' was unexpected)"],
+ id="inventory-broken",
+ ),
+ pytest.param(
+ ".ansible-lint",
+ "ansible-lint-config",
+ [],
+ id="ansible-lint-config",
+ ),
+ pytest.param(
+ "examples/.config/ansible-lint.yml",
+ "ansible-lint-config",
+ [],
+ id="ansible-lint-config2",
+ ),
+ pytest.param(
+ "examples/broken/.ansible-lint",
+ "ansible-lint-config",
+ ["Additional properties are not allowed ('foo' was unexpected)"],
+ id="ansible-lint-config-broken",
+ ),
+ pytest.param(
+ "examples/ansible-navigator.yml",
+ "ansible-navigator-config",
+ [],
+ id="ansible-navigator-config",
+ ),
+ pytest.param(
+ "examples/broken/ansible-navigator.yml",
+ "ansible-navigator-config",
+ ["Additional properties are not allowed ('ansible' was unexpected)"],
+ id="ansible-navigator-config-broken",
+ ),
+ pytest.param(
+ "examples/roles/hello/meta/argument_specs.yml",
+ "role-arg-spec",
+ [],
+ id="role-arg-spec",
+ ),
+ pytest.param(
+ "examples/roles/broken_argument_specs/meta/argument_specs.yml",
+ "role-arg-spec",
+ ["Additional properties are not allowed ('foo' was unexpected)"],
+ id="role-arg-spec-broken",
+ ),
+ pytest.param(
+ "examples/changelogs/changelog.yaml",
+ "changelog",
+ ["Additional properties are not allowed ('foo' was unexpected)"],
+ id="changelog",
+ ),
+ pytest.param(
+ "examples/rulebooks/rulebook-fail.yml",
+ "rulebook",
+ [
+ "Additional properties are not allowed ('that_should_not_be_here' was unexpected)",
+ ],
+ id="rulebook",
+ ),
+ pytest.param(
+ "examples/rulebooks/rulebook-pass.yml",
+ "rulebook",
+ [],
+ id="rulebook2",
+ ),
+ pytest.param(
+ "examples/playbooks/rule-schema-become-method-pass.yml",
+ "playbook",
+ [],
+ id="playbook",
+ ),
+ pytest.param(
+ "examples/playbooks/rule-schema-become-method-fail.yml",
+ "playbook",
+ [
+ "'become_method' must be one of the currently available values",
+ "'become_method' must be one of the currently available values",
+ ],
+ id="playbook2",
+ ),
+ ),
+ )
+ def test_schema(file: str, expected_kind: str, expected: list[str]) -> None:
+ """Validate parsing of ansible output."""
+ lintable = Lintable(file)
+ assert lintable.kind == expected_kind
+
+ rules = RulesCollection(options=options)
+ rules.register(ValidateSchemaRule())
+ results = Runner(lintable, rules=rules).run()
+
+ assert len(results) == len(expected), results
+ for idx, result in enumerate(results):
+ assert result.filename.endswith(file)
+ assert expected[idx] in result.message
+ assert result.tag == f"schema[{expected_kind}]"
+
+ @pytest.mark.parametrize(
+ ("file", "expected_kind", "expected_tag", "count"),
+ (
+ pytest.param(
+ "examples/playbooks/rule-syntax-moves.yml",
+ "playbook",
+ "schema[moves]",
+ 3,
+ id="playbook",
+ ),
+ ),
+ )
+ def test_schema_moves(
+ file: str,
+ expected_kind: str,
+ expected_tag: str,
+ count: int,
+ ) -> None:
+ """Validate ability to detect schema[moves]."""
+ lintable = Lintable(file)
+ assert lintable.kind == expected_kind
+
+ rules = RulesCollection(options=options)
+ rules.register(ValidateSchemaRule())
+ results = Runner(lintable, rules=rules).run()
+
+ assert len(results) == count, results
+ for result in results:
+ assert result.filename.endswith(file)
+ assert result.tag == expected_tag