summaryrefslogtreecommitdiffstats
path: root/lib/ansiblelint/skip_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansiblelint/skip_utils.py')
-rw-r--r--lib/ansiblelint/skip_utils.py189
1 files changed, 189 insertions, 0 deletions
diff --git a/lib/ansiblelint/skip_utils.py b/lib/ansiblelint/skip_utils.py
new file mode 100644
index 0000000..c3c0a88
--- /dev/null
+++ b/lib/ansiblelint/skip_utils.py
@@ -0,0 +1,189 @@
+# (c) 2019–2020, Ansible by Red Hat
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+"""Utils related to inline skipping of rules."""
+import logging
+from functools import lru_cache
+from itertools import product
+from typing import Any, Generator, List, Sequence
+
+import ruamel.yaml
+
+from ansiblelint.constants import FileType
+
+INLINE_SKIP_FLAG = '# noqa '
+
+_logger = logging.getLogger(__name__)
+
+
+# playbook: Sequence currently expects only instances of one of the two
+# classes below but we should consider avoiding this chimera.
+# ruamel.yaml.comments.CommentedSeq
+# ansible.parsing.yaml.objects.AnsibleSequence
+
+
+def get_rule_skips_from_line(line: str) -> List:
+ """Return list of rule ids skipped via comment on the line of yaml."""
+ _before_noqa, _noqa_marker, noqa_text = line.partition(INLINE_SKIP_FLAG)
+ return noqa_text.split()
+
+
+def append_skipped_rules(pyyaml_data: str, file_text: str, file_type: FileType) -> Sequence:
+ """Append 'skipped_rules' to individual tasks or single metadata block.
+
+ For a file, uses 2nd parser (ruamel.yaml) to pull comments out of
+ yaml subsets, check for '# noqa' skipped rules, and append any skips to the
+ original parser (pyyaml) data relied on by remainder of ansible-lint.
+
+ :param pyyaml_data: file text parsed via ansible and pyyaml.
+ :param file_text: raw file text.
+ :param file_type: type of file: tasks, handlers or meta.
+ :returns: original pyyaml_data altered with a 'skipped_rules' list added
+ to individual tasks, or added to the single metadata block.
+ """
+ try:
+ yaml_skip = _append_skipped_rules(pyyaml_data, file_text, file_type)
+ except RuntimeError:
+ # Notify user of skip error, do not stop, do not change exit code
+ _logger.error('Error trying to append skipped rules', exc_info=True)
+ return pyyaml_data
+ return yaml_skip
+
+
+@lru_cache(maxsize=128)
+def load_data(file_text: str) -> Any:
+ """Parse `file_text` as yaml and return parsed structure.
+
+ This is the main culprit for slow performance, each rule asks for loading yaml again and again
+ ideally the `maxsize` on the decorator above MUST be great or equal total number of rules
+ :param file_text: raw text to parse
+ :return: Parsed yaml
+ """
+ yaml = ruamel.yaml.YAML()
+ return yaml.load(file_text)
+
+
+def _append_skipped_rules(pyyaml_data: Sequence, file_text: str, file_type: FileType) -> Sequence:
+ # parse file text using 2nd parser library
+ ruamel_data = load_data(file_text)
+
+ if file_type == 'meta':
+ pyyaml_data[0]['skipped_rules'] = _get_rule_skips_from_yaml(ruamel_data)
+ return pyyaml_data
+
+ # create list of blocks of tasks or nested tasks
+ if file_type in ('tasks', 'handlers'):
+ ruamel_task_blocks = ruamel_data
+ pyyaml_task_blocks = pyyaml_data
+ elif file_type in ('playbook', 'pre_tasks', 'post_tasks'):
+ try:
+ pyyaml_task_blocks = _get_task_blocks_from_playbook(pyyaml_data)
+ ruamel_task_blocks = _get_task_blocks_from_playbook(ruamel_data)
+ except (AttributeError, TypeError):
+ # TODO(awcrosby): running ansible-lint on any .yml file will
+ # assume it is a playbook, check needs to be added higher in the
+ # call stack, and can remove this except
+ return pyyaml_data
+ else:
+ raise RuntimeError('Unexpected file type: {}'.format(file_type))
+
+ # get tasks from blocks of tasks
+ pyyaml_tasks = _get_tasks_from_blocks(pyyaml_task_blocks)
+ ruamel_tasks = _get_tasks_from_blocks(ruamel_task_blocks)
+
+ # append skipped_rules for each task
+ for ruamel_task, pyyaml_task in zip(ruamel_tasks, pyyaml_tasks):
+
+ # ignore empty tasks
+ if not pyyaml_task and not ruamel_task:
+ continue
+
+ if pyyaml_task.get('name') != ruamel_task.get('name'):
+ raise RuntimeError('Error in matching skip comment to a task')
+ pyyaml_task['skipped_rules'] = _get_rule_skips_from_yaml(ruamel_task)
+
+ return pyyaml_data
+
+
+def _get_task_blocks_from_playbook(playbook: Sequence) -> List:
+ """Return parts of playbook that contains tasks, and nested tasks.
+
+ :param playbook: playbook yaml from yaml parser.
+ :returns: list of task dictionaries.
+ """
+ PLAYBOOK_TASK_KEYWORDS = [
+ 'tasks',
+ 'pre_tasks',
+ 'post_tasks',
+ 'handlers',
+ ]
+
+ task_blocks = []
+ for play, key in product(playbook, PLAYBOOK_TASK_KEYWORDS):
+ task_blocks.extend(play.get(key, []))
+ return task_blocks
+
+
+def _get_tasks_from_blocks(task_blocks: Sequence) -> Generator:
+ """Get list of tasks from list made of tasks and nested tasks."""
+ NESTED_TASK_KEYS = [
+ 'block',
+ 'always',
+ 'rescue',
+ ]
+
+ def get_nested_tasks(task: Any) -> Generator[Any, None, None]:
+ return (
+ subtask
+ for k in NESTED_TASK_KEYS if task and k in task
+ for subtask in task[k]
+ )
+
+ for task in task_blocks:
+ for sub_task in get_nested_tasks(task):
+ yield sub_task
+ yield task
+
+
+def _get_rule_skips_from_yaml(yaml_input: Sequence) -> Sequence:
+ """Traverse yaml for comments with rule skips and return list of rules."""
+ yaml_comment_obj_strs = []
+
+ def traverse_yaml(obj: Any) -> None:
+ yaml_comment_obj_strs.append(str(obj.ca.items))
+ if isinstance(obj, dict):
+ for key, val in obj.items():
+ if isinstance(val, (dict, list)):
+ traverse_yaml(val)
+ elif isinstance(obj, list):
+ for e in obj:
+ if isinstance(e, (dict, list)):
+ traverse_yaml(e)
+ else:
+ return
+
+ traverse_yaml(yaml_input)
+
+ rule_id_list = []
+ for comment_obj_str in yaml_comment_obj_strs:
+ for line in comment_obj_str.split(r'\n'):
+ rule_id_list.extend(get_rule_skips_from_line(line))
+
+ return rule_id_list