diff options
Diffstat (limited to 'lib/ansiblelint/skip_utils.py')
-rw-r--r-- | lib/ansiblelint/skip_utils.py | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/lib/ansiblelint/skip_utils.py b/lib/ansiblelint/skip_utils.py new file mode 100644 index 0000000..c3c0a88 --- /dev/null +++ b/lib/ansiblelint/skip_utils.py @@ -0,0 +1,189 @@ +# (c) 2019–2020, Ansible by Red Hat +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +"""Utils related to inline skipping of rules.""" +import logging +from functools import lru_cache +from itertools import product +from typing import Any, Generator, List, Sequence + +import ruamel.yaml + +from ansiblelint.constants import FileType + +INLINE_SKIP_FLAG = '# noqa ' + +_logger = logging.getLogger(__name__) + + +# playbook: Sequence currently expects only instances of one of the two +# classes below but we should consider avoiding this chimera. +# ruamel.yaml.comments.CommentedSeq +# ansible.parsing.yaml.objects.AnsibleSequence + + +def get_rule_skips_from_line(line: str) -> List: + """Return list of rule ids skipped via comment on the line of yaml.""" + _before_noqa, _noqa_marker, noqa_text = line.partition(INLINE_SKIP_FLAG) + return noqa_text.split() + + +def append_skipped_rules(pyyaml_data: str, file_text: str, file_type: FileType) -> Sequence: + """Append 'skipped_rules' to individual tasks or single metadata block. + + For a file, uses 2nd parser (ruamel.yaml) to pull comments out of + yaml subsets, check for '# noqa' skipped rules, and append any skips to the + original parser (pyyaml) data relied on by remainder of ansible-lint. + + :param pyyaml_data: file text parsed via ansible and pyyaml. + :param file_text: raw file text. + :param file_type: type of file: tasks, handlers or meta. + :returns: original pyyaml_data altered with a 'skipped_rules' list added + to individual tasks, or added to the single metadata block. + """ + try: + yaml_skip = _append_skipped_rules(pyyaml_data, file_text, file_type) + except RuntimeError: + # Notify user of skip error, do not stop, do not change exit code + _logger.error('Error trying to append skipped rules', exc_info=True) + return pyyaml_data + return yaml_skip + + +@lru_cache(maxsize=128) +def load_data(file_text: str) -> Any: + """Parse `file_text` as yaml and return parsed structure. + + This is the main culprit for slow performance, each rule asks for loading yaml again and again + ideally the `maxsize` on the decorator above MUST be great or equal total number of rules + :param file_text: raw text to parse + :return: Parsed yaml + """ + yaml = ruamel.yaml.YAML() + return yaml.load(file_text) + + +def _append_skipped_rules(pyyaml_data: Sequence, file_text: str, file_type: FileType) -> Sequence: + # parse file text using 2nd parser library + ruamel_data = load_data(file_text) + + if file_type == 'meta': + pyyaml_data[0]['skipped_rules'] = _get_rule_skips_from_yaml(ruamel_data) + return pyyaml_data + + # create list of blocks of tasks or nested tasks + if file_type in ('tasks', 'handlers'): + ruamel_task_blocks = ruamel_data + pyyaml_task_blocks = pyyaml_data + elif file_type in ('playbook', 'pre_tasks', 'post_tasks'): + try: + pyyaml_task_blocks = _get_task_blocks_from_playbook(pyyaml_data) + ruamel_task_blocks = _get_task_blocks_from_playbook(ruamel_data) + except (AttributeError, TypeError): + # TODO(awcrosby): running ansible-lint on any .yml file will + # assume it is a playbook, check needs to be added higher in the + # call stack, and can remove this except + return pyyaml_data + else: + raise RuntimeError('Unexpected file type: {}'.format(file_type)) + + # get tasks from blocks of tasks + pyyaml_tasks = _get_tasks_from_blocks(pyyaml_task_blocks) + ruamel_tasks = _get_tasks_from_blocks(ruamel_task_blocks) + + # append skipped_rules for each task + for ruamel_task, pyyaml_task in zip(ruamel_tasks, pyyaml_tasks): + + # ignore empty tasks + if not pyyaml_task and not ruamel_task: + continue + + if pyyaml_task.get('name') != ruamel_task.get('name'): + raise RuntimeError('Error in matching skip comment to a task') + pyyaml_task['skipped_rules'] = _get_rule_skips_from_yaml(ruamel_task) + + return pyyaml_data + + +def _get_task_blocks_from_playbook(playbook: Sequence) -> List: + """Return parts of playbook that contains tasks, and nested tasks. + + :param playbook: playbook yaml from yaml parser. + :returns: list of task dictionaries. + """ + PLAYBOOK_TASK_KEYWORDS = [ + 'tasks', + 'pre_tasks', + 'post_tasks', + 'handlers', + ] + + task_blocks = [] + for play, key in product(playbook, PLAYBOOK_TASK_KEYWORDS): + task_blocks.extend(play.get(key, [])) + return task_blocks + + +def _get_tasks_from_blocks(task_blocks: Sequence) -> Generator: + """Get list of tasks from list made of tasks and nested tasks.""" + NESTED_TASK_KEYS = [ + 'block', + 'always', + 'rescue', + ] + + def get_nested_tasks(task: Any) -> Generator[Any, None, None]: + return ( + subtask + for k in NESTED_TASK_KEYS if task and k in task + for subtask in task[k] + ) + + for task in task_blocks: + for sub_task in get_nested_tasks(task): + yield sub_task + yield task + + +def _get_rule_skips_from_yaml(yaml_input: Sequence) -> Sequence: + """Traverse yaml for comments with rule skips and return list of rules.""" + yaml_comment_obj_strs = [] + + def traverse_yaml(obj: Any) -> None: + yaml_comment_obj_strs.append(str(obj.ca.items)) + if isinstance(obj, dict): + for key, val in obj.items(): + if isinstance(val, (dict, list)): + traverse_yaml(val) + elif isinstance(obj, list): + for e in obj: + if isinstance(e, (dict, list)): + traverse_yaml(e) + else: + return + + traverse_yaml(yaml_input) + + rule_id_list = [] + for comment_obj_str in yaml_comment_obj_strs: + for line in comment_obj_str.split(r'\n'): + rule_id_list.extend(get_rule_skips_from_line(line)) + + return rule_id_list |