1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
|
"""Utility helpers to simplify working with yaml-based data."""
# pylint: disable=too-many-lines
from __future__ import annotations
import functools
import logging
import os
import re
from io import StringIO
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterator,
Pattern,
Sequence,
Tuple,
Union,
cast,
)
import ruamel.yaml.events
from ruamel.yaml.comments import CommentedMap, CommentedSeq, Format
from ruamel.yaml.constructor import RoundTripConstructor
from ruamel.yaml.emitter import Emitter, ScalarAnalysis
# Module 'ruamel.yaml' does not explicitly export attribute 'YAML'; implicit reexport disabled
# To make the type checkers happy, we import from ruamel.yaml.main instead.
from ruamel.yaml.main import YAML
from ruamel.yaml.nodes import ScalarNode
from ruamel.yaml.representer import RoundTripRepresenter
from ruamel.yaml.scalarint import ScalarInt
from ruamel.yaml.tokens import CommentToken
from yamllint.config import YamlLintConfig
from ansiblelint.constants import (
ANNOTATION_KEYS,
NESTED_TASK_KEYS,
PLAYBOOK_TASK_KEYWORDS,
SKIPPED_RULES_KEY,
)
from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable
from ansiblelint.utils import get_action_tasks, normalize_task
if TYPE_CHECKING:
# noinspection PyProtectedMember
from ruamel.yaml.comments import LineCol # pylint: disable=ungrouped-imports
_logger = logging.getLogger(__name__)
YAMLLINT_CONFIG = """
extends: default
rules:
comments:
# https://github.com/prettier/prettier/issues/6780
min-spaces-from-content: 1
# https://github.com/adrienverge/yamllint/issues/384
comments-indentation: false
document-start: disable
# 160 chars was the default used by old E204 rule, but
# you can easily change it or disable in your .yamllint file.
line-length:
max: 160
# We are adding an extra space inside braces as that's how prettier does it
# and we are trying not to fight other linters.
braces:
min-spaces-inside: 0 # yamllint defaults to 0
max-spaces-inside: 1 # yamllint defaults to 0
octal-values:
forbid-implicit-octal: true # yamllint defaults to false
forbid-explicit-octal: true # yamllint defaults to false
"""
def deannotate(data: Any) -> Any:
"""Remove our annotations like __file__ and __line__ and return a JSON serializable object."""
if isinstance(data, dict):
result = data.copy()
for key, value in data.items():
if key in ANNOTATION_KEYS:
del result[key]
else:
result[key] = deannotate(value)
return result
if isinstance(data, list):
return [deannotate(item) for item in data if item not in ANNOTATION_KEYS]
return data
@functools.lru_cache(maxsize=1)
def load_yamllint_config() -> YamlLintConfig:
"""Load our default yamllint config and any customized override file."""
config = YamlLintConfig(content=YAMLLINT_CONFIG)
# if we detect local yamllint config we use it but raise a warning
# as this is likely to get out of sync with our internal config.
for file in [
".yamllint",
".yamllint.yaml",
".yamllint.yml",
os.getenv("YAMLLINT_CONFIG_FILE", ""),
os.getenv("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
+ "/yamllint/config",
]:
if os.path.isfile(file):
_logger.debug(
"Loading custom %s config file, this extends our "
"internal yamllint config.",
file,
)
config_override = YamlLintConfig(file=file)
config_override.extend(config)
config = config_override
break
_logger.debug("Effective yamllint rules used: %s", config.rules)
return config
def iter_tasks_in_file(
lintable: Lintable,
) -> Iterator[tuple[dict[str, Any], dict[str, Any], list[str], MatchError | None]]:
"""Iterate over tasks in file.
This yields a 4-tuple of raw_task, normalized_task, skip_tags, and error.
raw_task:
When looping through the tasks in the file, each "raw_task" is minimally
processed to include these special keys: __line__, __file__, skipped_rules.
normalized_task:
When each raw_task is "normalized", action shorthand (strings) get parsed
by ansible into python objects and the action key gets normalized. If the task
should be skipped (skipped is True) or normalizing it fails (error is not None)
then this is just the raw_task instead of a normalized copy.
skip_tags:
List of tags found to be skipped, from tags block or noqa comments
error:
This is normally None. It will be a MatchError when the raw_task cannot be
normalized due to an AnsibleParserError.
:param lintable: The playbook or tasks/handlers yaml file to get tasks from
Yields raw_task, normalized_task, skipped, error
"""
data = lintable.data
if not data:
return
raw_tasks = get_action_tasks(data, lintable)
for raw_task in raw_tasks:
err: MatchError | None = None
skip_tags: list[str] = raw_task.get(SKIPPED_RULES_KEY, [])
try:
normalized_task = normalize_task(raw_task, str(lintable.path))
except MatchError as err:
# normalize_task converts AnsibleParserError to MatchError
yield raw_task, raw_task, skip_tags, err
return
if "skip_ansible_lint" in raw_task.get("tags", []):
skip_tags.append("skip_ansible_lint")
if skip_tags:
yield raw_task, normalized_task, skip_tags, err
continue
yield raw_task, normalized_task, skip_tags, err
def nested_items_path(
data_collection: dict[Any, Any] | list[Any],
ignored_keys: Sequence[str] = (),
) -> Iterator[tuple[Any, Any, list[str | int]]]:
"""Iterate a nested data structure, yielding key/index, value, and parent_path.
This is a recursive function that calls itself for each nested layer of data.
Each iteration yields:
1. the current item's dictionary key or list index,
2. the current item's value, and
3. the path to the current item from the outermost data structure.
For dicts, the yielded (1) key and (2) value are what ``dict.items()`` yields.
For lists, the yielded (1) index and (2) value are what ``enumerate()`` yields.
The final component, the parent path, is a list of dict keys and list indexes.
The parent path can be helpful in providing error messages that indicate
precisely which part of a yaml file (or other data structure) needs to be fixed.
For example, given this playbook:
.. code-block:: yaml
- name: A play
tasks:
- name: A task
debug:
msg: foobar
Here's the first and last yielded items:
.. code-block:: python
>>> playbook=[{"name": "a play", "tasks": [{"name": "a task", "debug": {"msg": "foobar"}}]}]
>>> next( nested_items_path( playbook ) )
(0, {'name': 'a play', 'tasks': [{'name': 'a task', 'debug': {'msg': 'foobar'}}]}, [])
>>> list( nested_items_path( playbook ) )[-1]
('msg', 'foobar', [0, 'tasks', 0, 'debug'])
Note that, for outermost data structure, the parent path is ``[]`` because
you do not need to descend into any nested dicts or lists to find the indicated
key and value.
If a rule were designed to prohibit "foobar" debug messages, it could use the
parent path to provide a path to the problematic ``msg``. It might use a jq-style
path in its error message: "the error is at ``.[0].tasks[0].debug.msg``".
Or if a utility could automatically fix issues, it could use the path to descend
to the parent object using something like this:
.. code-block:: python
target = data
for segment in parent_path:
target = target[segment]
:param data_collection: The nested data (dicts or lists).
:returns: each iteration yields the key (of the parent dict) or the index (lists)
"""
# As typing and mypy cannot effectively ensure we are called only with
# valid data, we better ignore NoneType
if data_collection is None:
return
yield from _nested_items_path(
data_collection=data_collection, parent_path=[], ignored_keys=ignored_keys
)
def _nested_items_path(
data_collection: dict[Any, Any] | list[Any],
parent_path: list[str | int],
ignored_keys: Sequence[str] = (),
) -> Iterator[tuple[Any, Any, list[str | int]]]:
"""Iterate through data_collection (internal implementation of nested_items_path).
This is a separate function because callers of nested_items_path should
not be using the parent_path param which is used in recursive _nested_items_path
calls to build up the path to the parent object of the current key/index, value.
"""
# we have to cast each convert_to_tuples assignment or mypy complains
# that both assignments (for dict and list) do not have the same type
convert_to_tuples_type = Callable[[], Iterator[Tuple[Union[str, int], Any]]]
if isinstance(data_collection, dict):
convert_data_collection_to_tuples = cast(
convert_to_tuples_type, functools.partial(data_collection.items)
)
elif isinstance(data_collection, list):
convert_data_collection_to_tuples = cast(
convert_to_tuples_type, functools.partial(enumerate, data_collection)
)
else:
raise TypeError(
f"Expected a dict or a list but got {data_collection!r} "
f"of type '{type(data_collection)}'"
)
for key, value in convert_data_collection_to_tuples():
if key in (SKIPPED_RULES_KEY, "__file__", "__line__", *ignored_keys):
continue
yield key, value, parent_path
if isinstance(value, (dict, list)):
yield from _nested_items_path(
data_collection=value, parent_path=parent_path + [key]
)
def get_path_to_play(
lintable: Lintable,
line_number: int, # 1-based
ruamel_data: CommentedMap | CommentedSeq,
) -> list[str | int]:
"""Get the path to the play in the given file at the given line number."""
if line_number < 1:
raise ValueError(f"expected line_number >= 1, got {line_number}")
if lintable.kind != "playbook" or not isinstance(ruamel_data, CommentedSeq):
return []
lc: LineCol # lc uses 0-based counts # pylint: disable=invalid-name
# line_number is 1-based. Convert to 0-based.
line_index = line_number - 1
prev_play_line_index = ruamel_data.lc.line
last_play_index = len(ruamel_data)
for play_index, play in enumerate(ruamel_data):
next_play_index = play_index + 1
if last_play_index > next_play_index:
next_play_line_index = ruamel_data[next_play_index].lc.line
else:
next_play_line_index = None
lc = play.lc # pylint: disable=invalid-name
assert isinstance(lc.line, int)
if lc.line == line_index:
return [play_index]
if play_index > 0 and prev_play_line_index < line_index < lc.line:
return [play_index - 1]
# The previous play check (above) can't catch the last play,
# so, handle the last play separately.
if (
next_play_index == last_play_index
and line_index > lc.line
and (next_play_line_index is None or line_index < next_play_line_index)
):
# part of this (last) play
return [play_index]
prev_play_line_index = play.lc.line
return []
def get_path_to_task(
lintable: Lintable,
line_number: int, # 1-based
ruamel_data: CommentedMap | CommentedSeq,
) -> list[str | int]:
"""Get the path to the task in the given file at the given line number."""
if line_number < 1:
raise ValueError(f"expected line_number >= 1, got {line_number}")
if lintable.kind in ("tasks", "handlers"):
assert isinstance(ruamel_data, CommentedSeq)
return _get_path_to_task_in_tasks_block(line_number, ruamel_data)
if lintable.kind == "playbook":
assert isinstance(ruamel_data, CommentedSeq)
return _get_path_to_task_in_playbook(line_number, ruamel_data)
# if lintable.kind in ["yaml", "requirements", "vars", "meta", "reno", "test-meta"]:
return []
def _get_path_to_task_in_playbook(
line_number: int, # 1-based
ruamel_data: CommentedSeq,
) -> list[str | int]:
"""Get the path to the task in the given playbook data at the given line number."""
last_play_index = len(ruamel_data)
for play_index, play in enumerate(ruamel_data):
next_play_index = play_index + 1
if last_play_index > next_play_index:
next_play_line_index = ruamel_data[next_play_index].lc.line
else:
next_play_line_index = None
play_keys = list(play.keys())
for tasks_keyword in PLAYBOOK_TASK_KEYWORDS:
if not play.get(tasks_keyword):
continue
try:
next_keyword = play_keys[play_keys.index(tasks_keyword) + 1]
except IndexError:
next_block_line_index = None
else:
next_block_line_index = play.lc.data[next_keyword][0]
# last_line_number_in_block is 1-based; next_*_line_index is 0-based
# next_*_line_index - 1 to get line before next_*_line_index.
# Then + 1 to make it a 1-based number.
# So, last_line_number_in_block = next_*_line_index - 1 + 1
if next_block_line_index is not None:
last_line_number_in_block = next_block_line_index
elif next_play_line_index is not None:
last_line_number_in_block = next_play_line_index
else:
last_line_number_in_block = None
task_path = _get_path_to_task_in_tasks_block(
line_number, play[tasks_keyword], last_line_number_in_block
)
if task_path:
# mypy gets confused without this typehint
tasks_keyword_path: list[int | str] = [
play_index,
tasks_keyword,
]
return tasks_keyword_path + list(task_path)
# line_number is before first play or no tasks keywords in any of the plays
return []
def _get_path_to_task_in_tasks_block(
line_number: int, # 1-based
tasks_block: CommentedSeq,
last_line_number: int | None = None, # 1-based
) -> list[str | int]:
"""Get the path to the task in the given tasks block at the given line number."""
task: CommentedMap | None
# line_number and last_line_number are 1-based. Convert to 0-based.
line_index = line_number - 1
last_line_index = None if last_line_number is None else last_line_number - 1
# lc (LineCol) uses 0-based counts
prev_task_line_index = tasks_block.lc.line
last_task_index = len(tasks_block)
for task_index, task in enumerate(tasks_block):
next_task_index = task_index + 1
if last_task_index > next_task_index:
if tasks_block[next_task_index] is not None:
next_task_line_index = tasks_block[next_task_index].lc.line
else:
next_task_line_index = tasks_block.lc.item(next_task_index)[0]
else:
next_task_line_index = None
if task is None:
# create a dummy task to represent the null task
task = CommentedMap()
task.lc.line, task.lc.col = tasks_block.lc.item(task_index)
nested_task_keys = set(task.keys()).intersection(set(NESTED_TASK_KEYS))
if nested_task_keys:
subtask_path = _get_path_to_task_in_nested_tasks_block(
line_number, task, nested_task_keys, next_task_line_index
)
if subtask_path:
# mypy gets confused without this typehint
task_path: list[str | int] = [task_index]
return task_path + list(subtask_path)
assert isinstance(task.lc.line, int)
if task.lc.line == line_index:
return [task_index]
if task_index > 0 and prev_task_line_index < line_index < task.lc.line:
return [task_index - 1]
# The previous task check can't catch the last task,
# so, handle the last task separately (also after subtask checks).
# pylint: disable=too-many-boolean-expressions
if (
next_task_index == last_task_index
and line_index > task.lc.line
and (next_task_line_index is None or line_index < next_task_line_index)
and (last_line_index is None or line_index <= last_line_index)
):
# part of this (last) task
return [task_index]
prev_task_line_index = task.lc.line
# line is not part of this tasks block
return []
def _get_path_to_task_in_nested_tasks_block(
line_number: int, # 1-based
task: CommentedMap,
nested_task_keys: set[str],
next_task_line_index: int | None = None, # 0-based
) -> list[str | int]:
"""Get the path to the task in the given nested tasks block."""
# loop through the keys in line order
task_keys = list(task.keys())
task_keys_by_index = dict(enumerate(task_keys))
for task_index, task_key in enumerate(task_keys):
nested_task_block = task[task_key]
if task_key not in nested_task_keys or not nested_task_block:
continue
next_task_key = task_keys_by_index.get(task_index + 1, None)
if next_task_key is not None:
next_task_key_line_index = task.lc.data[next_task_key][0]
else:
next_task_key_line_index = None
# last_line_number_in_block is 1-based; next_*_line_index is 0-based
# next_*_line_index - 1 to get line before next_*_line_index.
# Then + 1 to make it a 1-based number.
# So, last_line_number_in_block = next_*_line_index - 1 + 1
last_line_number_in_block = (
next_task_key_line_index
if next_task_key_line_index is not None
else next_task_line_index
)
subtask_path = _get_path_to_task_in_tasks_block(
line_number,
nested_task_block,
last_line_number_in_block, # 1-based
)
if subtask_path:
return [task_key] + list(subtask_path)
# line is not part of this nested tasks block
return []
class OctalIntYAML11(ScalarInt):
"""OctalInt representation for YAML 1.1."""
# tell mypy that ScalarInt has these attributes
_width: Any
_underscore: Any
def __new__(cls, *args: Any, **kwargs: Any) -> Any:
"""Create a new int with ScalarInt-defined attributes."""
return ScalarInt.__new__(cls, *args, **kwargs)
@staticmethod
def represent_octal(representer: RoundTripRepresenter, data: OctalIntYAML11) -> Any:
"""Return a YAML 1.1 octal representation.
Based on ruamel.yaml.representer.RoundTripRepresenter.represent_octal_int()
(which only handles the YAML 1.2 octal representation).
"""
v = format(data, "o")
anchor = data.yaml_anchor(any=True)
# noinspection PyProtectedMember
# pylint: disable=protected-access
return representer.insert_underscore("0", v, data._underscore, anchor=anchor)
class CustomConstructor(RoundTripConstructor):
"""Custom YAML constructor that preserves Octal formatting in YAML 1.1."""
def construct_yaml_int(self, node: ScalarNode) -> Any:
"""Construct int while preserving Octal formatting in YAML 1.1.
ruamel.yaml only preserves the octal format for YAML 1.2.
For 1.1, it converts the octal to an int. So, we preserve the format.
Code partially copied from ruamel.yaml (MIT licensed).
"""
ret = super().construct_yaml_int(node)
if self.resolver.processing_version == (1, 1) and isinstance(ret, int):
# Do not rewrite zero as octal.
if ret == 0:
return ret
# see if we've got an octal we need to preserve.
value_su = self.construct_scalar(node)
try:
v = value_su.rstrip("_")
underscore = [len(v) - v.rindex("_") - 1, False, False] # type: Any
except ValueError:
underscore = None
except IndexError:
underscore = None
value_s = value_su.replace("_", "")
if value_s[0] in "+-":
value_s = value_s[1:]
if value_s[0] == "0":
# got an octal in YAML 1.1
ret = OctalIntYAML11(
ret, width=None, underscore=underscore, anchor=node.anchor
)
return ret
CustomConstructor.add_constructor(
"tag:yaml.org,2002:int", CustomConstructor.construct_yaml_int
)
class FormattedEmitter(Emitter):
"""Emitter that applies custom formatting rules when dumping YAML.
Differences from ruamel.yaml defaults:
- indentation of root-level sequences
- prefer double-quoted scalars over single-quoted scalars
This ensures that root-level sequences are never indented.
All subsequent levels are indented as configured (normal ruamel.yaml behavior).
Earlier implementations used dedent on ruamel.yaml's dumped output,
but string magic like that had a ton of problematic edge cases.
"""
preferred_quote = '"' # either " or '
min_spaces_inside = 0
max_spaces_inside = 1
_sequence_indent = 2
_sequence_dash_offset = 0 # Should be _sequence_indent - 2
_root_is_sequence = False
_in_empty_flow_map = False
@property
def _is_root_level_sequence(self) -> bool:
"""Return True if this is a sequence at the root level of the yaml document."""
return self.column < 2 and self._root_is_sequence
def expect_document_root(self) -> None:
"""Expect doc root (extend to record if the root doc is a sequence)."""
self._root_is_sequence = isinstance(
self.event, ruamel.yaml.events.SequenceStartEvent
)
return super().expect_document_root()
# NB: mypy does not support overriding attributes with properties yet:
# https://github.com/python/mypy/issues/4125
# To silence we have to ignore[override] both the @property and the method.
@property
def best_sequence_indent(self) -> int:
"""Return the configured sequence_indent or 2 for root level."""
return 2 if self._is_root_level_sequence else self._sequence_indent
@best_sequence_indent.setter
def best_sequence_indent(self, value: int) -> None:
"""Configure how many columns to indent each sequence item (including the '-')."""
self._sequence_indent = value
@property
def sequence_dash_offset(self) -> int:
"""Return the configured sequence_dash_offset or 0 for root level."""
return 0 if self._is_root_level_sequence else self._sequence_dash_offset
@sequence_dash_offset.setter
def sequence_dash_offset(self, value: int) -> None:
"""Configure how many spaces to put before each sequence item's '-'."""
self._sequence_dash_offset = value
def choose_scalar_style(self) -> Any:
"""Select how to quote scalars if needed."""
style = super().choose_scalar_style()
if (
style == ""
and self.event.value.startswith("0")
and len(self.event.value) > 1
):
if self.event.tag == "tag:yaml.org,2002:int" and self.event.implicit[0]:
# ensures that "0123" string does not lose its quoting
self.event.tag = "tag:yaml.org,2002:str"
self.event.implicit = (True, True, True)
return '"'
if style != "'":
# block scalar, double quoted, etc.
return style
if '"' in self.event.value:
return "'"
return self.preferred_quote
def write_indicator(
self,
indicator: str, # ruamel.yaml typehint is wrong. This is a string.
need_whitespace: bool,
whitespace: bool = False,
indention: bool = False, # (sic) ruamel.yaml has this typo in their API
) -> None:
"""Make sure that flow maps get whitespace by the curly braces."""
# We try to go with one whitespace by the curly braces and adjust accordingly
# to what min_spaces_inside and max_spaces_inside are set to.
# This assumes min_spaces_inside <= max_spaces_inside
spaces_inside = min(
max(1, self.min_spaces_inside),
self.max_spaces_inside if self.max_spaces_inside != -1 else 1,
)
# If this is the end of the flow mapping that isn't on a new line:
if (
indicator == "}"
and (self.column or 0) > (self.indent or 0)
and not self._in_empty_flow_map
):
indicator = (" " * spaces_inside) + "}"
super().write_indicator(indicator, need_whitespace, whitespace, indention)
# if it is the start of a flow mapping, and it's not time
# to wrap the lines, insert a space.
if indicator == "{" and self.column < self.best_width:
if self.check_empty_mapping():
self._in_empty_flow_map = True
else:
self.column += 1
self.stream.write(" " * spaces_inside)
self._in_empty_flow_map = False
# "/n/n" results in one blank line (end the previous line, then newline).
# So, "/n/n/n" or more is too many new lines. Clean it up.
_re_repeat_blank_lines: Pattern[str] = re.compile(r"\n{3,}")
@staticmethod
def add_octothorpe_protection(string: str) -> str:
"""Modify strings to protect "#" from full-line-comment post-processing."""
try:
if "#" in string:
# # is \uFF03 (fullwidth number sign)
# ﹟ is \uFE5F (small number sign)
string = string.replace("#", "\uFF03#\uFE5F")
# this is safe even if this sequence is present
# because it gets reversed in post-processing
except (ValueError, TypeError):
# probably not really a string. Whatever.
pass
return string
@staticmethod
def drop_octothorpe_protection(string: str) -> str:
"""Remove string protection of "#" after full-line-comment post-processing."""
try:
if "\uFF03#\uFE5F" in string:
# # is \uFF03 (fullwidth number sign)
# ﹟ is \uFE5F (small number sign)
string = string.replace("\uFF03#\uFE5F", "#")
except (ValueError, TypeError):
# probably not really a string. Whatever.
pass
return string
def analyze_scalar(self, scalar: str) -> ScalarAnalysis:
"""Determine quoting and other requirements for string.
And protect "#" from full-line-comment post-processing.
"""
analysis: ScalarAnalysis = super().analyze_scalar(scalar)
if analysis.empty:
return analysis
analysis.scalar = self.add_octothorpe_protection(analysis.scalar)
return analysis
# comment is a CommentToken, not Any (Any is ruamel.yaml's lazy type hint).
def write_comment(self, comment: CommentToken, pre: bool = False) -> None:
"""Clean up extra new lines and spaces in comments.
ruamel.yaml treats new or empty lines as comments.
See: https://stackoverflow.com/questions/42708668/removing-all-blank-lines-but-not-comments-in-ruamel-yaml/42712747#42712747
"""
value: str = comment.value
if (
pre
and not value.strip()
and not isinstance(
self.event,
(
ruamel.yaml.events.CollectionEndEvent,
ruamel.yaml.events.DocumentEndEvent,
ruamel.yaml.events.StreamEndEvent,
),
)
):
# drop pure whitespace pre comments
# does not apply to End events since they consume one of the newlines.
value = ""
elif pre:
# preserve content in pre comment with at least one newline,
# but no extra blank lines.
value = self._re_repeat_blank_lines.sub("\n", value)
else:
# single blank lines in post comments
value = self._re_repeat_blank_lines.sub("\n\n", value)
comment.value = value
# make sure that the eol comment only has one space before it.
if comment.column > self.column + 1 and not pre:
comment.column = self.column + 1
return super().write_comment(comment, pre)
def write_version_directive(self, version_text: Any) -> None:
"""Skip writing '%YAML 1.1'."""
if version_text == "1.1":
return
super().write_version_directive(version_text)
# pylint: disable=too-many-instance-attributes
class FormattedYAML(YAML):
"""A YAML loader/dumper that handles ansible content better by default."""
def __init__(
self,
*,
typ: str | None = None,
pure: bool = False,
output: Any = None,
# input: Any = None,
plug_ins: list[str] | None = None,
):
"""Return a configured ``ruamel.yaml.YAML`` instance.
Some config defaults get extracted from the yamllint config.
``ruamel.yaml.YAML`` uses attributes to configure how it dumps yaml files.
Some of these settings can be confusing, so here are examples of how different
settings will affect the dumped yaml.
This example does not indent any sequences:
.. code:: python
yaml.explicit_start=True
yaml.map_indent=2
yaml.sequence_indent=2
yaml.sequence_dash_offset=0
.. code:: yaml
---
- name: A playbook
tasks:
- name: Task
This example indents all sequences including the root-level:
.. code:: python
yaml.explicit_start=True
yaml.map_indent=2
yaml.sequence_indent=4
yaml.sequence_dash_offset=2
# yaml.Emitter defaults to ruamel.yaml.emitter.Emitter
.. code:: yaml
---
- name: Playbook
tasks:
- name: Task
This example indents all sequences except at the root-level:
.. code:: python
yaml.explicit_start=True
yaml.map_indent=2
yaml.sequence_indent=4
yaml.sequence_dash_offset=2
yaml.Emitter = FormattedEmitter # custom Emitter prevents root-level indents
.. code:: yaml
---
- name: Playbook
tasks:
- name: Task
"""
# Default to reading/dumping YAML 1.1 (ruamel.yaml defaults to 1.2)
self._yaml_version_default: tuple[int, int] = (1, 1)
self._yaml_version: str | tuple[int, int] = self._yaml_version_default
super().__init__(typ=typ, pure=pure, output=output, plug_ins=plug_ins)
# NB: We ignore some mypy issues because ruamel.yaml typehints are not great.
config = self._defaults_from_yamllint_config()
# these settings are derived from yamllint config
self.explicit_start: bool = config["explicit_start"] # type: ignore[assignment]
self.explicit_end: bool = config["explicit_end"] # type: ignore[assignment]
self.width: int = config["width"] # type: ignore[assignment]
indent_sequences: bool = cast(bool, config["indent_sequences"])
preferred_quote: str = cast(str, config["preferred_quote"]) # either ' or "
min_spaces_inside: int = cast(int, config["min_spaces_inside"])
max_spaces_inside: int = cast(int, config["max_spaces_inside"])
self.default_flow_style = False
self.compact_seq_seq = True # type: ignore[assignment] # dash after dash
self.compact_seq_map = True # type: ignore[assignment] # key after dash
# Do not use yaml.indent() as it obscures the purpose of these vars:
self.map_indent = 2 # type: ignore[assignment]
self.sequence_indent = 4 if indent_sequences else 2 # type: ignore[assignment]
self.sequence_dash_offset = self.sequence_indent - 2 # type: ignore[operator]
# If someone doesn't want our FormattedEmitter, they can change it.
self.Emitter = FormattedEmitter
# ignore invalid preferred_quote setting
if preferred_quote in ['"', "'"]:
FormattedEmitter.preferred_quote = preferred_quote
# NB: default_style affects preferred_quote as well.
# self.default_style ∈ None (default), '', '"', "'", '|', '>'
# spaces inside braces for flow mappings
FormattedEmitter.min_spaces_inside = min_spaces_inside
FormattedEmitter.max_spaces_inside = max_spaces_inside
# We need a custom constructor to preserve Octal formatting in YAML 1.1
self.Constructor = CustomConstructor
self.Representer.add_representer(OctalIntYAML11, OctalIntYAML11.represent_octal)
# We should preserve_quotes loads all strings as a str subclass that carries
# a quote attribute. Will the str subclasses cause problems in transforms?
# Are there any other gotchas to this?
#
# This will only preserve quotes for strings read from the file.
# anything modified by the transform will use no quotes, preferred_quote,
# or the quote that results in the least amount of escaping.
# self.preserve_quotes = True
# If needed, we can use this to change null representation to be explicit
# (see https://stackoverflow.com/a/44314840/1134951)
# self.Representer.add_representer(
# type(None),
# lambda self, data: self.represent_scalar("tag:yaml.org,2002:null", "null"),
# )
@staticmethod
def _defaults_from_yamllint_config() -> dict[str, bool | int | str]:
"""Extract FormattedYAML-relevant settings from yamllint config if possible."""
config = {
"explicit_start": True,
"explicit_end": False,
"width": 160,
"indent_sequences": True,
"preferred_quote": '"',
"min_spaces_inside": 0,
"max_spaces_inside": 1,
}
for rule, rule_config in load_yamllint_config().rules.items():
if not rule_config:
# rule disabled
continue
# refactor this if ... elif ... elif ... else monstrosity using match/case (PEP 634) once python 3.10 is mandatory
if rule == "document-start":
config["explicit_start"] = rule_config["present"]
elif rule == "document-end":
config["explicit_end"] = rule_config["present"]
elif rule == "line-length":
config["width"] = rule_config["max"]
elif rule == "braces":
min_spaces_inside = rule_config["min-spaces-inside"]
if min_spaces_inside:
config["min_spaces_inside"] = int(min_spaces_inside)
max_spaces_inside = rule_config["max-spaces-inside"]
if max_spaces_inside:
config["max_spaces_inside"] = int(max_spaces_inside)
elif rule == "indentation":
indent_sequences = rule_config["indent-sequences"]
# one of: bool, "whatever", "consistent"
# so, we use True for "whatever" and "consistent"
config["indent_sequences"] = bool(indent_sequences)
elif rule == "quoted-strings":
quote_type = rule_config["quote-type"]
# one of: single, double, any
if quote_type == "single":
config["preferred_quote"] = "'"
elif quote_type == "double":
config["preferred_quote"] = '"'
return cast(Dict[str, Union[bool, int, str]], config)
@property # type: ignore[override]
def version(self) -> str | tuple[int, int]:
"""Return the YAML version used to parse or dump.
Ansible uses PyYAML which only supports YAML 1.1. ruamel.yaml defaults to 1.2.
So, we have to make sure we dump yaml files using YAML 1.1.
We can relax the version requirement once ansible uses a version of PyYAML
that includes this PR: https://github.com/yaml/pyyaml/pull/555
"""
return self._yaml_version
@version.setter
def version(self, value: str | tuple[int, int] | None) -> None:
"""Ensure that yaml version uses our default value.
The yaml Reader updates this value based on the ``%YAML`` directive in files.
So, if a file does not include the directive, it sets this to None.
But, None effectively resets the parsing version to YAML 1.2 (ruamel's default).
"""
self._yaml_version = value if value is not None else self._yaml_version_default
def loads(self, stream: str) -> Any:
"""Load YAML content from a string while avoiding known ruamel.yaml issues."""
if not isinstance(stream, str):
raise NotImplementedError(f"expected a str but got {type(stream)}")
text, preamble_comment = self._pre_process_yaml(stream)
data = self.load(stream=text)
if preamble_comment is not None:
setattr(data, "preamble_comment", preamble_comment)
return data
def dumps(self, data: Any) -> str:
"""Dump YAML document to string (including its preamble_comment)."""
preamble_comment: str | None = getattr(data, "preamble_comment", None)
self._prevent_wrapping_flow_style(data)
with StringIO() as stream:
if preamble_comment:
stream.write(preamble_comment)
self.dump(data, stream)
text = stream.getvalue()
return self._post_process_yaml(text)
def _prevent_wrapping_flow_style(self, data: Any) -> None:
if not isinstance(data, (CommentedMap, CommentedSeq)):
return
for key, value, parent_path in nested_items_path(data):
if not isinstance(value, (CommentedMap, CommentedSeq)):
continue
fa: Format = value.fa # pylint: disable=invalid-name
if fa.flow_style():
predicted_indent = self._predict_indent_length(parent_path, key)
predicted_width = len(str(value))
if predicted_indent + predicted_width > self.width:
# this flow-style map will probably get line-wrapped,
# so, switch it to block style to avoid the line wrap.
fa.set_block_style()
def _predict_indent_length(self, parent_path: list[str | int], key: Any) -> int:
indent = 0
# each parent_key type tells us what the indent is for the next level.
for parent_key in parent_path:
if isinstance(parent_key, int) and indent == 0:
# root level is a sequence
indent += self.sequence_dash_offset
elif isinstance(parent_key, int):
# next level is a sequence
indent += cast(int, self.sequence_indent)
elif isinstance(parent_key, str):
# next level is a map
indent += cast(int, self.map_indent)
if isinstance(key, int) and indent == 0:
# flow map is an item in a root-level sequence
indent += self.sequence_dash_offset
elif isinstance(key, int) and indent > 0:
# flow map is in a sequence
indent += cast(int, self.sequence_indent)
elif isinstance(key, str):
# flow map is in a map
indent += len(key + ": ")
return indent
# ruamel.yaml only preserves empty (no whitespace) blank lines
# (ie "/n/n" becomes "/n/n" but "/n /n" becomes "/n").
# So, we need to identify whitespace-only lines to drop spaces before reading.
_whitespace_only_lines_re = re.compile(r"^ +$", re.MULTILINE)
def _pre_process_yaml(self, text: str) -> tuple[str, str | None]:
"""Handle known issues with ruamel.yaml loading.
Preserve blank lines despite extra whitespace.
Preserve any preamble (aka header) comments before "---".
For more on preamble comments, see: https://stackoverflow.com/questions/70286108/python-ruamel-yaml-package-how-to-get-header-comment-lines/70287507#70287507
"""
text = self._whitespace_only_lines_re.sub("", text)
# I investigated extending ruamel.yaml to capture preamble comments.
# preamble comment goes from:
# DocumentStartToken.comment -> DocumentStartEvent.comment
# Then, in the composer:
# once in composer.current_event
# composer.compose_document()
# discards DocumentStartEvent
# move DocumentStartEvent to composer.last_event
# node = composer.compose_node(None, None)
# all document nodes get composed (events get used)
# discard DocumentEndEvent
# move DocumentEndEvent to composer.last_event
# return node
# So, there's no convenient way to extend the composer
# to somehow capture the comments and pass them on.
preamble_comments = []
if "\n---\n" not in text and "\n--- " not in text:
# nothing is before the document start mark,
# so there are no comments to preserve.
return text, None
for line in text.splitlines(True):
# We only need to capture the preamble comments. No need to remove them.
# lines might also include directives.
if line.lstrip().startswith("#") or line == "\n":
preamble_comments.append(line)
elif line.startswith("---"):
break
return text, "".join(preamble_comments) or None
@staticmethod
def _post_process_yaml(text: str) -> str:
"""Handle known issues with ruamel.yaml dumping.
Make sure there's only one newline at the end of the file.
Fix the indent of full-line comments to match the indent of the next line.
See: https://stackoverflow.com/questions/71354698/how-can-i-use-the-ruamel-yaml-rtsc-mode/71355688#71355688
Also, removes "#" protection from strings that prevents them from being
identified as full line comments in post-processing.
Make sure null list items don't end in a space.
"""
text = text.rstrip("\n") + "\n"
lines = text.splitlines(keepends=True)
full_line_comments: list[tuple[int, str]] = []
for i, line in enumerate(lines):
stripped = line.lstrip()
if not stripped:
# blank line. Move on.
continue
space_length = len(line) - len(stripped)
if stripped.startswith("#"):
# got a full line comment
# allow some full line comments to match the previous indent
if i > 0 and not full_line_comments and space_length:
prev = lines[i - 1]
prev_space_length = len(prev) - len(prev.lstrip())
if prev_space_length == space_length:
# if the indent matches the previous line's indent, skip it.
continue
full_line_comments.append((i, stripped))
elif full_line_comments:
# end of full line comments so adjust to match indent of this line
spaces = " " * space_length
for index, comment in full_line_comments:
lines[index] = spaces + comment
full_line_comments.clear()
cleaned = line.strip()
if not cleaned.startswith("#") and cleaned.endswith("-"):
# got an empty list item. drop any trailing spaces.
lines[i] = line.rstrip() + "\n"
text = "".join(
FormattedEmitter.drop_octothorpe_protection(line) for line in lines
)
return text
def clean_json(
obj: Any,
func: Callable[[str], Any] = lambda key: key.startswith("__")
if isinstance(key, str)
else False,
) -> Any:
"""
Remove all keys matching the condition from a nested JSON-like object.
:param obj: a JSON like object to clean, also returned for chaining.
:param func: a callable that takes a key in argument and return True for each key to delete
"""
if isinstance(obj, dict):
for key in list(obj.keys()):
if func(key):
del obj[key]
else:
clean_json(obj[key], func)
elif isinstance(obj, list):
for i in reversed(range(len(obj))):
if func(obj[i]):
del obj[i]
else:
clean_json(obj[i], func)
else:
# neither a dict nor a list, do nothing
pass
return obj
|