summary | refs | log | tree | commit | diff | stats
path: root/src/ansiblelint/yaml_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ansiblelint/yaml_utils.py')
-rw-r--r-- src/ansiblelint/yaml_utils.py 295
1 files changed, 214 insertions, 81 deletions
diff --git a/src/ansiblelint/yaml_utils.py b/src/ansiblelint/yaml_utils.py
index cc7e9ef..a1b963d 100644
--- a/src/ansiblelint/yaml_utils.py
+++ b/src/ansiblelint/yaml_utils.py
@@ -1,4 +1,5 @@
"""Utility helpers to simplify working with yaml-based data."""
+
# pylint: disable=too-many-lines
from __future__ import annotations
@@ -6,21 +7,23 @@ import functools
import logging
import os
import re
-from collections.abc import Iterator, Sequence
+from collections.abc import Callable, Iterator, Sequence
from io import StringIO
from pathlib import Path
from re import Pattern
-from typing import TYPE_CHECKING, Any, Callable, Union, cast
+from typing import TYPE_CHECKING, Any, cast
import ruamel.yaml.events
from ruamel.yaml.comments import CommentedMap, CommentedSeq, Format
+from ruamel.yaml.composer import ComposerError
from ruamel.yaml.constructor import RoundTripConstructor
from ruamel.yaml.emitter import Emitter, ScalarAnalysis
# Module 'ruamel.yaml' does not explicitly export attribute 'YAML'; implicit reexport disabled
# To make the type checkers happy, we import from ruamel.yaml.main instead.
from ruamel.yaml.main import YAML
-from ruamel.yaml.scalarint import ScalarInt
+from ruamel.yaml.parser import ParserError
+from ruamel.yaml.scalarint import HexInt, ScalarInt
from yamllint.config import YamlLintConfig
from ansiblelint.constants import (
@@ -32,7 +35,8 @@ from ansiblelint.utils import Task
if TYPE_CHECKING:
# noinspection PyProtectedMember
- from ruamel.yaml.comments import LineCol # pylint: disable=ungrouped-imports
+ from ruamel.yaml.comments import LineCol
+ from ruamel.yaml.compat import StreamTextType
from ruamel.yaml.nodes import ScalarNode
from ruamel.yaml.representer import RoundTripRepresenter
from ruamel.yaml.tokens import CommentToken
@@ -41,28 +45,18 @@ if TYPE_CHECKING:
_logger = logging.getLogger(__name__)
-YAMLLINT_CONFIG = """
-extends: default
-rules:
- comments:
- # https://github.com/prettier/prettier/issues/6780
- min-spaces-from-content: 1
- # https://github.com/adrienverge/yamllint/issues/384
- comments-indentation: false
- document-start: disable
- # 160 chars was the default used by old E204 rule, but
- # you can easily change it or disable in your .yamllint file.
- line-length:
- max: 160
- # We are adding an extra space inside braces as that's how prettier does it
- # and we are trying not to fight other linters.
- braces:
- min-spaces-inside: 0 # yamllint defaults to 0
- max-spaces-inside: 1 # yamllint defaults to 0
- octal-values:
- forbid-implicit-octal: true # yamllint defaults to false
- forbid-explicit-octal: true # yamllint defaults to false
-"""
+
+class CustomYamlLintConfig(YamlLintConfig): # type: ignore[misc]
+ """YamlLintConfig extended with an ``incompatible`` message attribute."""
+
+ def __init__(
+ self,
+ content: str | None = None,
+ file: str | Path | None = None,
+ ) -> None:
+ """Initialize from content or file with an empty ``incompatible`` message."""
+ super().__init__(content, file)
+ self.incompatible = ""
def deannotate(data: Any) -> Any:
@@ -80,10 +74,10 @@ def deannotate(data: Any) -> Any:
return data
-@functools.lru_cache(maxsize=1)
-def load_yamllint_config() -> YamlLintConfig:
+def load_yamllint_config() -> CustomYamlLintConfig:
"""Load our default yamllint config and any customized override file."""
- config = YamlLintConfig(content=YAMLLINT_CONFIG)
+ config = CustomYamlLintConfig(file=Path(__file__).parent / "data" / ".yamllint")
+ config.incompatible = ""
# if we detect local yamllint config we use it but raise a warning
# as this is likely to get out of sync with our internal config.
for path in [
@@ -100,10 +94,65 @@ def load_yamllint_config() -> YamlLintConfig:
"internal yamllint config.",
file,
)
- config_override = YamlLintConfig(file=str(file))
- config_override.extend(config)
- config = config_override
+ custom_config = CustomYamlLintConfig(file=str(file))
+ custom_config.extend(config)
+ config = custom_config
break
+
+ # Look for settings incompatible with our reformatting
+ checks: list[tuple[str, str | int | bool]] = [
+ (
+ "comments.min-spaces-from-content",
+ 1,
+ ),
+ (
+ "comments-indentation",
+ False,
+ ),
+ (
+ "braces.min-spaces-inside",
+ 0,
+ ),
+ (
+ "braces.max-spaces-inside",
+ 1,
+ ),
+ (
+ "octal-values.forbid-implicit-octal",
+ True,
+ ),
+ (
+ "octal-values.forbid-explicit-octal",
+ True,
+ ),
+ # (
+ # "key-duplicates.forbid-duplicated-merge-keys", # v1.34.0+
+ # True,
+ # ),
+ # (
+ # "quoted-strings.quote-type", "double",
+ # ),
+ # (
+ # "quoted-strings.required", "only-when-needed",
+ # ),
+ ]
+ errors = []
+ for setting, expected_value in checks:
+ v = config.rules
+ for key in setting.split("."):
+ if not isinstance(v, dict): # pragma: no cover
+ break
+ if key not in v: # pragma: no cover
+ break
+ v = v[key]
+ if v != expected_value:
+ msg = f"{setting} must be {str(expected_value).lower()}"
+ errors.append(msg)
+ if errors:
+ nl = "\n"
+ msg = f"Found incompatible custom yamllint configuration ({file}), please either remove the file or edit it to comply with:{nl} - {(nl + ' - ').join(errors)}.{nl}{nl}Read https://ansible.readthedocs.io/projects/lint/rules/yaml/ for more details regarding why we have these requirements. Fix mode will not be available."
+ config.incompatible = msg
+
_logger.debug("Effective yamllint rules used: %s", config.rules)
return config
@@ -196,7 +245,7 @@ def _nested_items_path(
"""
# we have to cast each convert_to_tuples assignment or mypy complains
# that both assignments (for dict and list) do not have the same type
- convert_to_tuples_type = Callable[[], Iterator[tuple[Union[str, int], Any]]]
+ convert_to_tuples_type = Callable[[], Iterator[tuple[str | int, Any]]]
if isinstance(data_collection, dict):
convert_data_collection_to_tuples = cast(
convert_to_tuples_type,
@@ -214,7 +263,7 @@ def _nested_items_path(
if key in (*ANNOTATION_KEYS, *ignored_keys):
continue
yield key, value, parent_path
- if isinstance(value, (dict, list)):
+ if isinstance(value, dict | list):
yield from _nested_items_path(
data_collection=value,
parent_path=[*parent_path, key],
@@ -232,7 +281,7 @@ def get_path_to_play(
raise ValueError(msg)
if lintable.kind != "playbook" or not isinstance(ruamel_data, CommentedSeq):
return []
- lc: LineCol # lc uses 0-based counts # pylint: disable=invalid-name
+ lc: LineCol # lc uses 0-based counts
# lineno is 1-based. Convert to 0-based.
line_index = lineno - 1
@@ -245,10 +294,10 @@ def get_path_to_play(
else:
next_play_line_index = None
- lc = play.lc # pylint: disable=invalid-name
+ lc = play.lc
if not isinstance(lc.line, int):
msg = f"expected lc.line to be an int, got {lc.line!r}"
- raise RuntimeError(msg)
+ raise TypeError(msg)
if lc.line == line_index:
return [play_index]
if play_index > 0 and prev_play_line_index < line_index < lc.line:
@@ -300,6 +349,10 @@ def _get_path_to_task_in_playbook(
else:
next_play_line_index = None
+ # We clearly haven't found the right spot yet if a following play starts on an earlier line.
+ if next_play_line_index and lineno > next_play_line_index:
+ continue
+
play_keys = list(play.keys())
for tasks_keyword in PLAYBOOK_TASK_KEYWORDS:
if not play.get(tasks_keyword):
@@ -381,7 +434,7 @@ def _get_path_to_task_in_tasks_block(
if not isinstance(task.lc.line, int):
msg = f"expected task.lc.line to be an int, got {task.lc.line!r}"
- raise RuntimeError(msg)
+ raise TypeError(msg)
if task.lc.line == line_index:
return [task_index]
if task_index > 0 and prev_task_line_index < line_index < task.lc.line:
@@ -418,6 +471,8 @@ def _get_path_to_task_in_nested_tasks_block(
continue
next_task_key = task_keys_by_index.get(task_index + 1, None)
if next_task_key is not None:
+ if task.lc.data[next_task_key][2] < lineno:
+ continue
next_task_key_line_index = task.lc.data[next_task_key][0]
else:
next_task_key_line_index = None
@@ -461,7 +516,6 @@ class OctalIntYAML11(ScalarInt):
v = format(data, "o")
anchor = data.yaml_anchor(any=True)
# noinspection PyProtectedMember
- # pylint: disable=protected-access
return representer.insert_underscore(
"0",
v,
@@ -498,7 +552,9 @@ class CustomConstructor(RoundTripConstructor):
value_s = value_su.replace("_", "")
if value_s[0] in "+-":
value_s = value_s[1:]
- if value_s[0] == "0":
+ if value_s[0:2] == "0x":
+ ret = HexInt(ret, width=len(value_s) - 2)
+ elif value_s[0] == "0":
# got an octal in YAML 1.1
ret = OctalIntYAML11(
ret,
@@ -582,15 +638,33 @@ class FormattedEmitter(Emitter):
"""Select how to quote scalars if needed."""
style = super().choose_scalar_style()
if (
- style == "" # noqa: PLC1901
+ style == ""
and self.event.value.startswith("0")
and len(self.event.value) > 1
):
- if self.event.tag == "tag:yaml.org,2002:int" and self.event.implicit[0]:
- # ensures that "0123" string does not lose its quoting
+ # We have an as-yet unquoted token that starts with "0" (but is not itself the digit 0).
+ # It could be:
+ # - hexadecimal like "0xF1"; comes tagged as int. Should continue unquoted to continue as an int.
+ # - octal like "0666" or "0o755"; comes tagged as str. **Should** be quoted to be cross-YAML compatible.
+ # - string like "0.0.0.0" and "00-header". Should not be quoted, unless it has a quote in it.
+ if (
+ self.event.value.startswith("0x")
+ and self.event.tag == "tag:yaml.org,2002:int"
+ and self.event.implicit[0]
+ ):
+ # hexadecimal
+ self.event.tag = "tag:yaml.org,2002:str"
+ return ""
+ try:
+ int(self.event.value, 8)
+ except ValueError:
+ pass
+ # fallthrough to string
+ else:
+ # octal
self.event.tag = "tag:yaml.org,2002:str"
self.event.implicit = (True, True, True)
- return '"'
+ return '"'
if style != "'":
# block scalar, double quoted, etc.
return style
@@ -598,6 +672,17 @@ class FormattedEmitter(Emitter):
return "'"
return self.preferred_quote
+ def increase_indent(
+ self,
+ flow: bool = False, # noqa: FBT002
+ sequence: bool | None = None,
+ indentless: bool = False, # noqa: FBT002
+ ) -> None:
+ super().increase_indent(flow, sequence, indentless)
+ # After the parent adjusts the indent: when indents.last_seq() is truthy (presumably the previous node was a sequence -- confirm against ruamel.yaml), do not indent further; pin the indent to one past the current column.
+ if self.indents.last_seq():
+ self.indent = self.column + 1
+
def write_indicator(
self,
indicator: str, # ruamel.yaml typehint is wrong. This is a string.
@@ -620,6 +705,9 @@ class FormattedEmitter(Emitter):
and not self._in_empty_flow_map
):
indicator = (" " * spaces_inside) + "}"
+ # Indicator sometimes comes with embedded spaces we need to squish
+ if indicator == " -" and self.indents.last_seq():
+ indicator = "-"
super().write_indicator(indicator, need_whitespace, whitespace, indention)
# if it is the start of a flow mapping, and it's not time
# to wrap the lines, insert a space.
@@ -691,16 +779,21 @@ class FormattedEmitter(Emitter):
and not value.strip()
and not isinstance(
self.event,
- (
- ruamel.yaml.events.CollectionEndEvent,
- ruamel.yaml.events.DocumentEndEvent,
- ruamel.yaml.events.StreamEndEvent,
- ),
+ ruamel.yaml.events.CollectionEndEvent
+ | ruamel.yaml.events.DocumentEndEvent
+ | ruamel.yaml.events.StreamEndEvent
+ | ruamel.yaml.events.MappingStartEvent,
)
):
# drop pure whitespace pre comments
# does not apply to End events since they consume one of the newlines.
value = ""
+ elif (
+ pre
+ and not value.strip()
+ and isinstance(self.event, ruamel.yaml.events.MappingStartEvent)
+ ):
+ value = self._re_repeat_blank_lines.sub("", value)
elif pre:
# preserve content in pre comment with at least one newline,
# but no extra blank lines.
@@ -727,13 +820,25 @@ class FormattedEmitter(Emitter):
class FormattedYAML(YAML):
"""A YAML loader/dumper that handles ansible content better by default."""
- def __init__(
+ default_config = {
+ "explicit_start": True,
+ "explicit_end": False,
+ "width": 160,
+ "indent_sequences": True,
+ "preferred_quote": '"',
+ "min_spaces_inside": 0,
+ "max_spaces_inside": 1,
+ }
+
+ def __init__( # pylint: disable=too-many-arguments
self,
*,
typ: str | None = None,
pure: bool = False,
output: Any = None,
plug_ins: list[str] | None = None,
+ version: tuple[int, int] | None = None,
+ config: dict[str, bool | int | str] | None = None,
):
"""Return a configured ``ruamel.yaml.YAML`` instance.
@@ -793,15 +898,18 @@ class FormattedYAML(YAML):
tasks:
- name: Task
"""
- # Default to reading/dumping YAML 1.1 (ruamel.yaml defaults to 1.2)
- self._yaml_version_default: tuple[int, int] = (1, 1)
- self._yaml_version: str | tuple[int, int] = self._yaml_version_default
-
+ if version:
+ if isinstance(version, str):
+ x, y = version.split(".", maxsplit=1)
+ version = (int(x), int(y))
+ self._yaml_version_default: tuple[int, int] = version
+ self._yaml_version: tuple[int, int] = self._yaml_version_default
super().__init__(typ=typ, pure=pure, output=output, plug_ins=plug_ins)
# NB: We ignore some mypy issues because ruamel.yaml typehints are not great.
- config = self._defaults_from_yamllint_config()
+ if not config:
+ config = self._defaults_from_yamllint_config()
# these settings are derived from yamllint config
self.explicit_start: bool = config["explicit_start"] # type: ignore[assignment]
@@ -854,15 +962,8 @@ class FormattedYAML(YAML):
@staticmethod
def _defaults_from_yamllint_config() -> dict[str, bool | int | str]:
"""Extract FormattedYAML-relevant settings from yamllint config if possible."""
- config = {
- "explicit_start": True,
- "explicit_end": False,
- "width": 160,
- "indent_sequences": True,
- "preferred_quote": '"',
- "min_spaces_inside": 0,
- "max_spaces_inside": 1,
- }
+ config = FormattedYAML.default_config
+
for rule, rule_config in load_yamllint_config().rules.items():
if not rule_config:
# rule disabled
@@ -895,10 +996,10 @@ class FormattedYAML(YAML):
elif quote_type == "double":
config["preferred_quote"] = '"'
- return cast(dict[str, Union[bool, int, str]], config)
+ return cast(dict[str, bool | int | str], config)
- @property # type: ignore[override]
- def version(self) -> str | tuple[int, int]:
+ @property
+ def version(self) -> tuple[int, int] | None:
"""Return the YAML version used to parse or dump.
Ansible uses PyYAML which only supports YAML 1.1. ruamel.yaml defaults to 1.2.
@@ -906,19 +1007,25 @@ class FormattedYAML(YAML):
We can relax the version requirement once ansible uses a version of PyYAML
that includes this PR: https://github.com/yaml/pyyaml/pull/555
"""
- return self._yaml_version
+ if hasattr(self, "_yaml_version"):
+ return self._yaml_version
+ return None
@version.setter
- def version(self, value: str | tuple[int, int] | None) -> None:
+ def version(self, value: tuple[int, int] | None) -> None:
"""Ensure that yaml version uses our default value.
The yaml Reader updates this value based on the ``%YAML`` directive in files.
So, if a file does not include the directive, it sets this to None.
But, None effectively resets the parsing version to YAML 1.2 (ruamel's default).
"""
- self._yaml_version = value if value is not None else self._yaml_version_default
+ if value is not None:
+ self._yaml_version = value
+ elif hasattr(self, "_yaml_version_default"):
+ self._yaml_version = self._yaml_version_default
+ # We do nothing if the object did not have a previous default version defined
- def loads(self, stream: str) -> Any:
+ def load(self, stream: Path | StreamTextType) -> Any:
"""Load YAML content from a string while avoiding known ruamel.yaml issues."""
if not isinstance(stream, str):
msg = f"expected a str but got {type(stream)}"
@@ -928,10 +1035,18 @@ class FormattedYAML(YAML):
# https://sourceforge.net/p/ruamel-yaml/tickets/460/
text, preamble_comment = self._pre_process_yaml(stream)
- data = self.load(stream=text)
+ try:
+ data = super().load(stream=text)
+ except ComposerError:
+ data = self.load_all(stream=text)
+ except ParserError:
+ data = None
+ _logger.error( # noqa: TRY400
+ "Invalid yaml, verify the file contents and try again.",
+ )
if preamble_comment is not None and isinstance(
data,
- (CommentedMap, CommentedSeq),
+ CommentedMap | CommentedSeq,
):
data.preamble_comment = preamble_comment # type: ignore[union-attr]
# Because data can validly also be None for empty documents, we cannot
@@ -948,15 +1063,20 @@ class FormattedYAML(YAML):
stream.write(preamble_comment)
self.dump(data, stream)
text = stream.getvalue()
- return self._post_process_yaml(text)
+ strip_version_directive = hasattr(self, "_yaml_version_default")
+ return self._post_process_yaml(
+ text,
+ strip_version_directive=strip_version_directive,
+ strip_explicit_start=not self.explicit_start,
+ )
def _prevent_wrapping_flow_style(self, data: Any) -> None:
- if not isinstance(data, (CommentedMap, CommentedSeq)):
+ if not isinstance(data, CommentedMap | CommentedSeq):
return
for key, value, parent_path in nested_items_path(data):
- if not isinstance(value, (CommentedMap, CommentedSeq)):
+ if not isinstance(value, CommentedMap | CommentedSeq):
continue
- fa: Format = value.fa # pylint: disable=invalid-name
+ fa: Format = value.fa
if fa.flow_style():
predicted_indent = self._predict_indent_length(parent_path, key)
predicted_width = len(str(value))
@@ -1036,7 +1156,12 @@ class FormattedYAML(YAML):
return text, "".join(preamble_comments) or None
@staticmethod
- def _post_process_yaml(text: str) -> str:
+ def _post_process_yaml(
+ text: str,
+ *,
+ strip_version_directive: bool = False,
+ strip_explicit_start: bool = False,
+ ) -> str:
"""Handle known issues with ruamel.yaml dumping.
Make sure there's only one newline at the end of the file.
@@ -1048,6 +1173,14 @@ class FormattedYAML(YAML):
Make sure null list items don't end in a space.
"""
+ # remove YAML directive
+ if strip_version_directive and text.startswith("%YAML"):
+ text = text.split("\n", 1)[1]
+
+ # remove explicit document start
+ if strip_explicit_start and text.startswith("---"):
+ text = text.split("\n", 1)[1]
+
text = text.rstrip("\n") + "\n"
lines = text.splitlines(keepends=True)
@@ -1092,9 +1225,9 @@ class FormattedYAML(YAML):
def clean_json(
obj: Any,
- func: Callable[[str], Any] = lambda key: key.startswith("__")
- if isinstance(key, str)
- else False,
+ func: Callable[[str], Any] = lambda key: (
+ key.startswith("__") if isinstance(key, str) else False
+ ),
) -> Any:
"""Remove all keys matching the condition from a nested JSON-like object.