"""Utility helpers to simplify working with yaml-based data."""
# pylint: disable=too-many-lines
from __future__ import annotations

import functools
import logging
import os
import re
from collections.abc import Iterator, Sequence
from io import StringIO
from pathlib import Path
from re import Pattern
from typing import TYPE_CHECKING, Any, Callable, Union, cast

import ruamel.yaml.events
from ruamel.yaml.comments import CommentedMap, CommentedSeq, Format
from ruamel.yaml.constructor import RoundTripConstructor
from ruamel.yaml.emitter import Emitter, ScalarAnalysis

# Module 'ruamel.yaml' does not explicitly export attribute 'YAML'; implicit reexport disabled
# To make the type checkers happy, we import from ruamel.yaml.main instead.
from ruamel.yaml.main import YAML
from ruamel.yaml.scalarint import ScalarInt
from yamllint.config import YamlLintConfig

from ansiblelint.constants import (
    ANNOTATION_KEYS,
    NESTED_TASK_KEYS,
    PLAYBOOK_TASK_KEYWORDS,
)
from ansiblelint.utils import Task

if TYPE_CHECKING:
    # noinspection PyProtectedMember
    from ruamel.yaml.comments import LineCol  # pylint: disable=ungrouped-imports
    from ruamel.yaml.nodes import ScalarNode
    from ruamel.yaml.representer import RoundTripRepresenter
    from ruamel.yaml.tokens import CommentToken

    from ansiblelint.file_utils import Lintable

_logger = logging.getLogger(__name__)

YAMLLINT_CONFIG = """
extends: default
rules:
  comments:
    # https://github.com/prettier/prettier/issues/6780
    min-spaces-from-content: 1
  # https://github.com/adrienverge/yamllint/issues/384
  comments-indentation: false
  document-start: disable
  # 160 chars was the default used by old E204 rule, but
  # you can easily change it or disable in your .yamllint file.
  line-length:
    max: 160
  # We are adding an extra space inside braces as that's how prettier does it
  # and we are trying not to fight other linters.
  braces:
    min-spaces-inside: 0  # yamllint defaults to 0
    max-spaces-inside: 1  # yamllint defaults to 0
  octal-values:
    forbid-implicit-octal: true  # yamllint defaults to false
    forbid-explicit-octal: true  # yamllint defaults to false
"""


def deannotate(data: Any) -> Any:
    """Remove our annotations like __file__ and __line__ and return a JSON serializable object."""
    if isinstance(data, dict):
        result = data.copy()
        for key, value in data.items():
            if key in ANNOTATION_KEYS:
                del result[key]
            else:
                result[key] = deannotate(value)
        return result
    if isinstance(data, list):
        return [deannotate(item) for item in data if item not in ANNOTATION_KEYS]
    return data


@functools.lru_cache(maxsize=1)
def load_yamllint_config() -> YamlLintConfig:
    """Load our default yamllint config and any customized override file."""
    config = YamlLintConfig(content=YAMLLINT_CONFIG)
    # if we detect a local yamllint config we use it, but log a debug message as
    # this is likely to get out of sync with our internal config.
    for path in [
        ".yamllint",
        ".yamllint.yaml",
        ".yamllint.yml",
        os.getenv("YAMLLINT_CONFIG_FILE", ""),
        os.getenv("XDG_CONFIG_HOME", "~/.config") + "/yamllint/config",
    ]:
        file = Path(path).expanduser()
        if file.is_file():
            _logger.debug(
                "Loading custom %s config file, this extends our "
                "internal yamllint config.",
                file,
            )
            config_override = YamlLintConfig(file=str(file))
            config_override.extend(config)
            config = config_override
            break
    _logger.debug("Effective yamllint rules used: %s", config.rules)
    return config


def nested_items_path(
    data_collection: dict[Any, Any] | list[Any],
    ignored_keys: Sequence[str] = (),
) -> Iterator[tuple[Any, Any, list[str | int]]]:
    """Iterate a nested data structure, yielding key/index, value, and parent_path.

    This is a recursive function that calls itself for each nested layer of data.
    Each iteration yields:

    1. the current item's dictionary key or list index,
    2. the current item's value, and
    3. the path to the current item from the outermost data structure.

    For dicts, the yielded (1) key and (2) value are what ``dict.items()`` yields.
    For lists, the yielded (1) index and (2) value are what ``enumerate()`` yields.
    The final component, the parent path, is a list of dict keys and list indexes.
    The parent path can be helpful in providing error messages that indicate
    precisely which part of a yaml file (or other data structure) needs to be fixed.

    For example, given this playbook:

    .. code-block:: yaml

        - name: A play
          tasks:
          - name: A task
            debug:
              msg: foobar

    Here's the first and last yielded items:

    .. code-block:: python

        >>> playbook=[{"name": "a play", "tasks": [{"name": "a task", "debug": {"msg": "foobar"}}]}]
        >>> next( nested_items_path( playbook ) )
        (0, {'name': 'a play', 'tasks': [{'name': 'a task', 'debug': {'msg': 'foobar'}}]}, [])
        >>> list( nested_items_path( playbook ) )[-1]
        ('msg', 'foobar', [0, 'tasks', 0, 'debug'])

    Note that, for the outermost data structure, the parent path is ``[]`` because
    you do not need to descend into any nested dicts or lists to find the indicated
    key and value.

    If a rule were designed to prohibit "foobar" debug messages, it could use the
    parent path to provide a path to the problematic ``msg``. It might use a jq-style
    path in its error message: "the error is at ``.[0].tasks[0].debug.msg``".
    Or if a utility could automatically fix issues, it could use the path to descend
    to the parent object using something like this:

    .. code-block:: python

        target = data
        for segment in parent_path:
            target = target[segment]
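
    A small sketch with made-up task-like data, showing how ``ignored_keys`` skips
    matching keys:

    .. code-block:: python

        >>> data = {"when": "x is defined", "debug": {"msg": "hello"}}
        >>> list(nested_items_path(data, ignored_keys=("when",)))
        [('debug', {'msg': 'hello'}, []), ('msg', 'hello', ['debug'])]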

    :param data_collection: The nested data (dicts or lists).
    :param ignored_keys: Keys to skip while iterating (annotation keys are always skipped).

    :returns: An iterator that yields (key or index, value, parent_path) tuples.
    """
    # As typing and mypy cannot effectively ensure we are called only with
    # valid data, we better ignore NoneType
    if data_collection is None:
        return
    data: dict[Any, Any] | list[Any]
    if isinstance(data_collection, Task):
        data = data_collection.normalized_task
    else:
        data = data_collection
    yield from _nested_items_path(
        data_collection=data,
        parent_path=[],
        ignored_keys=ignored_keys,
    )


def _nested_items_path(
    data_collection: dict[Any, Any] | list[Any],
    parent_path: list[str | int],
    ignored_keys: Sequence[str] = (),
) -> Iterator[tuple[Any, Any, list[str | int]]]:
    """Iterate through data_collection (internal implementation of nested_items_path).

    This is a separate function because callers of nested_items_path should
    not be using the parent_path param which is used in recursive _nested_items_path
    calls to build up the path to the parent object of the current key/index, value.
    """
    # we have to cast each convert_to_tuples assignment or mypy complains
    # that both assignments (for dict and list) do not have the same type
    convert_to_tuples_type = Callable[[], Iterator[tuple[Union[str, int], Any]]]
    if isinstance(data_collection, dict):
        convert_data_collection_to_tuples = cast(
            convert_to_tuples_type,
            functools.partial(data_collection.items),
        )
    elif isinstance(data_collection, list):
        convert_data_collection_to_tuples = cast(
            convert_to_tuples_type,
            functools.partial(enumerate, data_collection),
        )
    else:
        msg = f"Expected a dict or a list but got {data_collection!r} of type '{type(data_collection)}'"
        raise TypeError(msg)
    for key, value in convert_data_collection_to_tuples():
        if key in (*ANNOTATION_KEYS, *ignored_keys):
            continue
        yield key, value, parent_path
        if isinstance(value, (dict, list)):
            yield from _nested_items_path(
                data_collection=value,
                parent_path=[*parent_path, key],
                ignored_keys=ignored_keys,
            )


def get_path_to_play(
    lintable: Lintable,
    lineno: int,  # 1-based
    ruamel_data: CommentedMap | CommentedSeq,
) -> list[str | int]:
    """Get the path to the play in the given file at the given line number."""
    if lineno < 1:
        msg = f"expected lineno >= 1, got {lineno}"
        raise ValueError(msg)
    if lintable.kind != "playbook" or not isinstance(ruamel_data, CommentedSeq):
        return []
    lc: LineCol  # lc uses 0-based counts # pylint: disable=invalid-name
    # lineno is 1-based. Convert to 0-based.
    line_index = lineno - 1

    prev_play_line_index = ruamel_data.lc.line
    last_play_index = len(ruamel_data)
    for play_index, play in enumerate(ruamel_data):
        next_play_index = play_index + 1
        if last_play_index > next_play_index:
            next_play_line_index = ruamel_data[next_play_index].lc.line
        else:
            next_play_line_index = None

        lc = play.lc  # pylint: disable=invalid-name
        if not isinstance(lc.line, int):
            msg = f"expected lc.line to be an int, got {lc.line!r}"
            raise RuntimeError(msg)
        if lc.line == line_index:
            return [play_index]
        if play_index > 0 and prev_play_line_index < line_index < lc.line:
            return [play_index - 1]
        # The previous play check (above) can't catch the last play,
        # so, handle the last play separately.
        if (
            next_play_index == last_play_index
            and line_index > lc.line
            and (next_play_line_index is None or line_index < next_play_line_index)
        ):
            # part of this (last) play
            return [play_index]
        prev_play_line_index = play.lc.line
    return []


def get_path_to_task(
    lintable: Lintable,
    lineno: int,  # 1-based
    ruamel_data: CommentedMap | CommentedSeq,
) -> list[str | int]:
    """Get the path to the task in the given file at the given line number."""
    if lineno < 1:
        msg = f"expected lineno >= 1, got {lineno}"
        raise ValueError(msg)
    if lintable.kind in ("tasks", "handlers", "playbook"):
        if not isinstance(ruamel_data, CommentedSeq):
            msg = f"expected ruamel_data to be a CommentedSeq, got {ruamel_data!r}"
            raise ValueError(msg)
        if lintable.kind in ("tasks", "handlers"):
            return _get_path_to_task_in_tasks_block(lineno, ruamel_data)
        if lintable.kind == "playbook":
            return _get_path_to_task_in_playbook(lineno, ruamel_data)

    return []


def _get_path_to_task_in_playbook(
    lineno: int,  # 1-based
    ruamel_data: CommentedSeq,
) -> list[str | int]:
    """Get the path to the task in the given playbook data at the given line number."""
    last_play_index = len(ruamel_data)
    for play_index, play in enumerate(ruamel_data):
        next_play_index = play_index + 1
        if last_play_index > next_play_index:
            next_play_line_index = ruamel_data[next_play_index].lc.line
        else:
            next_play_line_index = None

        play_keys = list(play.keys())
        for tasks_keyword in PLAYBOOK_TASK_KEYWORDS:
            if not play.get(tasks_keyword):
                continue

            try:
                next_keyword = play_keys[play_keys.index(tasks_keyword) + 1]
            except IndexError:
                next_block_line_index = None
            else:
                next_block_line_index = play.lc.data[next_keyword][0]
            # last_lineno_in_block is 1-based while next_*_line_index is 0-based.
            # Subtracting 1 (the line before the next block) and adding 1 (to convert
            # to 1-based) cancel out, so the 0-based index can be used directly.
            if next_block_line_index is not None:
                last_lineno_in_block = next_block_line_index
            elif next_play_line_index is not None:
                last_lineno_in_block = next_play_line_index
            else:
                last_lineno_in_block = None

            task_path = _get_path_to_task_in_tasks_block(
                lineno,
                play[tasks_keyword],
                last_lineno_in_block,
            )
            if task_path:
                # mypy gets confused without this typehint
                tasks_keyword_path: list[int | str] = [
                    play_index,
                    tasks_keyword,
                ]
                return tasks_keyword_path + list(task_path)
    # lineno is before first play or no tasks keywords in any of the plays
    return []


def _get_path_to_task_in_tasks_block(
    lineno: int,  # 1-based
    tasks_block: CommentedSeq,
    last_lineno: int | None = None,  # 1-based
) -> list[str | int]:
    """Get the path to the task in the given tasks block at the given line number."""
    task: CommentedMap | None
    # lineno and last_lineno are 1-based. Convert to 0-based.
    line_index = lineno - 1
    last_line_index = None if last_lineno is None else last_lineno - 1

    # lc (LineCol) uses 0-based counts
    prev_task_line_index = tasks_block.lc.line
    last_task_index = len(tasks_block)
    for task_index, task in enumerate(tasks_block):
        next_task_index = task_index + 1
        if last_task_index > next_task_index:
            if tasks_block[next_task_index] is not None:
                next_task_line_index = tasks_block[next_task_index].lc.line
            else:
                next_task_line_index = tasks_block.lc.item(next_task_index)[0]
        else:
            next_task_line_index = None

        if task is None:
            # create a dummy task to represent the null task
            task = CommentedMap()
            task.lc.line, task.lc.col = tasks_block.lc.item(task_index)

        nested_task_keys = set(task.keys()).intersection(set(NESTED_TASK_KEYS))
        if nested_task_keys:
            subtask_path = _get_path_to_task_in_nested_tasks_block(
                lineno,
                task,
                nested_task_keys,
                next_task_line_index,
            )
            if subtask_path:
                # mypy gets confused without this typehint
                task_path: list[str | int] = [task_index]
                return task_path + list(subtask_path)

        if not isinstance(task.lc.line, int):
            msg = f"expected task.lc.line to be an int, got {task.lc.line!r}"
            raise RuntimeError(msg)
        if task.lc.line == line_index:
            return [task_index]
        if task_index > 0 and prev_task_line_index < line_index < task.lc.line:
            return [task_index - 1]
        # The previous task check can't catch the last task,
        # so, handle the last task separately (also after subtask checks).
        # pylint: disable=too-many-boolean-expressions
        if (
            next_task_index == last_task_index
            and line_index > task.lc.line
            and (next_task_line_index is None or line_index < next_task_line_index)
            and (last_line_index is None or line_index <= last_line_index)
        ):
            # part of this (last) task
            return [task_index]
        prev_task_line_index = task.lc.line
    # line is not part of this tasks block
    return []


def _get_path_to_task_in_nested_tasks_block(
    lineno: int,  # 1-based
    task: CommentedMap,
    nested_task_keys: set[str],
    next_task_line_index: int | None = None,  # 0-based
) -> list[str | int]:
    """Get the path to the task in the given nested tasks block."""
    # loop through the keys in line order
    task_keys = list(task.keys())
    task_keys_by_index = dict(enumerate(task_keys))
    for task_index, task_key in enumerate(task_keys):
        nested_task_block = task[task_key]
        if task_key not in nested_task_keys or not nested_task_block:
            continue
        next_task_key = task_keys_by_index.get(task_index + 1, None)
        if next_task_key is not None:
            next_task_key_line_index = task.lc.data[next_task_key][0]
        else:
            next_task_key_line_index = None
        # last_lineno_in_block is 1-based while next_*_line_index is 0-based.
        # Subtracting 1 (the line before the next key) and adding 1 (to convert
        # to 1-based) cancel out, so the 0-based index can be used directly.
        last_lineno_in_block = (
            next_task_key_line_index
            if next_task_key_line_index is not None
            else next_task_line_index
        )
        subtask_path = _get_path_to_task_in_tasks_block(
            lineno,
            nested_task_block,
            last_lineno_in_block,  # 1-based
        )
        if subtask_path:
            return [task_key, *list(subtask_path)]
    # line is not part of this nested tasks block
    return []


class OctalIntYAML11(ScalarInt):
    """OctalInt representation for YAML 1.1."""

    # tell mypy that ScalarInt has these attributes
    _width: Any
    _underscore: Any

    def __new__(cls, *args: Any, **kwargs: Any) -> Any:
        """Create a new int with ScalarInt-defined attributes."""
        return ScalarInt.__new__(cls, *args, **kwargs)

    @staticmethod
    def represent_octal(representer: RoundTripRepresenter, data: OctalIntYAML11) -> Any:
        """Return a YAML 1.1 octal representation.

        Based on ruamel.yaml.representer.RoundTripRepresenter.represent_octal_int()
        (which only handles the YAML 1.2 octal representation).
        """
        v = format(data, "o")
        anchor = data.yaml_anchor(any=True)
        # noinspection PyProtectedMember
        # pylint: disable=protected-access
        return representer.insert_underscore(
            "0",
            v,
            data._underscore,  # noqa: SLF001
            anchor=anchor,
        )


class CustomConstructor(RoundTripConstructor):
    """Custom YAML constructor that preserves Octal formatting in YAML 1.1."""

    def construct_yaml_int(self, node: ScalarNode) -> Any:
        """Construct int while preserving Octal formatting in YAML 1.1.

        ruamel.yaml only preserves the octal format for YAML 1.2.
        For 1.1, it converts the octal to an int. So, we preserve the format.

        Code partially copied from ruamel.yaml (MIT licensed).
        """
        ret = super().construct_yaml_int(node)
        if self.resolver.processing_version == (1, 1) and isinstance(ret, int):
            # Do not rewrite zero as octal.
            if ret == 0:
                return ret
            # see if we've got an octal we need to preserve.
            value_su = self.construct_scalar(node)
            try:
                v = value_su.rstrip("_")
                underscore = [len(v) - v.rindex("_") - 1, False, False]  # type: Any
            except ValueError:
                underscore = None
            except IndexError:
                underscore = None
            value_s = value_su.replace("_", "")
            if value_s[0] in "+-":
                value_s = value_s[1:]
            if value_s[0] == "0":
                # got an octal in YAML 1.1
                ret = OctalIntYAML11(
                    ret,
                    width=None,
                    underscore=underscore,
                    anchor=node.anchor,
                )
        return ret


CustomConstructor.add_constructor(
    "tag:yaml.org,2002:int",
    CustomConstructor.construct_yaml_int,
)
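# Illustrative sketch: with the YAML 1.1 default used by FormattedYAML below, a
# scalar such as 0644 loads as OctalIntYAML11(420) instead of a plain int, so it
# is dumped back as 0644 rather than 420.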


class FormattedEmitter(Emitter):
    """Emitter that applies custom formatting rules when dumping YAML.

    Differences from ruamel.yaml defaults:

      - indentation of root-level sequences
      - prefer double-quoted scalars over single-quoted scalars

    This ensures that root-level sequences are never indented.
    All subsequent levels are indented as configured (normal ruamel.yaml behavior).

    Earlier implementations used dedent on ruamel.yaml's dumped output,
    but string magic like that had a ton of problematic edge cases.
    """

    preferred_quote = '"'  # either " or '

    min_spaces_inside = 0
    max_spaces_inside = 1

    _sequence_indent = 2
    _sequence_dash_offset = 0  # Should be _sequence_indent - 2
    _root_is_sequence = False

    _in_empty_flow_map = False

    @property
    def _is_root_level_sequence(self) -> bool:
        """Return True if this is a sequence at the root level of the yaml document."""
        return self.column < 2 and self._root_is_sequence

    def expect_document_root(self) -> None:
        """Expect doc root (extend to record if the root doc is a sequence)."""
        self._root_is_sequence = isinstance(
            self.event,
            ruamel.yaml.events.SequenceStartEvent,
        )
        return super().expect_document_root()

    # NB: mypy does not support overriding attributes with properties yet:
    #     https://github.com/python/mypy/issues/4125
    #     To silence we have to ignore[override] both the @property and the method.

    @property
    def best_sequence_indent(self) -> int:
        """Return the configured sequence_indent or 2 for root level."""
        return 2 if self._is_root_level_sequence else self._sequence_indent

    @best_sequence_indent.setter
    def best_sequence_indent(self, value: int) -> None:
        """Configure how many columns to indent each sequence item (including the '-')."""
        self._sequence_indent = value

    @property
    def sequence_dash_offset(self) -> int:
        """Return the configured sequence_dash_offset or 0 for root level."""
        return 0 if self._is_root_level_sequence else self._sequence_dash_offset

    @sequence_dash_offset.setter
    def sequence_dash_offset(self, value: int) -> None:
        """Configure how many spaces to put before each sequence item's '-'."""
        self._sequence_dash_offset = value

    def choose_scalar_style(self) -> Any:
        """Select how to quote scalars if needed."""
        style = super().choose_scalar_style()
        if (
            style == ""  # noqa: PLC1901
            and self.event.value.startswith("0")
            and len(self.event.value) > 1
        ):
            if self.event.tag == "tag:yaml.org,2002:int" and self.event.implicit[0]:
                # ensures that "0123" string does not lose its quoting
                self.event.tag = "tag:yaml.org,2002:str"
                self.event.implicit = (True, True, True)
            return '"'
        if style != "'":
            # block scalar, double quoted, etc.
            return style
        if '"' in self.event.value:
            return "'"
        return self.preferred_quote

    def write_indicator(
        self,
        indicator: str,  # ruamel.yaml typehint is wrong. This is a string.
        need_whitespace: bool,
        whitespace: bool = False,  # noqa: FBT002
        indention: bool = False,  # (sic) ruamel.yaml has this typo in their API # noqa: FBT002
    ) -> None:
        """Make sure that flow maps get whitespace by the curly braces."""
        # We try to go with one whitespace by the curly braces and adjust accordingly
        # to what min_spaces_inside and max_spaces_inside are set to.
        # This assumes min_spaces_inside <= max_spaces_inside
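        # With the defaults (min 0, max 1) a non-empty flow mapping is written as
        # "{ a: 1 }" while an empty one stays "{}".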
        spaces_inside = min(
            max(1, self.min_spaces_inside),
            self.max_spaces_inside if self.max_spaces_inside != -1 else 1,
        )
        # If this is the end of the flow mapping that isn't on a new line:
        if (
            indicator == "}"
            and (self.column or 0) > (self.indent or 0)
            and not self._in_empty_flow_map
        ):
            indicator = (" " * spaces_inside) + "}"
        super().write_indicator(indicator, need_whitespace, whitespace, indention)
        # if it is the start of a flow mapping, and it's not time
        # to wrap the lines, insert a space.
        if indicator == "{" and self.column < self.best_width:
            if self.check_empty_mapping():
                self._in_empty_flow_map = True
            else:
                self.column += 1
                self.stream.write(" " * spaces_inside)
                self._in_empty_flow_map = False

    # "/n/n" results in one blank line (end the previous line, then newline).
    # So, "/n/n/n" or more is too many new lines. Clean it up.
    _re_repeat_blank_lines: Pattern[str] = re.compile(r"\n{3,}")

    @staticmethod
    def add_octothorpe_protection(string: str) -> str:
        """Modify strings to protect "#" from full-line-comment post-processing."""
        try:
            if "#" in string:
                # ＃ is \uFF03 (fullwidth number sign)
                # ﹟ is \uFE5F (small number sign)
                string = string.replace("#", "\uFF03#\uFE5F")
                # this is safe even if this sequence is present
                # because it gets reversed in post-processing
        except (ValueError, TypeError):
            # probably not really a string. Whatever.
            pass
        return string

    @staticmethod
    def drop_octothorpe_protection(string: str) -> str:
        """Remove string protection of "#" after full-line-comment post-processing."""
        try:
            if "\uFF03#\uFE5F" in string:
                # ＃ is \uFF03 (fullwidth number sign)
                # ﹟ is \uFE5F (small number sign)
                string = string.replace("\uFF03#\uFE5F", "#")
        except (ValueError, TypeError):
            # probably not really a string. Whatever.
            pass
        return string

    def analyze_scalar(self, scalar: str) -> ScalarAnalysis:
        """Determine quoting and other requirements for string.

        And protect "#" from full-line-comment post-processing.
        """
        analysis: ScalarAnalysis = super().analyze_scalar(scalar)
        if analysis.empty:
            return analysis
        analysis.scalar = self.add_octothorpe_protection(analysis.scalar)
        return analysis

    # comment is a CommentToken, not Any (Any is ruamel.yaml's lazy type hint).
    def write_comment(
        self,
        comment: CommentToken,
        pre: bool = False,  # noqa: FBT002
    ) -> None:
        """Clean up extra new lines and spaces in comments.

        ruamel.yaml treats new or empty lines as comments.
        See: https://stackoverflow.com/questions/42708668/removing-all-blank-lines-but-not-comments-in-ruamel-yaml/42712747#42712747
        """
        value: str = comment.value
        if (
            pre
            and not value.strip()
            and not isinstance(
                self.event,
                (
                    ruamel.yaml.events.CollectionEndEvent,
                    ruamel.yaml.events.DocumentEndEvent,
                    ruamel.yaml.events.StreamEndEvent,
                ),
            )
        ):
            # drop pure whitespace pre comments
            # does not apply to End events since they consume one of the newlines.
            value = ""
        elif pre:
            # preserve content in pre comment with at least one newline,
            # but no extra blank lines.
            value = self._re_repeat_blank_lines.sub("\n", value)
        else:
            # single blank lines in post comments
            value = self._re_repeat_blank_lines.sub("\n\n", value)
        comment.value = value

        # make sure that the eol comment only has one space before it.
        if comment.column > self.column + 1 and not pre:
            comment.column = self.column + 1

        return super().write_comment(comment, pre)

    def write_version_directive(self, version_text: Any) -> None:
        """Skip writing '%YAML 1.1'."""
        if version_text == "1.1":
            return
        super().write_version_directive(version_text)


# pylint: disable=too-many-instance-attributes
class FormattedYAML(YAML):
    """A YAML loader/dumper that handles ansible content better by default."""

    def __init__(
        self,
        *,
        typ: str | None = None,
        pure: bool = False,
        output: Any = None,
        plug_ins: list[str] | None = None,
    ):
        """Return a configured ``ruamel.yaml.YAML`` instance.

        Some config defaults get extracted from the yamllint config.

        ``ruamel.yaml.YAML`` uses attributes to configure how it dumps yaml files.
        Some of these settings can be confusing, so here are examples of how different
        settings will affect the dumped yaml.

        This example does not indent any sequences:

        .. code:: python

            yaml.explicit_start=True
            yaml.map_indent=2
            yaml.sequence_indent=2
            yaml.sequence_dash_offset=0

        .. code:: yaml

            ---
            - name: A playbook
              tasks:
              - name: Task

        This example indents all sequences including the root-level:

        .. code:: python

            yaml.explicit_start=True
            yaml.map_indent=2
            yaml.sequence_indent=4
            yaml.sequence_dash_offset=2
            # yaml.Emitter defaults to ruamel.yaml.emitter.Emitter

        .. code:: yaml

            ---
              - name: Playbook
                tasks:
                  - name: Task

        This example indents all sequences except at the root-level:

        .. code:: python

            yaml.explicit_start=True
            yaml.map_indent=2
            yaml.sequence_indent=4
            yaml.sequence_dash_offset=2
            yaml.Emitter = FormattedEmitter  # custom Emitter prevents root-level indents

        .. code:: yaml

            ---
            - name: Playbook
              tasks:
                - name: Task
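
        Putting these together, a minimal round-trip sketch (``playbook.yml`` is a
        hypothetical path; ``Path`` comes from ``pathlib``):

        .. code:: python

            yaml = FormattedYAML()
            data = yaml.loads(Path("playbook.yml").read_text())
            print(yaml.dumps(data))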
        """
        # Default to reading/dumping YAML 1.1 (ruamel.yaml defaults to 1.2)
        self._yaml_version_default: tuple[int, int] = (1, 1)
        self._yaml_version: str | tuple[int, int] = self._yaml_version_default

        super().__init__(typ=typ, pure=pure, output=output, plug_ins=plug_ins)

        # NB: We ignore some mypy issues because ruamel.yaml typehints are not great.

        config = self._defaults_from_yamllint_config()

        # these settings are derived from yamllint config
        self.explicit_start: bool = config["explicit_start"]  # type: ignore[assignment]
        self.explicit_end: bool = config["explicit_end"]  # type: ignore[assignment]
        self.width: int = config["width"]  # type: ignore[assignment]
        indent_sequences: bool = cast(bool, config["indent_sequences"])
        preferred_quote: str = cast(str, config["preferred_quote"])  # either ' or "

        min_spaces_inside: int = cast(int, config["min_spaces_inside"])
        max_spaces_inside: int = cast(int, config["max_spaces_inside"])

        self.default_flow_style = False
        self.compact_seq_seq = True  # type: ignore[assignment] # dash after dash
        self.compact_seq_map = True  # type: ignore[assignment] # key after dash

        # Do not use yaml.indent() as it obscures the purpose of these vars:
        self.map_indent = 2
        self.sequence_indent = 4 if indent_sequences else 2
        self.sequence_dash_offset = self.sequence_indent - 2

        # If someone doesn't want our FormattedEmitter, they can change it.
        self.Emitter = FormattedEmitter

        # ignore invalid preferred_quote setting
        if preferred_quote in ['"', "'"]:
            FormattedEmitter.preferred_quote = preferred_quote
        # NB: default_style affects preferred_quote as well.
        # self.default_style ∈ None (default), '', '"', "'", '|', '>'

        # spaces inside braces for flow mappings
        FormattedEmitter.min_spaces_inside = min_spaces_inside
        FormattedEmitter.max_spaces_inside = max_spaces_inside

        # We need a custom constructor to preserve Octal formatting in YAML 1.1
        self.Constructor = CustomConstructor
        self.Representer.add_representer(OctalIntYAML11, OctalIntYAML11.represent_octal)

        # Should we use preserve_quotes? It loads all strings as a str subclass that carries
        # a quote attribute. Will the str subclasses cause problems in transforms?
        # Are there any other gotchas to this?
        #
        # This will only preserve quotes for strings read from the file.
        # anything modified by the transform will use no quotes, preferred_quote,
        # or the quote that results in the least amount of escaping.

        # If needed, we can use this to change null representation to be explicit
        # (see https://stackoverflow.com/a/44314840/1134951)
        # self.Representer.add_representer(

    @staticmethod
    def _defaults_from_yamllint_config() -> dict[str, bool | int | str]:
        """Extract FormattedYAML-relevant settings from yamllint config if possible."""
        config = {
            "explicit_start": True,
            "explicit_end": False,
            "width": 160,
            "indent_sequences": True,
            "preferred_quote": '"',
            "min_spaces_inside": 0,
            "max_spaces_inside": 1,
        }
        for rule, rule_config in load_yamllint_config().rules.items():
            if not rule_config:
                # rule disabled
                continue

            # refactor this if ... elif ... elif ... else monstrosity using match/case (PEP 634) once python 3.10 is mandatory
            if rule == "document-start":
                config["explicit_start"] = rule_config["present"]
            elif rule == "document-end":
                config["explicit_end"] = rule_config["present"]
            elif rule == "line-length":
                config["width"] = rule_config["max"]
            elif rule == "braces":
                min_spaces_inside = rule_config["min-spaces-inside"]
                if min_spaces_inside:
                    config["min_spaces_inside"] = int(min_spaces_inside)
                max_spaces_inside = rule_config["max-spaces-inside"]
                if max_spaces_inside:
                    config["max_spaces_inside"] = int(max_spaces_inside)
            elif rule == "indentation":
                indent_sequences = rule_config["indent-sequences"]
                # one of: bool, "whatever", "consistent"
                # so, we use True for "whatever" and "consistent"
                config["indent_sequences"] = bool(indent_sequences)
            elif rule == "quoted-strings":
                quote_type = rule_config["quote-type"]
                # one of: single, double, any
                if quote_type == "single":
                    config["preferred_quote"] = "'"
                elif quote_type == "double":
                    config["preferred_quote"] = '"'

        return cast(dict[str, Union[bool, int, str]], config)

    @property  # type: ignore[override]
    def version(self) -> str | tuple[int, int]:
        """Return the YAML version used to parse or dump.

        Ansible uses PyYAML which only supports YAML 1.1. ruamel.yaml defaults to 1.2.
        So, we have to make sure we dump yaml files using YAML 1.1.
        We can relax the version requirement once ansible uses a version of PyYAML
        that includes this PR: https://github.com/yaml/pyyaml/pull/555
        """
        return self._yaml_version

    @version.setter
    def version(self, value: str | tuple[int, int] | None) -> None:
        """Ensure that yaml version uses our default value.

        The yaml Reader updates this value based on the ``%YAML`` directive in files.
        So, if a file does not include the directive, it sets this to None.
        But, None effectively resets the parsing version to YAML 1.2 (ruamel's default).
        """
        self._yaml_version = value if value is not None else self._yaml_version_default

    def loads(self, stream: str) -> Any:
        """Load YAML content from a string while avoiding known ruamel.yaml issues."""
        if not isinstance(stream, str):
            msg = f"expected a str but got {type(stream)}"
            raise NotImplementedError(msg)
        # As ruamel drops comments for any document that is not a mapping or sequence,
        # we need to avoid using it to reformat those documents.
        # https://sourceforge.net/p/ruamel-yaml/tickets/460/

        text, preamble_comment = self._pre_process_yaml(stream)
        data = self.load(stream=text)
        if preamble_comment is not None and isinstance(
            data,
            (CommentedMap, CommentedSeq),
        ):
            data.preamble_comment = preamble_comment  # type: ignore[union-attr]
        # Because data can validly also be None for empty documents, we cannot
        # really annotate the return type here, so we need to remember to
        # never save None or scalar data types when reformatting.
        return data

    def dumps(self, data: Any) -> str:
        """Dump YAML document to string (including its preamble_comment)."""
        preamble_comment: str | None = getattr(data, "preamble_comment", None)
        self._prevent_wrapping_flow_style(data)
        with StringIO() as stream:
            if preamble_comment:
                stream.write(preamble_comment)
            self.dump(data, stream)
            text = stream.getvalue()
        return self._post_process_yaml(text)

    def _prevent_wrapping_flow_style(self, data: Any) -> None:
        if not isinstance(data, (CommentedMap, CommentedSeq)):
            return
        for key, value, parent_path in nested_items_path(data):
            if not isinstance(value, (CommentedMap, CommentedSeq)):
                continue
            fa: Format = value.fa  # pylint: disable=invalid-name
            if fa.flow_style():
                predicted_indent = self._predict_indent_length(parent_path, key)
                predicted_width = len(str(value))
                if predicted_indent + predicted_width > self.width:
                    # this flow-style map will probably get line-wrapped,
                    # so, switch it to block style to avoid the line wrap.
                    fa.set_block_style()

    def _predict_indent_length(self, parent_path: list[str | int], key: Any) -> int:
        indent = 0

        # each parent_key type tells us what the indent is for the next level.
        for parent_key in parent_path:
            if isinstance(parent_key, int) and indent == 0:
                # root level is a sequence
                indent += self.sequence_dash_offset
            elif isinstance(parent_key, int):
                # next level is a sequence
                indent += cast(int, self.sequence_indent)
            elif isinstance(parent_key, str):
                # next level is a map
                indent += cast(int, self.map_indent)

        if isinstance(key, int) and indent == 0:
            # flow map is an item in a root-level sequence
            indent += self.sequence_dash_offset
        elif isinstance(key, int) and indent > 0:
            # flow map is in a sequence
            indent += cast(int, self.sequence_indent)
        elif isinstance(key, str):
            # flow map is in a map
            indent += len(key + ": ")

        return indent

    # ruamel.yaml only preserves empty (no whitespace) blank lines
    # (ie "/n/n" becomes "/n/n" but "/n  /n" becomes "/n").
    # So, we need to identify whitespace-only lines to drop spaces before reading.
    _whitespace_only_lines_re = re.compile(r"^ +$", re.MULTILINE)

    def _pre_process_yaml(self, text: str) -> tuple[str, str | None]:
        """Handle known issues with ruamel.yaml loading.

        Preserve blank lines despite extra whitespace.
        Preserve any preamble (aka header) comments before "---".

        For more on preamble comments, see: https://stackoverflow.com/questions/70286108/python-ruamel-yaml-package-how-to-get-header-comment-lines/70287507#70287507
        """
        text = self._whitespace_only_lines_re.sub("", text)

        # I investigated extending ruamel.yaml to capture preamble comments.
        #   preamble comment goes from:
        #     DocumentStartToken.comment -> DocumentStartEvent.comment
        #   Then, in the composer:
        #     once in composer.current_event
        #         discards DocumentStartEvent
        #           move DocumentStartEvent to composer.last_event
        #             all document nodes get composed (events get used)
        #         discard DocumentEndEvent
        #           move DocumentEndEvent to composer.last_event
        # So, there's no convenient way to extend the composer
        # to somehow capture the comments and pass them on.

        preamble_comments = []
        if "\n---\n" not in text and "\n--- " not in text:
            # nothing is before the document start mark,
            # so there are no comments to preserve.
            return text, None
        for line in text.splitlines(True):
            # We only need to capture the preamble comments. No need to remove them.
            # lines might also include directives.
            if line.lstrip().startswith("#") or line == "\n":
                preamble_comments.append(line)
            elif line.startswith("---"):
                break

        return text, "".join(preamble_comments) or None

    @staticmethod
    def _post_process_yaml(text: str) -> str:
        """Handle known issues with ruamel.yaml dumping.

        Make sure there's only one newline at the end of the file.

        Fix the indent of full-line comments to match the indent of the next line.
        See: https://stackoverflow.com/questions/71354698/how-can-i-use-the-ruamel-yaml-rtsc-mode/71355688#71355688
        Also, removes "#" protection from strings that prevents them from being
        identified as full line comments in post-processing.

        Make sure null list items don't end in a space.
        """
        text = text.rstrip("\n") + "\n"

        lines = text.splitlines(keepends=True)
        full_line_comments: list[tuple[int, str]] = []
        for i, line in enumerate(lines):
            stripped = line.lstrip()
            if not stripped:
                # blank line. Move on.
                continue

            space_length = len(line) - len(stripped)

            if stripped.startswith("#"):
                # got a full line comment

                # allow some full line comments to match the previous indent
                if i > 0 and not full_line_comments and space_length:
                    prev = lines[i - 1]
                    prev_space_length = len(prev) - len(prev.lstrip())
                    if prev_space_length == space_length:
                        # if the indent matches the previous line's indent, skip it.
                        continue

                full_line_comments.append((i, stripped))
            elif full_line_comments:
                # end of full line comments so adjust to match indent of this line
                spaces = " " * space_length
                for index, comment in full_line_comments:
                    lines[index] = spaces + comment
                full_line_comments.clear()

            cleaned = line.strip()
            if not cleaned.startswith("#") and cleaned.endswith("-"):
                # got an empty list item. drop any trailing spaces.
                lines[i] = line.rstrip() + "\n"

        text = "".join(
            FormattedEmitter.drop_octothorpe_protection(line) for line in lines
        )
        return text


def clean_json(
    obj: Any,
    func: Callable[[str], Any] = lambda key: key.startswith("__")
    if isinstance(key, str)
    else False,
) -> Any:
    """Remove all keys matching the condition from a nested JSON-like object.

    :param obj: a JSON-like object to clean, also returned for chaining.
    :param func: a callable that takes a key as argument and returns True for each key to delete
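
    A quick sketch with the default filter, which drops dunder-prefixed keys:

    .. code-block:: python

        >>> clean_json({"name": "x", "__line__": 3, "steps": [{"__file__": "f"}]})
        {'name': 'x', 'steps': [{}]}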
    """
    if isinstance(obj, dict):
        for key in list(obj.keys()):
            if func(key):
                del obj[key]
            else:
                clean_json(obj[key], func)
    elif isinstance(obj, list):
        for i in reversed(range(len(obj))):
            if func(obj[i]):
                del obj[i]
            else:
                clean_json(obj[i], func)
    else:
        # neither a dict nor a list, do nothing
        pass
    return obj