summaryrefslogtreecommitdiffstats
path: root/lib/ansiblelint/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansiblelint/utils.py')
-rw-r--r--lib/ansiblelint/utils.py836
1 files changed, 836 insertions, 0 deletions
diff --git a/lib/ansiblelint/utils.py b/lib/ansiblelint/utils.py
new file mode 100644
index 0000000..feac4d7
--- /dev/null
+++ b/lib/ansiblelint/utils.py
@@ -0,0 +1,836 @@
+# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+"""Generic utility helpers."""
+
+import contextlib
+import inspect
+import logging
+import os
+import pprint
+import subprocess
+from argparse import Namespace
+from collections import OrderedDict
+from functools import lru_cache
+from pathlib import Path
+from typing import Callable, ItemsView, List, Optional, Tuple
+
+import yaml
+from ansible import constants
+from ansible.errors import AnsibleError, AnsibleParserError
+from ansible.parsing.dataloader import DataLoader
+from ansible.parsing.mod_args import ModuleArgsParser
+from ansible.parsing.splitter import split_args
+from ansible.parsing.yaml.constructor import AnsibleConstructor
+from ansible.parsing.yaml.loader import AnsibleLoader
+from ansible.parsing.yaml.objects import AnsibleSequence
+from ansible.plugins.loader import add_all_plugin_dirs
+from ansible.template import Templar
+from yaml.composer import Composer
+from yaml.representer import RepresenterError
+
+from ansiblelint.constants import (
+ ANSIBLE_FAILURE_RC, CUSTOM_RULESDIR_ENVVAR, DEFAULT_RULESDIR, FileType,
+)
+from ansiblelint.errors import MatchError
+from ansiblelint.file_utils import normpath
+
+# ansible-lint doesn't need/want to know about encrypted secrets, so we pass a
+# string as the password to enable such yaml files to be opened and parsed
+# successfully.
+DEFAULT_VAULT_PASSWORD = 'x'
+
+PLAYBOOK_DIR = os.environ.get('ANSIBLE_PLAYBOOK_DIR', None)
+
+
+_logger = logging.getLogger(__name__)
+
+
+def parse_yaml_from_file(filepath: str) -> dict:
+ dl = DataLoader()
+ if hasattr(dl, 'set_vault_password'):
+ dl.set_vault_password(DEFAULT_VAULT_PASSWORD)
+ return dl.load_from_file(filepath)
+
+
+def path_dwim(basedir: str, given: str) -> str:
+ dl = DataLoader()
+ dl.set_basedir(basedir)
+ return dl.path_dwim(given)
+
+
+def ansible_template(basedir, varname, templatevars, **kwargs):
+ dl = DataLoader()
+ dl.set_basedir(basedir)
+ templar = Templar(dl, variables=templatevars)
+ return templar.template(varname, **kwargs)
+
+
+LINE_NUMBER_KEY = '__line__'
+FILENAME_KEY = '__file__'
+
+VALID_KEYS = [
+ 'name', 'action', 'when', 'async', 'poll', 'notify',
+ 'first_available_file', 'include', 'include_tasks', 'import_tasks', 'import_playbook',
+ 'tags', 'register', 'ignore_errors', 'delegate_to',
+ 'local_action', 'transport', 'remote_user', 'sudo',
+ 'sudo_user', 'sudo_pass', 'when', 'connection', 'environment', 'args', 'always_run',
+ 'any_errors_fatal', 'changed_when', 'failed_when', 'check_mode', 'delay',
+ 'retries', 'until', 'su', 'su_user', 'su_pass', 'no_log', 'run_once',
+ 'become', 'become_user', 'become_method', FILENAME_KEY,
+]
+
+BLOCK_NAME_TO_ACTION_TYPE_MAP = {
+ 'tasks': 'task',
+ 'handlers': 'handler',
+ 'pre_tasks': 'task',
+ 'post_tasks': 'task',
+ 'block': 'meta',
+ 'rescue': 'meta',
+ 'always': 'meta',
+}
+
+
+def tokenize(line):
+ tokens = line.lstrip().split(" ")
+ if tokens[0] == '-':
+ tokens = tokens[1:]
+ if tokens[0] == 'action:' or tokens[0] == 'local_action:':
+ tokens = tokens[1:]
+ command = tokens[0].replace(":", "")
+
+ args = list()
+ kwargs = dict()
+ nonkvfound = False
+ for arg in tokens[1:]:
+ if "=" in arg and not nonkvfound:
+ kv = arg.split("=", 1)
+ kwargs[kv[0]] = kv[1]
+ else:
+ nonkvfound = True
+ args.append(arg)
+ return (command, args, kwargs)
+
+
+def _playbook_items(pb_data: dict) -> ItemsView:
+ if isinstance(pb_data, dict):
+ return pb_data.items()
+ elif not pb_data:
+ return []
+ else:
+ return [item for play in pb_data for item in play.items()]
+
+
+def _rebind_match_filename(filename: str, func) -> Callable:
+ def func_wrapper(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except MatchError as e:
+ e.filename = filename
+ raise e
+ return func_wrapper
+
+
+def _set_collections_basedir(basedir: str):
+ # Sets the playbook directory as playbook_paths for the collection loader
+ try:
+ # Ansible 2.10+
+ # noqa: # pylint:disable=cyclic-import,import-outside-toplevel
+ from ansible.utils.collection_loader import AnsibleCollectionConfig
+
+ AnsibleCollectionConfig.playbook_paths = basedir
+ except ImportError:
+ # Ansible 2.8 or 2.9
+ # noqa: # pylint:disable=cyclic-import,import-outside-toplevel
+ from ansible.utils.collection_loader import set_collection_playbook_paths
+
+ set_collection_playbook_paths(basedir)
+
+
+def find_children(playbook: Tuple[str, str], playbook_dir: str) -> List:
+ if not os.path.exists(playbook[0]):
+ return []
+ _set_collections_basedir(playbook_dir or '.')
+ add_all_plugin_dirs(playbook_dir or '.')
+ if playbook[1] == 'role':
+ playbook_ds = {'roles': [{'role': playbook[0]}]}
+ else:
+ try:
+ playbook_ds = parse_yaml_from_file(playbook[0])
+ except AnsibleError as e:
+ raise SystemExit(str(e))
+ results = []
+ basedir = os.path.dirname(playbook[0])
+ items = _playbook_items(playbook_ds)
+ for item in items:
+ for child in _rebind_match_filename(playbook[0], play_children)(
+ basedir, item, playbook[1], playbook_dir):
+ if "$" in child['path'] or "{{" in child['path']:
+ continue
+ valid_tokens = list()
+ for token in split_args(child['path']):
+ if '=' in token:
+ break
+ valid_tokens.append(token)
+ path = ' '.join(valid_tokens)
+ results.append({
+ 'path': path_dwim(basedir, path),
+ 'type': child['type']
+ })
+ return results
+
+
+def template(basedir, value, vars, fail_on_undefined=False, **kwargs):
+ try:
+ value = ansible_template(os.path.abspath(basedir), value, vars,
+ **dict(kwargs, fail_on_undefined=fail_on_undefined))
+ # Hack to skip the following exception when using to_json filter on a variable.
+ # I guess the filter doesn't like empty vars...
+ except (AnsibleError, ValueError, RepresenterError):
+ # templating failed, so just keep value as is.
+ pass
+ return value
+
+
+def play_children(basedir, item, parent_type, playbook_dir):
+ delegate_map = {
+ 'tasks': _taskshandlers_children,
+ 'pre_tasks': _taskshandlers_children,
+ 'post_tasks': _taskshandlers_children,
+ 'block': _taskshandlers_children,
+ 'include': _include_children,
+ 'import_playbook': _include_children,
+ 'roles': _roles_children,
+ 'dependencies': _roles_children,
+ 'handlers': _taskshandlers_children,
+ 'include_tasks': _include_children,
+ 'import_tasks': _include_children,
+ }
+ (k, v) = item
+ add_all_plugin_dirs(os.path.abspath(basedir))
+
+ if k in delegate_map:
+ if v:
+ v = template(os.path.abspath(basedir),
+ v,
+ dict(playbook_dir=PLAYBOOK_DIR or os.path.abspath(basedir)),
+ fail_on_undefined=False)
+ return delegate_map[k](basedir, k, v, parent_type)
+ return []
+
+
+def _include_children(basedir, k, v, parent_type):
+ # handle special case include_tasks: name=filename.yml
+ if k == 'include_tasks' and isinstance(v, dict) and 'file' in v:
+ v = v['file']
+
+ # handle include: filename.yml tags=blah
+ (command, args, kwargs) = tokenize("{0}: {1}".format(k, v))
+
+ result = path_dwim(basedir, args[0])
+ if not os.path.exists(result):
+ result = path_dwim(os.path.join(os.path.dirname(basedir)), v)
+ return [{'path': result, 'type': parent_type}]
+
+
+def _taskshandlers_children(basedir, k, v, parent_type: FileType) -> List:
+ results = []
+ for th in v:
+
+ # ignore empty tasks, `-`
+ if not th:
+ continue
+
+ with contextlib.suppress(LookupError):
+ children = _get_task_handler_children_for_tasks_or_playbooks(
+ th, basedir, k, parent_type,
+ )
+ results.append(children)
+ continue
+
+ if 'include_role' in th or 'import_role' in th: # lgtm [py/unreachable-statement]
+ th = normalize_task_v2(th)
+ _validate_task_handler_action_for_role(th['action'])
+ results.extend(_roles_children(basedir, k, [th['action'].get("name")],
+ parent_type,
+ main=th['action'].get('tasks_from', 'main')))
+ continue
+
+ if 'block' not in th:
+ continue
+
+ results.extend(_taskshandlers_children(basedir, k, th['block'], parent_type))
+ if 'rescue' in th:
+ results.extend(_taskshandlers_children(basedir, k, th['rescue'], parent_type))
+ if 'always' in th:
+ results.extend(_taskshandlers_children(basedir, k, th['always'], parent_type))
+
+ return results
+
+
+def _get_task_handler_children_for_tasks_or_playbooks(
+ task_handler, basedir: str, k, parent_type: FileType,
+) -> dict:
+ """Try to get children of taskhandler for include/import tasks/playbooks."""
+ child_type = k if parent_type == 'playbook' else parent_type
+
+ task_include_keys = 'include', 'include_tasks', 'import_playbook', 'import_tasks'
+ for task_handler_key in task_include_keys:
+
+ with contextlib.suppress(KeyError):
+
+ # ignore empty tasks
+ if not task_handler:
+ continue
+
+ return {
+ 'path': path_dwim(basedir, task_handler[task_handler_key]),
+ 'type': child_type,
+ }
+
+ raise LookupError(
+ f'The node contains none of: {", ".join(task_include_keys)}',
+ )
+
+
+def _validate_task_handler_action_for_role(th_action: dict) -> None:
+ """Verify that the task handler action is valid for role include."""
+ module = th_action['__ansible_module__']
+
+ if 'name' not in th_action:
+ raise MatchError(f"Failed to find required 'name' key in {module!s}")
+
+ if not isinstance(th_action['name'], str):
+ raise RuntimeError(
+ f"Value assigned to 'name' key on '{module!s}' is not a string.",
+ )
+
+
+def _roles_children(basedir: str, k, v, parent_type: FileType, main='main') -> list:
+ results = []
+ for role in v:
+ if isinstance(role, dict):
+ if 'role' in role or 'name' in role:
+ if 'tags' not in role or 'skip_ansible_lint' not in role['tags']:
+ results.extend(_look_for_role_files(basedir,
+ role.get('role', role.get('name')),
+ main=main))
+ elif k != 'dependencies':
+ raise SystemExit('role dict {0} does not contain a "role" '
+ 'or "name" key'.format(role))
+ else:
+ results.extend(_look_for_role_files(basedir, role, main=main))
+ return results
+
+
+def _rolepath(basedir: str, role: str) -> Optional[str]:
+ role_path = None
+
+ possible_paths = [
+ # if included from a playbook
+ path_dwim(basedir, os.path.join('roles', role)),
+ path_dwim(basedir, role),
+ # if included from roles/[role]/meta/main.yml
+ path_dwim(
+ basedir, os.path.join('..', '..', '..', 'roles', role)
+ ),
+ path_dwim(basedir, os.path.join('..', '..', role)),
+ ]
+
+ if constants.DEFAULT_ROLES_PATH:
+ search_locations = constants.DEFAULT_ROLES_PATH
+ if isinstance(search_locations, str):
+ search_locations = search_locations.split(os.pathsep)
+ for loc in search_locations:
+ loc = os.path.expanduser(loc)
+ possible_paths.append(path_dwim(loc, role))
+
+ possible_paths.append(path_dwim(basedir, ''))
+
+ for path_option in possible_paths:
+ if os.path.isdir(path_option):
+ role_path = path_option
+ break
+
+ if role_path:
+ add_all_plugin_dirs(role_path)
+
+ return role_path
+
+
+def _look_for_role_files(basedir: str, role: str, main='main') -> list:
+ role_path = _rolepath(basedir, role)
+ if not role_path:
+ return []
+
+ results = []
+
+ for th in ['tasks', 'handlers', 'meta']:
+ current_path = os.path.join(role_path, th)
+ for dir, subdirs, files in os.walk(current_path):
+ for file in files:
+ file_ignorecase = file.lower()
+ if file_ignorecase.endswith(('.yml', '.yaml')):
+ thpath = os.path.join(dir, file)
+ results.append({'path': thpath, 'type': th})
+
+ return results
+
+
+def rolename(filepath):
+ idx = filepath.find('roles/')
+ if idx < 0:
+ return ''
+ role = filepath[idx + 6:]
+ role = role[:role.find('/')]
+ return role
+
+
+def _kv_to_dict(v):
+ (command, args, kwargs) = tokenize(v)
+ return dict(__ansible_module__=command, __ansible_arguments__=args, **kwargs)
+
+
+def _sanitize_task(task: dict) -> dict:
+ """Return a stripped-off task structure compatible with new Ansible.
+
+ This helper takes a copy of the incoming task and drops
+ any internally used keys from it.
+ """
+ result = task.copy()
+ # task is an AnsibleMapping which inherits from OrderedDict, so we need
+ # to use `del` to remove unwanted keys.
+ for k in ['skipped_rules', FILENAME_KEY, LINE_NUMBER_KEY]:
+ if k in result:
+ del result[k]
+ return result
+
+
+# FIXME: drop noqa once this function is made simpler
+# Ref: https://github.com/ansible/ansible-lint/issues/744
+def normalize_task_v2(task: dict) -> dict: # noqa: C901
+ """Ensure tasks have an action key and strings are converted to python objects."""
+ result = dict()
+ if 'always_run' in task:
+ # FIXME(ssbarnea): Delayed import to avoid circular import
+ # See https://github.com/ansible/ansible-lint/issues/880
+ # noqa: # pylint:disable=cyclic-import,import-outside-toplevel
+ from ansiblelint.rules.AlwaysRunRule import AlwaysRunRule
+
+ raise MatchError(
+ rule=AlwaysRunRule,
+ filename=task[FILENAME_KEY],
+ linenumber=task[LINE_NUMBER_KEY])
+
+ sanitized_task = _sanitize_task(task)
+ mod_arg_parser = ModuleArgsParser(sanitized_task)
+ try:
+ action, arguments, result['delegate_to'] = mod_arg_parser.parse()
+ except AnsibleParserError as e:
+ try:
+ task_info = "%s:%s" % (task[FILENAME_KEY], task[LINE_NUMBER_KEY])
+ except KeyError:
+ task_info = "Unknown"
+ pp = pprint.PrettyPrinter(indent=2)
+ task_pprint = pp.pformat(sanitized_task)
+
+ _logger.critical("Couldn't parse task at %s (%s)\n%s", task_info, e.message, task_pprint)
+ raise SystemExit(ANSIBLE_FAILURE_RC)
+
+ # denormalize shell -> command conversion
+ if '_uses_shell' in arguments:
+ action = 'shell'
+ del arguments['_uses_shell']
+
+ for (k, v) in list(task.items()):
+ if k in ('action', 'local_action', 'args', 'delegate_to') or k == action:
+ # we don't want to re-assign these values, which were
+ # determined by the ModuleArgsParser() above
+ continue
+ else:
+ result[k] = v
+
+ result['action'] = dict(__ansible_module__=action)
+
+ if '_raw_params' in arguments:
+ result['action']['__ansible_arguments__'] = arguments['_raw_params'].split(' ')
+ del arguments['_raw_params']
+ else:
+ result['action']['__ansible_arguments__'] = list()
+
+ if 'argv' in arguments and not result['action']['__ansible_arguments__']:
+ result['action']['__ansible_arguments__'] = arguments['argv']
+ del arguments['argv']
+
+ result['action'].update(arguments)
+ return result
+
+
+# FIXME: drop noqa once this function is made simpler
+# Ref: https://github.com/ansible/ansible-lint/issues/744
+def normalize_task_v1(task): # noqa: C901
+ result = dict()
+ for (k, v) in task.items():
+ if k in VALID_KEYS or k.startswith('with_'):
+ if k == 'local_action' or k == 'action':
+ if not isinstance(v, dict):
+ v = _kv_to_dict(v)
+ v['__ansible_arguments__'] = v.get('__ansible_arguments__', list())
+ result['action'] = v
+ else:
+ result[k] = v
+ else:
+ if isinstance(v, str):
+ v = _kv_to_dict(k + ' ' + v)
+ elif not v:
+ v = dict(__ansible_module__=k)
+ else:
+ if isinstance(v, dict):
+ v.update(dict(__ansible_module__=k))
+ else:
+ if k == '__line__':
+ # Keep the line number stored
+ result[k] = v
+ continue
+
+ else:
+ # Tasks that include playbooks (rather than task files)
+ # can get here
+ # https://github.com/ansible/ansible-lint/issues/138
+ raise RuntimeError("Was not expecting value %s of type %s for key %s\n"
+ "Task: %s. Check the syntax of your playbook using "
+ "ansible-playbook --syntax-check" %
+ (str(v), type(v), k, str(task)))
+ v['__ansible_arguments__'] = v.get('__ansible_arguments__', list())
+ result['action'] = v
+ if 'module' in result['action']:
+ # this happens when a task uses
+ # local_action:
+ # module: ec2
+ # etc...
+ result['action']['__ansible_module__'] = result['action']['module']
+ del result['action']['module']
+ if 'args' in result:
+ result['action'].update(result.get('args'))
+ del result['args']
+ return result
+
+
+def normalize_task(task, filename):
+ ansible_action_type = task.get('__ansible_action_type__', 'task')
+ if '__ansible_action_type__' in task:
+ del task['__ansible_action_type__']
+ task = normalize_task_v2(task)
+ task[FILENAME_KEY] = filename
+ task['__ansible_action_type__'] = ansible_action_type
+ return task
+
+
+def task_to_str(task):
+ name = task.get("name")
+ if name:
+ return name
+ action = task.get("action")
+ args = " ".join([u"{0}={1}".format(k, v) for (k, v) in action.items()
+ if k not in ["__ansible_module__", "__ansible_arguments__"]] +
+ action.get("__ansible_arguments__"))
+ return u"{0} {1}".format(action["__ansible_module__"], args)
+
+
+def extract_from_list(blocks, candidates):
+ results = list()
+ for block in blocks:
+ for candidate in candidates:
+ if isinstance(block, dict) and candidate in block:
+ if isinstance(block[candidate], list):
+ results.extend(add_action_type(block[candidate], candidate))
+ elif block[candidate] is not None:
+ raise RuntimeError(
+ "Key '%s' defined, but bad value: '%s'" %
+ (candidate, str(block[candidate])))
+ return results
+
+
+def add_action_type(actions, action_type):
+ results = list()
+ for action in actions:
+ # ignore empty task
+ if not action:
+ continue
+ action['__ansible_action_type__'] = BLOCK_NAME_TO_ACTION_TYPE_MAP[action_type]
+ results.append(action)
+ return results
+
+
+def get_action_tasks(yaml, file):
+ tasks = list()
+ if file['type'] in ['tasks', 'handlers']:
+ tasks = add_action_type(yaml, file['type'])
+ else:
+ tasks.extend(extract_from_list(yaml, ['tasks', 'handlers', 'pre_tasks', 'post_tasks']))
+
+ # Add sub-elements of block/rescue/always to tasks list
+ tasks.extend(extract_from_list(tasks, ['block', 'rescue', 'always']))
+ # Remove block/rescue/always elements from tasks list
+ block_rescue_always = ('block', 'rescue', 'always')
+ tasks[:] = [task for task in tasks if all(k not in task for k in block_rescue_always)]
+
+ return [task for task in tasks if
+ set(['include', 'include_tasks',
+ 'import_playbook', 'import_tasks']).isdisjoint(task.keys())]
+
+
+def get_normalized_tasks(yaml, file):
+ tasks = get_action_tasks(yaml, file)
+ res = []
+ for task in tasks:
+ # An empty `tags` block causes `None` to be returned if
+ # the `or []` is not present - `task.get('tags', [])`
+ # does not suffice.
+ if 'skip_ansible_lint' in (task.get('tags') or []):
+ # No need to normalize_task is we are skipping it.
+ continue
+ res.append(normalize_task(task, file['path']))
+
+ return res
+
+
+@lru_cache(maxsize=128)
+def parse_yaml_linenumbers(data, filename):
+ """Parse yaml as ansible.utils.parse_yaml but with linenumbers.
+
+ The line numbers are stored in each node's LINE_NUMBER_KEY key.
+ """
+ def compose_node(parent, index):
+ # the line number where the previous token has ended (plus empty lines)
+ line = loader.line
+ node = Composer.compose_node(loader, parent, index)
+ node.__line__ = line + 1
+ return node
+
+ def construct_mapping(node, deep=False):
+ mapping = AnsibleConstructor.construct_mapping(loader, node, deep=deep)
+ if hasattr(node, '__line__'):
+ mapping[LINE_NUMBER_KEY] = node.__line__
+ else:
+ mapping[LINE_NUMBER_KEY] = mapping._line_number
+ mapping[FILENAME_KEY] = filename
+ return mapping
+
+ try:
+ kwargs = {}
+ if 'vault_password' in inspect.getfullargspec(AnsibleLoader.__init__).args:
+ kwargs['vault_password'] = DEFAULT_VAULT_PASSWORD
+ loader = AnsibleLoader(data, **kwargs)
+ loader.compose_node = compose_node
+ loader.construct_mapping = construct_mapping
+ data = loader.get_single_data()
+ except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
+ raise SystemExit("Failed to parse YAML in %s: %s" % (filename, str(e)))
+ return data
+
+
+def get_first_cmd_arg(task):
+ try:
+ if 'cmd' in task['action']:
+ first_cmd_arg = task['action']['cmd'].split()[0]
+ else:
+ first_cmd_arg = task['action']['__ansible_arguments__'][0]
+ except IndexError:
+ return None
+ return first_cmd_arg
+
+
+def is_playbook(filename: str) -> bool:
+ """
+ Check if the file is a playbook.
+
+ Given a filename, it should return true if it looks like a playbook. The
+ function is not supposed to raise exceptions.
+ """
+ # we assume is a playbook if we loaded a sequence of dictionaries where
+ # at least one of these keys is present:
+ playbooks_keys = {
+ "gather_facts",
+ "hosts",
+ "import_playbook",
+ "post_tasks",
+ "pre_tasks",
+ "roles"
+ "tasks",
+ }
+
+ # makes it work with Path objects by converting them to strings
+ if not isinstance(filename, str):
+ filename = str(filename)
+
+ try:
+ f = parse_yaml_from_file(filename)
+ except Exception as e:
+ _logger.warning(
+ "Failed to load %s with %s, assuming is not a playbook.",
+ filename, e)
+ else:
+ if (
+ isinstance(f, AnsibleSequence) and
+ hasattr(f, 'keys') and
+ playbooks_keys.intersection(next(iter(f), {}).keys())
+ ):
+ return True
+ return False
+
+
+def get_yaml_files(options: Namespace) -> dict:
+ """Find all yaml files."""
+ # git is preferred as it also considers .gitignore
+ git_command = ['git', 'ls-files', '*.yaml', '*.yml']
+ _logger.info("Discovering files to lint: %s", ' '.join(git_command))
+
+ out = None
+
+ try:
+ out = subprocess.check_output(
+ git_command,
+ stderr=subprocess.STDOUT,
+ universal_newlines=True
+ ).split()
+ except subprocess.CalledProcessError as exc:
+ _logger.warning(
+ "Failed to discover yaml files to lint using git: %s",
+ exc.output.rstrip('\n')
+ )
+ except FileNotFoundError as exc:
+ if options.verbosity:
+ _logger.warning(
+ "Failed to locate command: %s", exc
+ )
+
+ if out is None:
+ out = [
+ os.path.join(root, name)
+ for root, dirs, files in os.walk('.')
+ for name in files
+ if name.endswith('.yaml') or name.endswith('.yml')
+ ]
+
+ return OrderedDict.fromkeys(sorted(out))
+
+
+# FIXME: drop noqa once this function is made simpler
+# Ref: https://github.com/ansible/ansible-lint/issues/744
+def get_playbooks_and_roles(options=None) -> List[str]: # noqa: C901
+ """Find roles and playbooks."""
+ if options is None:
+ options = {}
+
+ files = get_yaml_files(options)
+
+ playbooks = []
+ role_dirs = []
+ role_internals = {
+ 'defaults',
+ 'files',
+ 'handlers',
+ 'meta',
+ 'tasks',
+ 'templates',
+ 'vars',
+ }
+
+ # detect role in repository root:
+ if 'tasks/main.yml' in files or 'tasks/main.yaml' in files:
+ role_dirs.append('.')
+
+ for p in map(Path, files):
+
+ try:
+ for file_path in options.exclude_paths:
+ if str(p.resolve()).startswith(str(file_path)):
+ raise FileNotFoundError(
+ f'File {file_path} matched exclusion entry: {p}')
+ except FileNotFoundError as e:
+ _logger.debug('Ignored %s due to: %s', p, e)
+ continue
+
+ if (next((i for i in p.parts if i.endswith('playbooks')), None) or
+ 'playbook' in p.parts[-1]):
+ playbooks.append(normpath(p))
+ continue
+
+ # ignore if any folder ends with _vars
+ if next((i for i in p.parts if i.endswith('_vars')), None):
+ continue
+ elif 'roles' in p.parts or '.' in role_dirs:
+ if 'tasks' in p.parts and p.parts[-1] in ['main.yaml', 'main.yml']:
+ role_dirs.append(str(p.parents[1]))
+ continue
+ elif role_internals.intersection(p.parts):
+ continue
+ elif 'tests' in p.parts:
+ playbooks.append(normpath(p))
+ if 'molecule' in p.parts:
+ if p.parts[-1] != 'molecule.yml':
+ playbooks.append(normpath(p))
+ continue
+ # hidden files are clearly not playbooks, likely config files.
+ if p.parts[-1].startswith('.'):
+ continue
+
+ if is_playbook(str(p)):
+ playbooks.append(normpath(p))
+ continue
+
+ _logger.info('Unknown file type: %s', normpath(p))
+
+ _logger.info('Found roles: %s', ' '.join(role_dirs))
+ _logger.info('Found playbooks: %s', ' '.join(playbooks))
+
+ return role_dirs + playbooks
+
+
+def expand_path_vars(path: str) -> str:
+ """Expand the environment or ~ variables in a path string."""
+ # It may be possible for function to be called with a Path object
+ path = str(path).strip()
+ path = os.path.expanduser(path)
+ path = os.path.expandvars(path)
+ return path
+
+
+def expand_paths_vars(paths: List[str]) -> List[str]:
+ """Expand the environment or ~ variables in a list."""
+ paths = [expand_path_vars(p) for p in paths]
+ return paths
+
+
+def get_rules_dirs(rulesdir: List[str], use_default: bool) -> List[str]:
+ """Return a list of rules dirs."""
+ default_ruledirs = [DEFAULT_RULESDIR]
+ default_custom_rulesdir = os.environ.get(
+ CUSTOM_RULESDIR_ENVVAR, os.path.join(DEFAULT_RULESDIR, "custom")
+ )
+ custom_ruledirs = sorted(
+ str(rdir.resolve())
+ for rdir in Path(default_custom_rulesdir).iterdir()
+ if rdir.is_dir() and (rdir / "__init__.py").exists()
+ )
+ if use_default:
+ return rulesdir + custom_ruledirs + default_ruledirs
+
+ return rulesdir or custom_ruledirs + default_ruledirs