Diffstat (limited to 'src/ansiblelint/yaml_utils.py')
 src/ansiblelint/yaml_utils.py | 1119 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 1119 insertions(+), 0 deletions(-)
diff --git a/src/ansiblelint/yaml_utils.py b/src/ansiblelint/yaml_utils.py
new file mode 100644
index 0000000..cc7e9ef
--- /dev/null
+++ b/src/ansiblelint/yaml_utils.py
@@ -0,0 +1,1119 @@
+"""Utility helpers to simplify working with yaml-based data."""
+# pylint: disable=too-many-lines
+from __future__ import annotations
+
+import functools
+import logging
+import os
+import re
+from collections.abc import Iterator, Sequence
+from io import StringIO
+from pathlib import Path
+from re import Pattern
+from typing import TYPE_CHECKING, Any, Callable, Union, cast
+
+import ruamel.yaml.events
+from ruamel.yaml.comments import CommentedMap, CommentedSeq, Format
+from ruamel.yaml.constructor import RoundTripConstructor
+from ruamel.yaml.emitter import Emitter, ScalarAnalysis
+
+# Module 'ruamel.yaml' does not explicitly export attribute 'YAML'; implicit reexport disabled
+# To make the type checkers happy, we import from ruamel.yaml.main instead.
+from ruamel.yaml.main import YAML
+from ruamel.yaml.scalarint import ScalarInt
+from yamllint.config import YamlLintConfig
+
+from ansiblelint.constants import (
+ ANNOTATION_KEYS,
+ NESTED_TASK_KEYS,
+ PLAYBOOK_TASK_KEYWORDS,
+)
+from ansiblelint.utils import Task
+
+if TYPE_CHECKING:
+ # noinspection PyProtectedMember
+ from ruamel.yaml.comments import LineCol # pylint: disable=ungrouped-imports
+ from ruamel.yaml.nodes import ScalarNode
+ from ruamel.yaml.representer import RoundTripRepresenter
+ from ruamel.yaml.tokens import CommentToken
+
+ from ansiblelint.file_utils import Lintable
+
+_logger = logging.getLogger(__name__)
+
+YAMLLINT_CONFIG = """
+extends: default
+rules:
+ comments:
+ # https://github.com/prettier/prettier/issues/6780
+ min-spaces-from-content: 1
+ # https://github.com/adrienverge/yamllint/issues/384
+ comments-indentation: false
+ document-start: disable
+  # 160 chars was the default used by the old E204 rule, but
+  # you can easily change it or disable it in your .yamllint file.
+ line-length:
+ max: 160
+ # We are adding an extra space inside braces as that's how prettier does it
+ # and we are trying not to fight other linters.
+ braces:
+ min-spaces-inside: 0 # yamllint defaults to 0
+ max-spaces-inside: 1 # yamllint defaults to 0
+ octal-values:
+ forbid-implicit-octal: true # yamllint defaults to false
+ forbid-explicit-octal: true # yamllint defaults to false
+"""
+
+
+def deannotate(data: Any) -> Any:
+ """Remove our annotations like __file__ and __line__ and return a JSON serializable object."""
+ if isinstance(data, dict):
+ result = data.copy()
+ for key, value in data.items():
+ if key in ANNOTATION_KEYS:
+ del result[key]
+ else:
+ result[key] = deannotate(value)
+ return result
+ if isinstance(data, list):
+ return [deannotate(item) for item in data if item not in ANNOTATION_KEYS]
+ return data
+
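+# Illustrative example (doctest-style), assuming "__file__" and "__line__" are
+# among ANNOTATION_KEYS as described above:
+#
+#   >>> deannotate({"name": "demo", "__file__": "site.yml", "__line__": 3})
+#   {'name': 'demo'}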
+
+@functools.lru_cache(maxsize=1)
+def load_yamllint_config() -> YamlLintConfig:
+ """Load our default yamllint config and any customized override file."""
+ config = YamlLintConfig(content=YAMLLINT_CONFIG)
+    # If we detect a local yamllint config we use it to extend our internal
+    # config, but log a debug message as the two can easily get out of sync.
+ for path in [
+ ".yamllint",
+ ".yamllint.yaml",
+ ".yamllint.yml",
+ os.getenv("YAMLLINT_CONFIG_FILE", ""),
+ os.getenv("XDG_CONFIG_HOME", "~/.config") + "/yamllint/config",
+ ]:
+ file = Path(path).expanduser()
+ if file.is_file():
+ _logger.debug(
+ "Loading custom %s config file, this extends our "
+ "internal yamllint config.",
+ file,
+ )
+ config_override = YamlLintConfig(file=str(file))
+ config_override.extend(config)
+ config = config_override
+ break
+ _logger.debug("Effective yamllint rules used: %s", config.rules)
+ return config
+
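+# Example (illustrative): with no local override files present, the effective
+# config is just YAMLLINT_CONFIG above, so for instance:
+#
+#   >>> load_yamllint_config().rules["line-length"]["max"]
+#   160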
+
+def nested_items_path(
+ data_collection: dict[Any, Any] | list[Any],
+ ignored_keys: Sequence[str] = (),
+) -> Iterator[tuple[Any, Any, list[str | int]]]:
+ """Iterate a nested data structure, yielding key/index, value, and parent_path.
+
+ This is a recursive function that calls itself for each nested layer of data.
+ Each iteration yields:
+
+ 1. the current item's dictionary key or list index,
+ 2. the current item's value, and
+ 3. the path to the current item from the outermost data structure.
+
+ For dicts, the yielded (1) key and (2) value are what ``dict.items()`` yields.
+ For lists, the yielded (1) index and (2) value are what ``enumerate()`` yields.
+ The final component, the parent path, is a list of dict keys and list indexes.
+ The parent path can be helpful in providing error messages that indicate
+ precisely which part of a yaml file (or other data structure) needs to be fixed.
+
+ For example, given this playbook:
+
+ .. code-block:: yaml
+
+ - name: A play
+ tasks:
+ - name: A task
+ debug:
+ msg: foobar
+
+ Here's the first and last yielded items:
+
+ .. code-block:: python
+
+ >>> playbook=[{"name": "a play", "tasks": [{"name": "a task", "debug": {"msg": "foobar"}}]}]
+ >>> next( nested_items_path( playbook ) )
+ (0, {'name': 'a play', 'tasks': [{'name': 'a task', 'debug': {'msg': 'foobar'}}]}, [])
+ >>> list( nested_items_path( playbook ) )[-1]
+ ('msg', 'foobar', [0, 'tasks', 0, 'debug'])
+
+    Note that, for the outermost data structure, the parent path is ``[]`` because
+ you do not need to descend into any nested dicts or lists to find the indicated
+ key and value.
+
+ If a rule were designed to prohibit "foobar" debug messages, it could use the
+ parent path to provide a path to the problematic ``msg``. It might use a jq-style
+ path in its error message: "the error is at ``.[0].tasks[0].debug.msg``".
+ Or if a utility could automatically fix issues, it could use the path to descend
+ to the parent object using something like this:
+
+ .. code-block:: python
+
+ target = data
+ for segment in parent_path:
+ target = target[segment]
+
+ :param data_collection: The nested data (dicts or lists).
+
+    :returns: each iteration yields the key (for dicts) or index (for lists),
+        the value, and the path to the current item's parent collection.
+ """
+ # As typing and mypy cannot effectively ensure we are called only with
+ # valid data, we better ignore NoneType
+ if data_collection is None:
+ return
+ data: dict[Any, Any] | list[Any]
+ if isinstance(data_collection, Task):
+ data = data_collection.normalized_task
+ else:
+ data = data_collection
+ yield from _nested_items_path(
+ data_collection=data,
+ parent_path=[],
+ ignored_keys=ignored_keys,
+ )
+
+
+def _nested_items_path(
+ data_collection: dict[Any, Any] | list[Any],
+ parent_path: list[str | int],
+ ignored_keys: Sequence[str] = (),
+) -> Iterator[tuple[Any, Any, list[str | int]]]:
+ """Iterate through data_collection (internal implementation of nested_items_path).
+
+ This is a separate function because callers of nested_items_path should
+ not be using the parent_path param which is used in recursive _nested_items_path
+ calls to build up the path to the parent object of the current key/index, value.
+ """
+ # we have to cast each convert_to_tuples assignment or mypy complains
+ # that both assignments (for dict and list) do not have the same type
+ convert_to_tuples_type = Callable[[], Iterator[tuple[Union[str, int], Any]]]
+ if isinstance(data_collection, dict):
+ convert_data_collection_to_tuples = cast(
+ convert_to_tuples_type,
+ functools.partial(data_collection.items),
+ )
+ elif isinstance(data_collection, list):
+ convert_data_collection_to_tuples = cast(
+ convert_to_tuples_type,
+ functools.partial(enumerate, data_collection),
+ )
+ else:
+ msg = f"Expected a dict or a list but got {data_collection!r} of type '{type(data_collection)}'"
+ raise TypeError(msg)
+ for key, value in convert_data_collection_to_tuples():
+ if key in (*ANNOTATION_KEYS, *ignored_keys):
+ continue
+ yield key, value, parent_path
+ if isinstance(value, (dict, list)):
+ yield from _nested_items_path(
+ data_collection=value,
+ parent_path=[*parent_path, key],
+ )
+
+
+def get_path_to_play(
+ lintable: Lintable,
+ lineno: int, # 1-based
+ ruamel_data: CommentedMap | CommentedSeq,
+) -> list[str | int]:
+ """Get the path to the play in the given file at the given line number."""
+ if lineno < 1:
+ msg = f"expected lineno >= 1, got {lineno}"
+ raise ValueError(msg)
+ if lintable.kind != "playbook" or not isinstance(ruamel_data, CommentedSeq):
+ return []
+ lc: LineCol # lc uses 0-based counts # pylint: disable=invalid-name
+ # lineno is 1-based. Convert to 0-based.
+ line_index = lineno - 1
+
+ prev_play_line_index = ruamel_data.lc.line
+ last_play_index = len(ruamel_data)
+ for play_index, play in enumerate(ruamel_data):
+ next_play_index = play_index + 1
+ if last_play_index > next_play_index:
+ next_play_line_index = ruamel_data[next_play_index].lc.line
+ else:
+ next_play_line_index = None
+
+ lc = play.lc # pylint: disable=invalid-name
+ if not isinstance(lc.line, int):
+ msg = f"expected lc.line to be an int, got {lc.line!r}"
+ raise RuntimeError(msg)
+ if lc.line == line_index:
+ return [play_index]
+ if play_index > 0 and prev_play_line_index < line_index < lc.line:
+ return [play_index - 1]
+ # The previous play check (above) can't catch the last play,
+ # so, handle the last play separately.
+ if (
+ next_play_index == last_play_index
+ and line_index > lc.line
+ and (next_play_line_index is None or line_index < next_play_line_index)
+ ):
+ # part of this (last) play
+ return [play_index]
+ prev_play_line_index = play.lc.line
+ return []
+
+
+def get_path_to_task(
+ lintable: Lintable,
+ lineno: int, # 1-based
+ ruamel_data: CommentedMap | CommentedSeq,
+) -> list[str | int]:
+ """Get the path to the task in the given file at the given line number."""
+ if lineno < 1:
+ msg = f"expected lineno >= 1, got {lineno}"
+ raise ValueError(msg)
+ if lintable.kind in ("tasks", "handlers", "playbook"):
+ if not isinstance(ruamel_data, CommentedSeq):
+ msg = f"expected ruamel_data to be a CommentedSeq, got {ruamel_data!r}"
+ raise ValueError(msg)
+ if lintable.kind in ("tasks", "handlers"):
+ return _get_path_to_task_in_tasks_block(lineno, ruamel_data)
+ if lintable.kind == "playbook":
+ return _get_path_to_task_in_playbook(lineno, ruamel_data)
+
+ return []
+
+
+def _get_path_to_task_in_playbook(
+ lineno: int, # 1-based
+ ruamel_data: CommentedSeq,
+) -> list[str | int]:
+ """Get the path to the task in the given playbook data at the given line number."""
+ last_play_index = len(ruamel_data)
+ for play_index, play in enumerate(ruamel_data):
+ next_play_index = play_index + 1
+ if last_play_index > next_play_index:
+ next_play_line_index = ruamel_data[next_play_index].lc.line
+ else:
+ next_play_line_index = None
+
+ play_keys = list(play.keys())
+ for tasks_keyword in PLAYBOOK_TASK_KEYWORDS:
+ if not play.get(tasks_keyword):
+ continue
+
+ try:
+ next_keyword = play_keys[play_keys.index(tasks_keyword) + 1]
+ except IndexError:
+ next_block_line_index = None
+ else:
+ next_block_line_index = play.lc.data[next_keyword][0]
+            # last_lineno_in_block is 1-based while next_*_line_index is 0-based:
+            # subtracting 1 (to get the line before the next block) and adding 1
+            # (to convert to 1-based) cancel out, so the index is used as-is.
+ if next_block_line_index is not None:
+ last_lineno_in_block = next_block_line_index
+ elif next_play_line_index is not None:
+ last_lineno_in_block = next_play_line_index
+ else:
+ last_lineno_in_block = None
+
+ task_path = _get_path_to_task_in_tasks_block(
+ lineno,
+ play[tasks_keyword],
+ last_lineno_in_block,
+ )
+ if task_path:
+ # mypy gets confused without this typehint
+ tasks_keyword_path: list[int | str] = [
+ play_index,
+ tasks_keyword,
+ ]
+ return tasks_keyword_path + list(task_path)
+ # lineno is before first play or no tasks keywords in any of the plays
+ return []
+
+
+def _get_path_to_task_in_tasks_block(
+ lineno: int, # 1-based
+ tasks_block: CommentedSeq,
+ last_lineno: int | None = None, # 1-based
+) -> list[str | int]:
+ """Get the path to the task in the given tasks block at the given line number."""
+ task: CommentedMap | None
+ # lineno and last_lineno are 1-based. Convert to 0-based.
+ line_index = lineno - 1
+ last_line_index = None if last_lineno is None else last_lineno - 1
+
+ # lc (LineCol) uses 0-based counts
+ prev_task_line_index = tasks_block.lc.line
+ last_task_index = len(tasks_block)
+ for task_index, task in enumerate(tasks_block):
+ next_task_index = task_index + 1
+ if last_task_index > next_task_index:
+ if tasks_block[next_task_index] is not None:
+ next_task_line_index = tasks_block[next_task_index].lc.line
+ else:
+ next_task_line_index = tasks_block.lc.item(next_task_index)[0]
+ else:
+ next_task_line_index = None
+
+ if task is None:
+ # create a dummy task to represent the null task
+ task = CommentedMap()
+ task.lc.line, task.lc.col = tasks_block.lc.item(task_index)
+
+ nested_task_keys = set(task.keys()).intersection(set(NESTED_TASK_KEYS))
+ if nested_task_keys:
+ subtask_path = _get_path_to_task_in_nested_tasks_block(
+ lineno,
+ task,
+ nested_task_keys,
+ next_task_line_index,
+ )
+ if subtask_path:
+ # mypy gets confused without this typehint
+ task_path: list[str | int] = [task_index]
+ return task_path + list(subtask_path)
+
+ if not isinstance(task.lc.line, int):
+ msg = f"expected task.lc.line to be an int, got {task.lc.line!r}"
+ raise RuntimeError(msg)
+ if task.lc.line == line_index:
+ return [task_index]
+ if task_index > 0 and prev_task_line_index < line_index < task.lc.line:
+ return [task_index - 1]
+ # The previous task check can't catch the last task,
+ # so, handle the last task separately (also after subtask checks).
+ # pylint: disable=too-many-boolean-expressions
+ if (
+ next_task_index == last_task_index
+ and line_index > task.lc.line
+ and (next_task_line_index is None or line_index < next_task_line_index)
+ and (last_line_index is None or line_index <= last_line_index)
+ ):
+ # part of this (last) task
+ return [task_index]
+ prev_task_line_index = task.lc.line
+ # line is not part of this tasks block
+ return []
+
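+# Illustrative example: for a tasks file loaded with FormattedYAML (defined
+# later in this module), line 3 starts the second task, so its path is [1]:
+#
+#   >>> yaml = FormattedYAML()
+#   >>> tasks = yaml.loads("- name: One\n  ping:\n- name: Two\n  debug:\n    msg: hi\n")
+#   >>> _get_path_to_task_in_tasks_block(lineno=3, tasks_block=tasks)
+#   [1]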
+
+def _get_path_to_task_in_nested_tasks_block(
+ lineno: int, # 1-based
+ task: CommentedMap,
+ nested_task_keys: set[str],
+ next_task_line_index: int | None = None, # 0-based
+) -> list[str | int]:
+ """Get the path to the task in the given nested tasks block."""
+ # loop through the keys in line order
+ task_keys = list(task.keys())
+ task_keys_by_index = dict(enumerate(task_keys))
+ for task_index, task_key in enumerate(task_keys):
+ nested_task_block = task[task_key]
+ if task_key not in nested_task_keys or not nested_task_block:
+ continue
+ next_task_key = task_keys_by_index.get(task_index + 1, None)
+ if next_task_key is not None:
+ next_task_key_line_index = task.lc.data[next_task_key][0]
+ else:
+ next_task_key_line_index = None
+        # last_lineno_in_block is 1-based while next_*_line_index is 0-based:
+        # subtracting 1 (to get the line before the next block) and adding 1
+        # (to convert to 1-based) cancel out, so the index is used as-is.
+ last_lineno_in_block = (
+ next_task_key_line_index
+ if next_task_key_line_index is not None
+ else next_task_line_index
+ )
+ subtask_path = _get_path_to_task_in_tasks_block(
+ lineno,
+ nested_task_block,
+ last_lineno_in_block, # 1-based
+ )
+ if subtask_path:
+ return [task_key, *list(subtask_path)]
+ # line is not part of this nested tasks block
+ return []
+
+
+class OctalIntYAML11(ScalarInt):
+ """OctalInt representation for YAML 1.1."""
+
+ # tell mypy that ScalarInt has these attributes
+ _width: Any
+ _underscore: Any
+
+ def __new__(cls, *args: Any, **kwargs: Any) -> Any:
+ """Create a new int with ScalarInt-defined attributes."""
+ return ScalarInt.__new__(cls, *args, **kwargs)
+
+ @staticmethod
+ def represent_octal(representer: RoundTripRepresenter, data: OctalIntYAML11) -> Any:
+ """Return a YAML 1.1 octal representation.
+
+ Based on ruamel.yaml.representer.RoundTripRepresenter.represent_octal_int()
+ (which only handles the YAML 1.2 octal representation).
+ """
+ v = format(data, "o")
+ anchor = data.yaml_anchor(any=True)
+ # noinspection PyProtectedMember
+ # pylint: disable=protected-access
+ return representer.insert_underscore(
+ "0",
+ v,
+ data._underscore, # noqa: SLF001
+ anchor=anchor,
+ )
+
+
+class CustomConstructor(RoundTripConstructor):
+ """Custom YAML constructor that preserves Octal formatting in YAML 1.1."""
+
+ def construct_yaml_int(self, node: ScalarNode) -> Any:
+ """Construct int while preserving Octal formatting in YAML 1.1.
+
+ ruamel.yaml only preserves the octal format for YAML 1.2.
+ For 1.1, it converts the octal to an int. So, we preserve the format.
+
+ Code partially copied from ruamel.yaml (MIT licensed).
+ """
+ ret = super().construct_yaml_int(node)
+ if self.resolver.processing_version == (1, 1) and isinstance(ret, int):
+ # Do not rewrite zero as octal.
+ if ret == 0:
+ return ret
+ # see if we've got an octal we need to preserve.
+ value_su = self.construct_scalar(node)
+ try:
+ v = value_su.rstrip("_")
+ underscore = [len(v) - v.rindex("_") - 1, False, False] # type: Any
+ except ValueError:
+ underscore = None
+ except IndexError:
+ underscore = None
+ value_s = value_su.replace("_", "")
+ if value_s[0] in "+-":
+ value_s = value_s[1:]
+ if value_s[0] == "0":
+ # got an octal in YAML 1.1
+ ret = OctalIntYAML11(
+ ret,
+ width=None,
+ underscore=underscore,
+ anchor=node.anchor,
+ )
+ return ret
+
+
+CustomConstructor.add_constructor(
+ "tag:yaml.org,2002:int",
+ CustomConstructor.construct_yaml_int,
+)
+
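+# Illustrative example: with the constructor registered above, FormattedYAML
+# (defined below) loads YAML 1.1 octals like "0644" as OctalIntYAML11 so the
+# original formatting can be re-emitted on dump:
+#
+#   >>> data = FormattedYAML().loads("---\nmode: 0644\n")
+#   >>> type(data["mode"]).__name__, int(data["mode"])
+#   ('OctalIntYAML11', 420)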
+
+class FormattedEmitter(Emitter):
+ """Emitter that applies custom formatting rules when dumping YAML.
+
+ Differences from ruamel.yaml defaults:
+
+ - indentation of root-level sequences
+ - prefer double-quoted scalars over single-quoted scalars
+
+ This ensures that root-level sequences are never indented.
+ All subsequent levels are indented as configured (normal ruamel.yaml behavior).
+
+ Earlier implementations used dedent on ruamel.yaml's dumped output,
+ but string magic like that had a ton of problematic edge cases.
+ """
+
+ preferred_quote = '"' # either " or '
+
+ min_spaces_inside = 0
+ max_spaces_inside = 1
+
+ _sequence_indent = 2
+ _sequence_dash_offset = 0 # Should be _sequence_indent - 2
+ _root_is_sequence = False
+
+ _in_empty_flow_map = False
+
+ @property
+ def _is_root_level_sequence(self) -> bool:
+ """Return True if this is a sequence at the root level of the yaml document."""
+ return self.column < 2 and self._root_is_sequence
+
+ def expect_document_root(self) -> None:
+ """Expect doc root (extend to record if the root doc is a sequence)."""
+ self._root_is_sequence = isinstance(
+ self.event,
+ ruamel.yaml.events.SequenceStartEvent,
+ )
+ return super().expect_document_root()
+
+ # NB: mypy does not support overriding attributes with properties yet:
+ # https://github.com/python/mypy/issues/4125
+ # To silence we have to ignore[override] both the @property and the method.
+
+ @property
+ def best_sequence_indent(self) -> int:
+ """Return the configured sequence_indent or 2 for root level."""
+ return 2 if self._is_root_level_sequence else self._sequence_indent
+
+ @best_sequence_indent.setter
+ def best_sequence_indent(self, value: int) -> None:
+ """Configure how many columns to indent each sequence item (including the '-')."""
+ self._sequence_indent = value
+
+ @property
+ def sequence_dash_offset(self) -> int:
+ """Return the configured sequence_dash_offset or 0 for root level."""
+ return 0 if self._is_root_level_sequence else self._sequence_dash_offset
+
+ @sequence_dash_offset.setter
+ def sequence_dash_offset(self, value: int) -> None:
+ """Configure how many spaces to put before each sequence item's '-'."""
+ self._sequence_dash_offset = value
+
+ def choose_scalar_style(self) -> Any:
+ """Select how to quote scalars if needed."""
+ style = super().choose_scalar_style()
+ if (
+ style == "" # noqa: PLC1901
+ and self.event.value.startswith("0")
+ and len(self.event.value) > 1
+ ):
+ if self.event.tag == "tag:yaml.org,2002:int" and self.event.implicit[0]:
+ # ensures that "0123" string does not lose its quoting
+ self.event.tag = "tag:yaml.org,2002:str"
+ self.event.implicit = (True, True, True)
+ return '"'
+ if style != "'":
+ # block scalar, double quoted, etc.
+ return style
+ if '"' in self.event.value:
+ return "'"
+ return self.preferred_quote
+
+ def write_indicator(
+ self,
+ indicator: str, # ruamel.yaml typehint is wrong. This is a string.
+ need_whitespace: bool,
+ whitespace: bool = False, # noqa: FBT002
+ indention: bool = False, # (sic) ruamel.yaml has this typo in their API # noqa: FBT002
+ ) -> None:
+ """Make sure that flow maps get whitespace by the curly braces."""
+ # We try to go with one whitespace by the curly braces and adjust accordingly
+ # to what min_spaces_inside and max_spaces_inside are set to.
+ # This assumes min_spaces_inside <= max_spaces_inside
+ spaces_inside = min(
+ max(1, self.min_spaces_inside),
+ self.max_spaces_inside if self.max_spaces_inside != -1 else 1,
+ )
+ # If this is the end of the flow mapping that isn't on a new line:
+ if (
+ indicator == "}"
+ and (self.column or 0) > (self.indent or 0)
+ and not self._in_empty_flow_map
+ ):
+ indicator = (" " * spaces_inside) + "}"
+ super().write_indicator(indicator, need_whitespace, whitespace, indention)
+ # if it is the start of a flow mapping, and it's not time
+ # to wrap the lines, insert a space.
+ if indicator == "{" and self.column < self.best_width:
+ if self.check_empty_mapping():
+ self._in_empty_flow_map = True
+ else:
+ self.column += 1
+ self.stream.write(" " * spaces_inside)
+ self._in_empty_flow_map = False
+
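+    # Illustrative effect (with the default min/max spaces inside braces),
+    # using the FormattedYAML wrapper defined later in this module:
+    #
+    #   >>> yaml = FormattedYAML()
+    #   >>> yaml.dumps(yaml.loads("---\nx: {a: 1}\n"))
+    #   '---\nx: { a: 1 }\n'
+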
+ # "/n/n" results in one blank line (end the previous line, then newline).
+ # So, "/n/n/n" or more is too many new lines. Clean it up.
+ _re_repeat_blank_lines: Pattern[str] = re.compile(r"\n{3,}")
+
+ @staticmethod
+ def add_octothorpe_protection(string: str) -> str:
+ """Modify strings to protect "#" from full-line-comment post-processing."""
+ try:
+ if "#" in string:
+                # ＃ is \uFF03 (fullwidth number sign)
+ # ﹟ is \uFE5F (small number sign)
+ string = string.replace("#", "\uFF03#\uFE5F")
+ # this is safe even if this sequence is present
+ # because it gets reversed in post-processing
+ except (ValueError, TypeError):
+ # probably not really a string. Whatever.
+ pass
+ return string
+
+ @staticmethod
+ def drop_octothorpe_protection(string: str) -> str:
+ """Remove string protection of "#" after full-line-comment post-processing."""
+ try:
+ if "\uFF03#\uFE5F" in string:
+                # ＃ is \uFF03 (fullwidth number sign)
+ # ﹟ is \uFE5F (small number sign)
+ string = string.replace("\uFF03#\uFE5F", "#")
+ except (ValueError, TypeError):
+ # probably not really a string. Whatever.
+ pass
+ return string
+
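+    # Protection round-trip sketch (illustrative):
+    #
+    #   >>> FormattedEmitter.add_octothorpe_protection("port: 80 # http")
+    #   'port: 80 ＃#﹟ http'
+    #   >>> FormattedEmitter.drop_octothorpe_protection("port: 80 ＃#﹟ http")
+    #   'port: 80 # http'
+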
+ def analyze_scalar(self, scalar: str) -> ScalarAnalysis:
+ """Determine quoting and other requirements for string.
+
+ And protect "#" from full-line-comment post-processing.
+ """
+ analysis: ScalarAnalysis = super().analyze_scalar(scalar)
+ if analysis.empty:
+ return analysis
+ analysis.scalar = self.add_octothorpe_protection(analysis.scalar)
+ return analysis
+
+ # comment is a CommentToken, not Any (Any is ruamel.yaml's lazy type hint).
+ def write_comment(
+ self,
+ comment: CommentToken,
+ pre: bool = False, # noqa: FBT002
+ ) -> None:
+ """Clean up extra new lines and spaces in comments.
+
+ ruamel.yaml treats new or empty lines as comments.
+ See: https://stackoverflow.com/questions/42708668/removing-all-blank-lines-but-not-comments-in-ruamel-yaml/42712747#42712747
+ """
+ value: str = comment.value
+ if (
+ pre
+ and not value.strip()
+ and not isinstance(
+ self.event,
+ (
+ ruamel.yaml.events.CollectionEndEvent,
+ ruamel.yaml.events.DocumentEndEvent,
+ ruamel.yaml.events.StreamEndEvent,
+ ),
+ )
+ ):
+ # drop pure whitespace pre comments
+ # does not apply to End events since they consume one of the newlines.
+ value = ""
+ elif pre:
+ # preserve content in pre comment with at least one newline,
+ # but no extra blank lines.
+ value = self._re_repeat_blank_lines.sub("\n", value)
+ else:
+ # single blank lines in post comments
+ value = self._re_repeat_blank_lines.sub("\n\n", value)
+ comment.value = value
+
+ # make sure that the eol comment only has one space before it.
+ if comment.column > self.column + 1 and not pre:
+ comment.column = self.column + 1
+
+ return super().write_comment(comment, pre)
+
+ def write_version_directive(self, version_text: Any) -> None:
+ """Skip writing '%YAML 1.1'."""
+ if version_text == "1.1":
+ return
+ super().write_version_directive(version_text)
+
+
+# pylint: disable=too-many-instance-attributes
+class FormattedYAML(YAML):
+ """A YAML loader/dumper that handles ansible content better by default."""
+
+ def __init__(
+ self,
+ *,
+ typ: str | None = None,
+ pure: bool = False,
+ output: Any = None,
+ plug_ins: list[str] | None = None,
+ ):
+ """Return a configured ``ruamel.yaml.YAML`` instance.
+
+ Some config defaults get extracted from the yamllint config.
+
+ ``ruamel.yaml.YAML`` uses attributes to configure how it dumps yaml files.
+ Some of these settings can be confusing, so here are examples of how different
+ settings will affect the dumped yaml.
+
+ This example does not indent any sequences:
+
+ .. code:: python
+
+ yaml.explicit_start=True
+ yaml.map_indent=2
+ yaml.sequence_indent=2
+ yaml.sequence_dash_offset=0
+
+ .. code:: yaml
+
+ ---
+ - name: A playbook
+ tasks:
+              - name: Task
+
+ This example indents all sequences including the root-level:
+
+ .. code:: python
+
+ yaml.explicit_start=True
+ yaml.map_indent=2
+ yaml.sequence_indent=4
+ yaml.sequence_dash_offset=2
+ # yaml.Emitter defaults to ruamel.yaml.emitter.Emitter
+
+ .. code:: yaml
+
+ ---
+              - name: Playbook
+                tasks:
+                  - name: Task
+
+ This example indents all sequences except at the root-level:
+
+ .. code:: python
+
+ yaml.explicit_start=True
+ yaml.map_indent=2
+ yaml.sequence_indent=4
+ yaml.sequence_dash_offset=2
+ yaml.Emitter = FormattedEmitter # custom Emitter prevents root-level indents
+
+ .. code:: yaml
+
+ ---
+ - name: Playbook
+ tasks:
+ - name: Task
+ """
+ # Default to reading/dumping YAML 1.1 (ruamel.yaml defaults to 1.2)
+ self._yaml_version_default: tuple[int, int] = (1, 1)
+ self._yaml_version: str | tuple[int, int] = self._yaml_version_default
+
+ super().__init__(typ=typ, pure=pure, output=output, plug_ins=plug_ins)
+
+ # NB: We ignore some mypy issues because ruamel.yaml typehints are not great.
+
+ config = self._defaults_from_yamllint_config()
+
+ # these settings are derived from yamllint config
+ self.explicit_start: bool = config["explicit_start"] # type: ignore[assignment]
+ self.explicit_end: bool = config["explicit_end"] # type: ignore[assignment]
+ self.width: int = config["width"] # type: ignore[assignment]
+ indent_sequences: bool = cast(bool, config["indent_sequences"])
+ preferred_quote: str = cast(str, config["preferred_quote"]) # either ' or "
+
+ min_spaces_inside: int = cast(int, config["min_spaces_inside"])
+ max_spaces_inside: int = cast(int, config["max_spaces_inside"])
+
+ self.default_flow_style = False
+ self.compact_seq_seq = True # type: ignore[assignment] # dash after dash
+ self.compact_seq_map = True # type: ignore[assignment] # key after dash
+
+ # Do not use yaml.indent() as it obscures the purpose of these vars:
+ self.map_indent = 2
+ self.sequence_indent = 4 if indent_sequences else 2
+ self.sequence_dash_offset = self.sequence_indent - 2
+
+ # If someone doesn't want our FormattedEmitter, they can change it.
+ self.Emitter = FormattedEmitter
+
+ # ignore invalid preferred_quote setting
+ if preferred_quote in ['"', "'"]:
+ FormattedEmitter.preferred_quote = preferred_quote
+ # NB: default_style affects preferred_quote as well.
+ # self.default_style ∈ None (default), '', '"', "'", '|', '>'
+
+ # spaces inside braces for flow mappings
+ FormattedEmitter.min_spaces_inside = min_spaces_inside
+ FormattedEmitter.max_spaces_inside = max_spaces_inside
+
+ # We need a custom constructor to preserve Octal formatting in YAML 1.1
+ self.Constructor = CustomConstructor
+ self.Representer.add_representer(OctalIntYAML11, OctalIntYAML11.represent_octal)
+
+        # NB: We could set preserve_quotes, which loads all strings as a str
+        # subclass that carries a quote attribute. Will the str subclasses cause
+        # problems in transforms? Are there any other gotchas to this?
+        #
+        # That would only preserve quotes for strings read from the file.
+        # Anything modified by a transform will use no quotes, preferred_quote,
+        # or the quote that results in the least amount of escaping.
+
+ # If needed, we can use this to change null representation to be explicit
+ # (see https://stackoverflow.com/a/44314840/1134951)
+ # self.Representer.add_representer(
+
+ @staticmethod
+ def _defaults_from_yamllint_config() -> dict[str, bool | int | str]:
+ """Extract FormattedYAML-relevant settings from yamllint config if possible."""
+ config = {
+ "explicit_start": True,
+ "explicit_end": False,
+ "width": 160,
+ "indent_sequences": True,
+ "preferred_quote": '"',
+ "min_spaces_inside": 0,
+ "max_spaces_inside": 1,
+ }
+ for rule, rule_config in load_yamllint_config().rules.items():
+ if not rule_config:
+ # rule disabled
+ continue
+
+ # refactor this if ... elif ... elif ... else monstrosity using match/case (PEP 634) once python 3.10 is mandatory
+ if rule == "document-start":
+ config["explicit_start"] = rule_config["present"]
+ elif rule == "document-end":
+ config["explicit_end"] = rule_config["present"]
+ elif rule == "line-length":
+ config["width"] = rule_config["max"]
+ elif rule == "braces":
+ min_spaces_inside = rule_config["min-spaces-inside"]
+ if min_spaces_inside:
+ config["min_spaces_inside"] = int(min_spaces_inside)
+ max_spaces_inside = rule_config["max-spaces-inside"]
+ if max_spaces_inside:
+ config["max_spaces_inside"] = int(max_spaces_inside)
+ elif rule == "indentation":
+ indent_sequences = rule_config["indent-sequences"]
+ # one of: bool, "whatever", "consistent"
+ # so, we use True for "whatever" and "consistent"
+ config["indent_sequences"] = bool(indent_sequences)
+ elif rule == "quoted-strings":
+ quote_type = rule_config["quote-type"]
+ # one of: single, double, any
+ if quote_type == "single":
+ config["preferred_quote"] = "'"
+ elif quote_type == "double":
+ config["preferred_quote"] = '"'
+
+ return cast(dict[str, Union[bool, int, str]], config)
+
+ @property # type: ignore[override]
+ def version(self) -> str | tuple[int, int]:
+ """Return the YAML version used to parse or dump.
+
+ Ansible uses PyYAML which only supports YAML 1.1. ruamel.yaml defaults to 1.2.
+ So, we have to make sure we dump yaml files using YAML 1.1.
+ We can relax the version requirement once ansible uses a version of PyYAML
+ that includes this PR: https://github.com/yaml/pyyaml/pull/555
+ """
+ return self._yaml_version
+
+ @version.setter
+ def version(self, value: str | tuple[int, int] | None) -> None:
+ """Ensure that yaml version uses our default value.
+
+ The yaml Reader updates this value based on the ``%YAML`` directive in files.
+ So, if a file does not include the directive, it sets this to None.
+ But, None effectively resets the parsing version to YAML 1.2 (ruamel's default).
+ """
+ self._yaml_version = value if value is not None else self._yaml_version_default
+
+ def loads(self, stream: str) -> Any:
+ """Load YAML content from a string while avoiding known ruamel.yaml issues."""
+ if not isinstance(stream, str):
+ msg = f"expected a str but got {type(stream)}"
+ raise NotImplementedError(msg)
+ # As ruamel drops comments for any document that is not a mapping or sequence,
+ # we need to avoid using it to reformat those documents.
+ # https://sourceforge.net/p/ruamel-yaml/tickets/460/
+
+ text, preamble_comment = self._pre_process_yaml(stream)
+ data = self.load(stream=text)
+ if preamble_comment is not None and isinstance(
+ data,
+ (CommentedMap, CommentedSeq),
+ ):
+ data.preamble_comment = preamble_comment # type: ignore[union-attr]
+ # Because data can validly also be None for empty documents, we cannot
+ # really annotate the return type here, so we need to remember to
+ # never save None or scalar data types when reformatting.
+ return data
+
+ def dumps(self, data: Any) -> str:
+ """Dump YAML document to string (including its preamble_comment)."""
+ preamble_comment: str | None = getattr(data, "preamble_comment", None)
+ self._prevent_wrapping_flow_style(data)
+ with StringIO() as stream:
+ if preamble_comment:
+ stream.write(preamble_comment)
+ self.dump(data, stream)
+ text = stream.getvalue()
+ return self._post_process_yaml(text)
+
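+    # Round-trip sketch (illustrative; exact spacing depends on the effective
+    # yamllint config picked up by _defaults_from_yamllint_config below):
+    #
+    #   >>> yaml = FormattedYAML()
+    #   >>> text = "---\n- name: Demo # keep me\n  ping:\n"
+    #   >>> yaml.dumps(yaml.loads(text)) == text
+    #   True
+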
+ def _prevent_wrapping_flow_style(self, data: Any) -> None:
+ if not isinstance(data, (CommentedMap, CommentedSeq)):
+ return
+ for key, value, parent_path in nested_items_path(data):
+ if not isinstance(value, (CommentedMap, CommentedSeq)):
+ continue
+ fa: Format = value.fa # pylint: disable=invalid-name
+ if fa.flow_style():
+ predicted_indent = self._predict_indent_length(parent_path, key)
+ predicted_width = len(str(value))
+ if predicted_indent + predicted_width > self.width:
+ # this flow-style map will probably get line-wrapped,
+ # so, switch it to block style to avoid the line wrap.
+ fa.set_block_style()
+
+ def _predict_indent_length(self, parent_path: list[str | int], key: Any) -> int:
+ indent = 0
+
+ # each parent_key type tells us what the indent is for the next level.
+ for parent_key in parent_path:
+ if isinstance(parent_key, int) and indent == 0:
+ # root level is a sequence
+ indent += self.sequence_dash_offset
+ elif isinstance(parent_key, int):
+ # next level is a sequence
+ indent += cast(int, self.sequence_indent)
+ elif isinstance(parent_key, str):
+ # next level is a map
+ indent += cast(int, self.map_indent)
+
+ if isinstance(key, int) and indent == 0:
+ # flow map is an item in a root-level sequence
+ indent += self.sequence_dash_offset
+ elif isinstance(key, int) and indent > 0:
+ # flow map is in a sequence
+ indent += cast(int, self.sequence_indent)
+ elif isinstance(key, str):
+ # flow map is in a map
+ indent += len(key + ": ")
+
+ return indent
+
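+    # Worked example (assuming indent_sequences is enabled, so sequence_indent=4,
+    # sequence_dash_offset=2 and map_indent=2): a flow map at
+    # data[0]["tasks"][0]["vars"] has parent_path=[0, "tasks", 0] and key="vars",
+    # so the predicted indent is 2 (root dash) + 2 ("tasks" map) + 4 (nested
+    # sequence) + len("vars: ") = 14 columns.
+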
+ # ruamel.yaml only preserves empty (no whitespace) blank lines
+ # (ie "/n/n" becomes "/n/n" but "/n /n" becomes "/n").
+ # So, we need to identify whitespace-only lines to drop spaces before reading.
+ _whitespace_only_lines_re = re.compile(r"^ +$", re.MULTILINE)
+
+ def _pre_process_yaml(self, text: str) -> tuple[str, str | None]:
+ """Handle known issues with ruamel.yaml loading.
+
+ Preserve blank lines despite extra whitespace.
+ Preserve any preamble (aka header) comments before "---".
+
+ For more on preamble comments, see: https://stackoverflow.com/questions/70286108/python-ruamel-yaml-package-how-to-get-header-comment-lines/70287507#70287507
+ """
+ text = self._whitespace_only_lines_re.sub("", text)
+
+ # I investigated extending ruamel.yaml to capture preamble comments.
+ # preamble comment goes from:
+ # DocumentStartToken.comment -> DocumentStartEvent.comment
+ # Then, in the composer:
+ # once in composer.current_event
+ # discards DocumentStartEvent
+ # move DocumentStartEvent to composer.last_event
+ # all document nodes get composed (events get used)
+ # discard DocumentEndEvent
+ # move DocumentEndEvent to composer.last_event
+ # So, there's no convenient way to extend the composer
+ # to somehow capture the comments and pass them on.
+
+ preamble_comments = []
+ if "\n---\n" not in text and "\n--- " not in text:
+ # nothing is before the document start mark,
+ # so there are no comments to preserve.
+ return text, None
+ for line in text.splitlines(True):
+ # We only need to capture the preamble comments. No need to remove them.
+ # lines might also include directives.
+ if line.lstrip().startswith("#") or line == "\n":
+ preamble_comments.append(line)
+ elif line.startswith("---"):
+ break
+
+ return text, "".join(preamble_comments) or None
+
+ @staticmethod
+ def _post_process_yaml(text: str) -> str:
+ """Handle known issues with ruamel.yaml dumping.
+
+ Make sure there's only one newline at the end of the file.
+
+ Fix the indent of full-line comments to match the indent of the next line.
+ See: https://stackoverflow.com/questions/71354698/how-can-i-use-the-ruamel-yaml-rtsc-mode/71355688#71355688
+ Also, removes "#" protection from strings that prevents them from being
+ identified as full line comments in post-processing.
+
+ Make sure null list items don't end in a space.
+ """
+ text = text.rstrip("\n") + "\n"
+
+ lines = text.splitlines(keepends=True)
+ full_line_comments: list[tuple[int, str]] = []
+ for i, line in enumerate(lines):
+ stripped = line.lstrip()
+ if not stripped:
+ # blank line. Move on.
+ continue
+
+ space_length = len(line) - len(stripped)
+
+ if stripped.startswith("#"):
+ # got a full line comment
+
+ # allow some full line comments to match the previous indent
+ if i > 0 and not full_line_comments and space_length:
+ prev = lines[i - 1]
+ prev_space_length = len(prev) - len(prev.lstrip())
+ if prev_space_length == space_length:
+ # if the indent matches the previous line's indent, skip it.
+ continue
+
+ full_line_comments.append((i, stripped))
+ elif full_line_comments:
+ # end of full line comments so adjust to match indent of this line
+ spaces = " " * space_length
+ for index, comment in full_line_comments:
+ lines[index] = spaces + comment
+ full_line_comments.clear()
+
+ cleaned = line.strip()
+ if not cleaned.startswith("#") and cleaned.endswith("-"):
+ # got an empty list item. drop any trailing spaces.
+ lines[i] = line.rstrip() + "\n"
+
+ text = "".join(
+ FormattedEmitter.drop_octothorpe_protection(line) for line in lines
+ )
+ return text
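+
+    # Illustrative effect: a full-line comment emitted with a stale indent is
+    # re-aligned to the line that follows it, e.g.
+    # "      # a comment\n  key: value\n" becomes "  # a comment\n  key: value\n".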
+
+
+def clean_json(
+ obj: Any,
+ func: Callable[[str], Any] = lambda key: key.startswith("__")
+ if isinstance(key, str)
+ else False,
+) -> Any:
+ """Remove all keys matching the condition from a nested JSON-like object.
+
+    :param obj: a JSON-like object to clean, also returned for chaining.
+    :param func: a callable that takes a key as its argument and returns True
+        for each key that should be deleted.
+ """
+ if isinstance(obj, dict):
+ for key in list(obj.keys()):
+ if func(key):
+ del obj[key]
+ else:
+ clean_json(obj[key], func)
+ elif isinstance(obj, list):
+ for i in reversed(range(len(obj))):
+ if func(obj[i]):
+ del obj[i]
+ else:
+ clean_json(obj[i], func)
+ else:
+ # neither a dict nor a list, do nothing
+ pass
+ return obj
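+
+
+# Example (illustrative): with the default predicate, dunder-prefixed keys are
+# removed in place at any depth:
+#
+#   >>> clean_json({"name": "demo", "__line__": 3, "vars": [{"__file__": "x"}]})
+#   {'name': 'demo', 'vars': [{}]}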