diff options
Diffstat (limited to 'lib/ansiblelint/utils.py')
-rw-r--r-- | lib/ansiblelint/utils.py | 836 |
1 files changed, 836 insertions, 0 deletions
diff --git a/lib/ansiblelint/utils.py b/lib/ansiblelint/utils.py new file mode 100644 index 0000000..feac4d7 --- /dev/null +++ b/lib/ansiblelint/utils.py @@ -0,0 +1,836 @@ +# Copyright (c) 2013-2014 Will Thames <will@thames.id.au> +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +"""Generic utility helpers.""" + +import contextlib +import inspect +import logging +import os +import pprint +import subprocess +from argparse import Namespace +from collections import OrderedDict +from functools import lru_cache +from pathlib import Path +from typing import Callable, ItemsView, List, Optional, Tuple + +import yaml +from ansible import constants +from ansible.errors import AnsibleError, AnsibleParserError +from ansible.parsing.dataloader import DataLoader +from ansible.parsing.mod_args import ModuleArgsParser +from ansible.parsing.splitter import split_args +from ansible.parsing.yaml.constructor import AnsibleConstructor +from ansible.parsing.yaml.loader import AnsibleLoader +from ansible.parsing.yaml.objects import AnsibleSequence +from ansible.plugins.loader import add_all_plugin_dirs +from ansible.template import Templar +from yaml.composer import Composer +from yaml.representer import RepresenterError + +from ansiblelint.constants import ( + ANSIBLE_FAILURE_RC, CUSTOM_RULESDIR_ENVVAR, DEFAULT_RULESDIR, FileType, +) +from ansiblelint.errors import MatchError +from ansiblelint.file_utils import normpath + +# ansible-lint doesn't need/want to know about encrypted secrets, so we pass a +# string as the password to enable such yaml files to be opened and parsed +# successfully. +DEFAULT_VAULT_PASSWORD = 'x' + +PLAYBOOK_DIR = os.environ.get('ANSIBLE_PLAYBOOK_DIR', None) + + +_logger = logging.getLogger(__name__) + + +def parse_yaml_from_file(filepath: str) -> dict: + dl = DataLoader() + if hasattr(dl, 'set_vault_password'): + dl.set_vault_password(DEFAULT_VAULT_PASSWORD) + return dl.load_from_file(filepath) + + +def path_dwim(basedir: str, given: str) -> str: + dl = DataLoader() + dl.set_basedir(basedir) + return dl.path_dwim(given) + + +def ansible_template(basedir, varname, templatevars, **kwargs): + dl = DataLoader() + dl.set_basedir(basedir) + templar = Templar(dl, variables=templatevars) + return templar.template(varname, **kwargs) + + +LINE_NUMBER_KEY = '__line__' +FILENAME_KEY = '__file__' + +VALID_KEYS = [ + 'name', 'action', 'when', 'async', 'poll', 'notify', + 'first_available_file', 'include', 'include_tasks', 'import_tasks', 'import_playbook', + 'tags', 'register', 'ignore_errors', 'delegate_to', + 'local_action', 'transport', 'remote_user', 'sudo', + 'sudo_user', 'sudo_pass', 'when', 'connection', 'environment', 'args', 'always_run', + 'any_errors_fatal', 'changed_when', 'failed_when', 'check_mode', 'delay', + 'retries', 'until', 'su', 'su_user', 'su_pass', 'no_log', 'run_once', + 'become', 'become_user', 'become_method', FILENAME_KEY, +] + +BLOCK_NAME_TO_ACTION_TYPE_MAP = { + 'tasks': 'task', + 'handlers': 'handler', + 'pre_tasks': 'task', + 'post_tasks': 'task', + 'block': 'meta', + 'rescue': 'meta', + 'always': 'meta', +} + + +def tokenize(line): + tokens = line.lstrip().split(" ") + if tokens[0] == '-': + tokens = tokens[1:] + if tokens[0] == 'action:' or tokens[0] == 'local_action:': + tokens = tokens[1:] + command = tokens[0].replace(":", "") + + args = list() + kwargs = dict() + nonkvfound = False + for arg in tokens[1:]: + if "=" in arg and not nonkvfound: + kv = arg.split("=", 1) + kwargs[kv[0]] = kv[1] + else: + nonkvfound = True + args.append(arg) + return (command, args, kwargs) + + +def _playbook_items(pb_data: dict) -> ItemsView: + if isinstance(pb_data, dict): + return pb_data.items() + elif not pb_data: + return [] + else: + return [item for play in pb_data for item in play.items()] + + +def _rebind_match_filename(filename: str, func) -> Callable: + def func_wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except MatchError as e: + e.filename = filename + raise e + return func_wrapper + + +def _set_collections_basedir(basedir: str): + # Sets the playbook directory as playbook_paths for the collection loader + try: + # Ansible 2.10+ + # noqa: # pylint:disable=cyclic-import,import-outside-toplevel + from ansible.utils.collection_loader import AnsibleCollectionConfig + + AnsibleCollectionConfig.playbook_paths = basedir + except ImportError: + # Ansible 2.8 or 2.9 + # noqa: # pylint:disable=cyclic-import,import-outside-toplevel + from ansible.utils.collection_loader import set_collection_playbook_paths + + set_collection_playbook_paths(basedir) + + +def find_children(playbook: Tuple[str, str], playbook_dir: str) -> List: + if not os.path.exists(playbook[0]): + return [] + _set_collections_basedir(playbook_dir or '.') + add_all_plugin_dirs(playbook_dir or '.') + if playbook[1] == 'role': + playbook_ds = {'roles': [{'role': playbook[0]}]} + else: + try: + playbook_ds = parse_yaml_from_file(playbook[0]) + except AnsibleError as e: + raise SystemExit(str(e)) + results = [] + basedir = os.path.dirname(playbook[0]) + items = _playbook_items(playbook_ds) + for item in items: + for child in _rebind_match_filename(playbook[0], play_children)( + basedir, item, playbook[1], playbook_dir): + if "$" in child['path'] or "{{" in child['path']: + continue + valid_tokens = list() + for token in split_args(child['path']): + if '=' in token: + break + valid_tokens.append(token) + path = ' '.join(valid_tokens) + results.append({ + 'path': path_dwim(basedir, path), + 'type': child['type'] + }) + return results + + +def template(basedir, value, vars, fail_on_undefined=False, **kwargs): + try: + value = ansible_template(os.path.abspath(basedir), value, vars, + **dict(kwargs, fail_on_undefined=fail_on_undefined)) + # Hack to skip the following exception when using to_json filter on a variable. + # I guess the filter doesn't like empty vars... + except (AnsibleError, ValueError, RepresenterError): + # templating failed, so just keep value as is. + pass + return value + + +def play_children(basedir, item, parent_type, playbook_dir): + delegate_map = { + 'tasks': _taskshandlers_children, + 'pre_tasks': _taskshandlers_children, + 'post_tasks': _taskshandlers_children, + 'block': _taskshandlers_children, + 'include': _include_children, + 'import_playbook': _include_children, + 'roles': _roles_children, + 'dependencies': _roles_children, + 'handlers': _taskshandlers_children, + 'include_tasks': _include_children, + 'import_tasks': _include_children, + } + (k, v) = item + add_all_plugin_dirs(os.path.abspath(basedir)) + + if k in delegate_map: + if v: + v = template(os.path.abspath(basedir), + v, + dict(playbook_dir=PLAYBOOK_DIR or os.path.abspath(basedir)), + fail_on_undefined=False) + return delegate_map[k](basedir, k, v, parent_type) + return [] + + +def _include_children(basedir, k, v, parent_type): + # handle special case include_tasks: name=filename.yml + if k == 'include_tasks' and isinstance(v, dict) and 'file' in v: + v = v['file'] + + # handle include: filename.yml tags=blah + (command, args, kwargs) = tokenize("{0}: {1}".format(k, v)) + + result = path_dwim(basedir, args[0]) + if not os.path.exists(result): + result = path_dwim(os.path.join(os.path.dirname(basedir)), v) + return [{'path': result, 'type': parent_type}] + + +def _taskshandlers_children(basedir, k, v, parent_type: FileType) -> List: + results = [] + for th in v: + + # ignore empty tasks, `-` + if not th: + continue + + with contextlib.suppress(LookupError): + children = _get_task_handler_children_for_tasks_or_playbooks( + th, basedir, k, parent_type, + ) + results.append(children) + continue + + if 'include_role' in th or 'import_role' in th: # lgtm [py/unreachable-statement] + th = normalize_task_v2(th) + _validate_task_handler_action_for_role(th['action']) + results.extend(_roles_children(basedir, k, [th['action'].get("name")], + parent_type, + main=th['action'].get('tasks_from', 'main'))) + continue + + if 'block' not in th: + continue + + results.extend(_taskshandlers_children(basedir, k, th['block'], parent_type)) + if 'rescue' in th: + results.extend(_taskshandlers_children(basedir, k, th['rescue'], parent_type)) + if 'always' in th: + results.extend(_taskshandlers_children(basedir, k, th['always'], parent_type)) + + return results + + +def _get_task_handler_children_for_tasks_or_playbooks( + task_handler, basedir: str, k, parent_type: FileType, +) -> dict: + """Try to get children of taskhandler for include/import tasks/playbooks.""" + child_type = k if parent_type == 'playbook' else parent_type + + task_include_keys = 'include', 'include_tasks', 'import_playbook', 'import_tasks' + for task_handler_key in task_include_keys: + + with contextlib.suppress(KeyError): + + # ignore empty tasks + if not task_handler: + continue + + return { + 'path': path_dwim(basedir, task_handler[task_handler_key]), + 'type': child_type, + } + + raise LookupError( + f'The node contains none of: {", ".join(task_include_keys)}', + ) + + +def _validate_task_handler_action_for_role(th_action: dict) -> None: + """Verify that the task handler action is valid for role include.""" + module = th_action['__ansible_module__'] + + if 'name' not in th_action: + raise MatchError(f"Failed to find required 'name' key in {module!s}") + + if not isinstance(th_action['name'], str): + raise RuntimeError( + f"Value assigned to 'name' key on '{module!s}' is not a string.", + ) + + +def _roles_children(basedir: str, k, v, parent_type: FileType, main='main') -> list: + results = [] + for role in v: + if isinstance(role, dict): + if 'role' in role or 'name' in role: + if 'tags' not in role or 'skip_ansible_lint' not in role['tags']: + results.extend(_look_for_role_files(basedir, + role.get('role', role.get('name')), + main=main)) + elif k != 'dependencies': + raise SystemExit('role dict {0} does not contain a "role" ' + 'or "name" key'.format(role)) + else: + results.extend(_look_for_role_files(basedir, role, main=main)) + return results + + +def _rolepath(basedir: str, role: str) -> Optional[str]: + role_path = None + + possible_paths = [ + # if included from a playbook + path_dwim(basedir, os.path.join('roles', role)), + path_dwim(basedir, role), + # if included from roles/[role]/meta/main.yml + path_dwim( + basedir, os.path.join('..', '..', '..', 'roles', role) + ), + path_dwim(basedir, os.path.join('..', '..', role)), + ] + + if constants.DEFAULT_ROLES_PATH: + search_locations = constants.DEFAULT_ROLES_PATH + if isinstance(search_locations, str): + search_locations = search_locations.split(os.pathsep) + for loc in search_locations: + loc = os.path.expanduser(loc) + possible_paths.append(path_dwim(loc, role)) + + possible_paths.append(path_dwim(basedir, '')) + + for path_option in possible_paths: + if os.path.isdir(path_option): + role_path = path_option + break + + if role_path: + add_all_plugin_dirs(role_path) + + return role_path + + +def _look_for_role_files(basedir: str, role: str, main='main') -> list: + role_path = _rolepath(basedir, role) + if not role_path: + return [] + + results = [] + + for th in ['tasks', 'handlers', 'meta']: + current_path = os.path.join(role_path, th) + for dir, subdirs, files in os.walk(current_path): + for file in files: + file_ignorecase = file.lower() + if file_ignorecase.endswith(('.yml', '.yaml')): + thpath = os.path.join(dir, file) + results.append({'path': thpath, 'type': th}) + + return results + + +def rolename(filepath): + idx = filepath.find('roles/') + if idx < 0: + return '' + role = filepath[idx + 6:] + role = role[:role.find('/')] + return role + + +def _kv_to_dict(v): + (command, args, kwargs) = tokenize(v) + return dict(__ansible_module__=command, __ansible_arguments__=args, **kwargs) + + +def _sanitize_task(task: dict) -> dict: + """Return a stripped-off task structure compatible with new Ansible. + + This helper takes a copy of the incoming task and drops + any internally used keys from it. + """ + result = task.copy() + # task is an AnsibleMapping which inherits from OrderedDict, so we need + # to use `del` to remove unwanted keys. + for k in ['skipped_rules', FILENAME_KEY, LINE_NUMBER_KEY]: + if k in result: + del result[k] + return result + + +# FIXME: drop noqa once this function is made simpler +# Ref: https://github.com/ansible/ansible-lint/issues/744 +def normalize_task_v2(task: dict) -> dict: # noqa: C901 + """Ensure tasks have an action key and strings are converted to python objects.""" + result = dict() + if 'always_run' in task: + # FIXME(ssbarnea): Delayed import to avoid circular import + # See https://github.com/ansible/ansible-lint/issues/880 + # noqa: # pylint:disable=cyclic-import,import-outside-toplevel + from ansiblelint.rules.AlwaysRunRule import AlwaysRunRule + + raise MatchError( + rule=AlwaysRunRule, + filename=task[FILENAME_KEY], + linenumber=task[LINE_NUMBER_KEY]) + + sanitized_task = _sanitize_task(task) + mod_arg_parser = ModuleArgsParser(sanitized_task) + try: + action, arguments, result['delegate_to'] = mod_arg_parser.parse() + except AnsibleParserError as e: + try: + task_info = "%s:%s" % (task[FILENAME_KEY], task[LINE_NUMBER_KEY]) + except KeyError: + task_info = "Unknown" + pp = pprint.PrettyPrinter(indent=2) + task_pprint = pp.pformat(sanitized_task) + + _logger.critical("Couldn't parse task at %s (%s)\n%s", task_info, e.message, task_pprint) + raise SystemExit(ANSIBLE_FAILURE_RC) + + # denormalize shell -> command conversion + if '_uses_shell' in arguments: + action = 'shell' + del arguments['_uses_shell'] + + for (k, v) in list(task.items()): + if k in ('action', 'local_action', 'args', 'delegate_to') or k == action: + # we don't want to re-assign these values, which were + # determined by the ModuleArgsParser() above + continue + else: + result[k] = v + + result['action'] = dict(__ansible_module__=action) + + if '_raw_params' in arguments: + result['action']['__ansible_arguments__'] = arguments['_raw_params'].split(' ') + del arguments['_raw_params'] + else: + result['action']['__ansible_arguments__'] = list() + + if 'argv' in arguments and not result['action']['__ansible_arguments__']: + result['action']['__ansible_arguments__'] = arguments['argv'] + del arguments['argv'] + + result['action'].update(arguments) + return result + + +# FIXME: drop noqa once this function is made simpler +# Ref: https://github.com/ansible/ansible-lint/issues/744 +def normalize_task_v1(task): # noqa: C901 + result = dict() + for (k, v) in task.items(): + if k in VALID_KEYS or k.startswith('with_'): + if k == 'local_action' or k == 'action': + if not isinstance(v, dict): + v = _kv_to_dict(v) + v['__ansible_arguments__'] = v.get('__ansible_arguments__', list()) + result['action'] = v + else: + result[k] = v + else: + if isinstance(v, str): + v = _kv_to_dict(k + ' ' + v) + elif not v: + v = dict(__ansible_module__=k) + else: + if isinstance(v, dict): + v.update(dict(__ansible_module__=k)) + else: + if k == '__line__': + # Keep the line number stored + result[k] = v + continue + + else: + # Tasks that include playbooks (rather than task files) + # can get here + # https://github.com/ansible/ansible-lint/issues/138 + raise RuntimeError("Was not expecting value %s of type %s for key %s\n" + "Task: %s. Check the syntax of your playbook using " + "ansible-playbook --syntax-check" % + (str(v), type(v), k, str(task))) + v['__ansible_arguments__'] = v.get('__ansible_arguments__', list()) + result['action'] = v + if 'module' in result['action']: + # this happens when a task uses + # local_action: + # module: ec2 + # etc... + result['action']['__ansible_module__'] = result['action']['module'] + del result['action']['module'] + if 'args' in result: + result['action'].update(result.get('args')) + del result['args'] + return result + + +def normalize_task(task, filename): + ansible_action_type = task.get('__ansible_action_type__', 'task') + if '__ansible_action_type__' in task: + del task['__ansible_action_type__'] + task = normalize_task_v2(task) + task[FILENAME_KEY] = filename + task['__ansible_action_type__'] = ansible_action_type + return task + + +def task_to_str(task): + name = task.get("name") + if name: + return name + action = task.get("action") + args = " ".join([u"{0}={1}".format(k, v) for (k, v) in action.items() + if k not in ["__ansible_module__", "__ansible_arguments__"]] + + action.get("__ansible_arguments__")) + return u"{0} {1}".format(action["__ansible_module__"], args) + + +def extract_from_list(blocks, candidates): + results = list() + for block in blocks: + for candidate in candidates: + if isinstance(block, dict) and candidate in block: + if isinstance(block[candidate], list): + results.extend(add_action_type(block[candidate], candidate)) + elif block[candidate] is not None: + raise RuntimeError( + "Key '%s' defined, but bad value: '%s'" % + (candidate, str(block[candidate]))) + return results + + +def add_action_type(actions, action_type): + results = list() + for action in actions: + # ignore empty task + if not action: + continue + action['__ansible_action_type__'] = BLOCK_NAME_TO_ACTION_TYPE_MAP[action_type] + results.append(action) + return results + + +def get_action_tasks(yaml, file): + tasks = list() + if file['type'] in ['tasks', 'handlers']: + tasks = add_action_type(yaml, file['type']) + else: + tasks.extend(extract_from_list(yaml, ['tasks', 'handlers', 'pre_tasks', 'post_tasks'])) + + # Add sub-elements of block/rescue/always to tasks list + tasks.extend(extract_from_list(tasks, ['block', 'rescue', 'always'])) + # Remove block/rescue/always elements from tasks list + block_rescue_always = ('block', 'rescue', 'always') + tasks[:] = [task for task in tasks if all(k not in task for k in block_rescue_always)] + + return [task for task in tasks if + set(['include', 'include_tasks', + 'import_playbook', 'import_tasks']).isdisjoint(task.keys())] + + +def get_normalized_tasks(yaml, file): + tasks = get_action_tasks(yaml, file) + res = [] + for task in tasks: + # An empty `tags` block causes `None` to be returned if + # the `or []` is not present - `task.get('tags', [])` + # does not suffice. + if 'skip_ansible_lint' in (task.get('tags') or []): + # No need to normalize_task is we are skipping it. + continue + res.append(normalize_task(task, file['path'])) + + return res + + +@lru_cache(maxsize=128) +def parse_yaml_linenumbers(data, filename): + """Parse yaml as ansible.utils.parse_yaml but with linenumbers. + + The line numbers are stored in each node's LINE_NUMBER_KEY key. + """ + def compose_node(parent, index): + # the line number where the previous token has ended (plus empty lines) + line = loader.line + node = Composer.compose_node(loader, parent, index) + node.__line__ = line + 1 + return node + + def construct_mapping(node, deep=False): + mapping = AnsibleConstructor.construct_mapping(loader, node, deep=deep) + if hasattr(node, '__line__'): + mapping[LINE_NUMBER_KEY] = node.__line__ + else: + mapping[LINE_NUMBER_KEY] = mapping._line_number + mapping[FILENAME_KEY] = filename + return mapping + + try: + kwargs = {} + if 'vault_password' in inspect.getfullargspec(AnsibleLoader.__init__).args: + kwargs['vault_password'] = DEFAULT_VAULT_PASSWORD + loader = AnsibleLoader(data, **kwargs) + loader.compose_node = compose_node + loader.construct_mapping = construct_mapping + data = loader.get_single_data() + except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e: + raise SystemExit("Failed to parse YAML in %s: %s" % (filename, str(e))) + return data + + +def get_first_cmd_arg(task): + try: + if 'cmd' in task['action']: + first_cmd_arg = task['action']['cmd'].split()[0] + else: + first_cmd_arg = task['action']['__ansible_arguments__'][0] + except IndexError: + return None + return first_cmd_arg + + +def is_playbook(filename: str) -> bool: + """ + Check if the file is a playbook. + + Given a filename, it should return true if it looks like a playbook. The + function is not supposed to raise exceptions. + """ + # we assume is a playbook if we loaded a sequence of dictionaries where + # at least one of these keys is present: + playbooks_keys = { + "gather_facts", + "hosts", + "import_playbook", + "post_tasks", + "pre_tasks", + "roles" + "tasks", + } + + # makes it work with Path objects by converting them to strings + if not isinstance(filename, str): + filename = str(filename) + + try: + f = parse_yaml_from_file(filename) + except Exception as e: + _logger.warning( + "Failed to load %s with %s, assuming is not a playbook.", + filename, e) + else: + if ( + isinstance(f, AnsibleSequence) and + hasattr(f, 'keys') and + playbooks_keys.intersection(next(iter(f), {}).keys()) + ): + return True + return False + + +def get_yaml_files(options: Namespace) -> dict: + """Find all yaml files.""" + # git is preferred as it also considers .gitignore + git_command = ['git', 'ls-files', '*.yaml', '*.yml'] + _logger.info("Discovering files to lint: %s", ' '.join(git_command)) + + out = None + + try: + out = subprocess.check_output( + git_command, + stderr=subprocess.STDOUT, + universal_newlines=True + ).split() + except subprocess.CalledProcessError as exc: + _logger.warning( + "Failed to discover yaml files to lint using git: %s", + exc.output.rstrip('\n') + ) + except FileNotFoundError as exc: + if options.verbosity: + _logger.warning( + "Failed to locate command: %s", exc + ) + + if out is None: + out = [ + os.path.join(root, name) + for root, dirs, files in os.walk('.') + for name in files + if name.endswith('.yaml') or name.endswith('.yml') + ] + + return OrderedDict.fromkeys(sorted(out)) + + +# FIXME: drop noqa once this function is made simpler +# Ref: https://github.com/ansible/ansible-lint/issues/744 +def get_playbooks_and_roles(options=None) -> List[str]: # noqa: C901 + """Find roles and playbooks.""" + if options is None: + options = {} + + files = get_yaml_files(options) + + playbooks = [] + role_dirs = [] + role_internals = { + 'defaults', + 'files', + 'handlers', + 'meta', + 'tasks', + 'templates', + 'vars', + } + + # detect role in repository root: + if 'tasks/main.yml' in files or 'tasks/main.yaml' in files: + role_dirs.append('.') + + for p in map(Path, files): + + try: + for file_path in options.exclude_paths: + if str(p.resolve()).startswith(str(file_path)): + raise FileNotFoundError( + f'File {file_path} matched exclusion entry: {p}') + except FileNotFoundError as e: + _logger.debug('Ignored %s due to: %s', p, e) + continue + + if (next((i for i in p.parts if i.endswith('playbooks')), None) or + 'playbook' in p.parts[-1]): + playbooks.append(normpath(p)) + continue + + # ignore if any folder ends with _vars + if next((i for i in p.parts if i.endswith('_vars')), None): + continue + elif 'roles' in p.parts or '.' in role_dirs: + if 'tasks' in p.parts and p.parts[-1] in ['main.yaml', 'main.yml']: + role_dirs.append(str(p.parents[1])) + continue + elif role_internals.intersection(p.parts): + continue + elif 'tests' in p.parts: + playbooks.append(normpath(p)) + if 'molecule' in p.parts: + if p.parts[-1] != 'molecule.yml': + playbooks.append(normpath(p)) + continue + # hidden files are clearly not playbooks, likely config files. + if p.parts[-1].startswith('.'): + continue + + if is_playbook(str(p)): + playbooks.append(normpath(p)) + continue + + _logger.info('Unknown file type: %s', normpath(p)) + + _logger.info('Found roles: %s', ' '.join(role_dirs)) + _logger.info('Found playbooks: %s', ' '.join(playbooks)) + + return role_dirs + playbooks + + +def expand_path_vars(path: str) -> str: + """Expand the environment or ~ variables in a path string.""" + # It may be possible for function to be called with a Path object + path = str(path).strip() + path = os.path.expanduser(path) + path = os.path.expandvars(path) + return path + + +def expand_paths_vars(paths: List[str]) -> List[str]: + """Expand the environment or ~ variables in a list.""" + paths = [expand_path_vars(p) for p in paths] + return paths + + +def get_rules_dirs(rulesdir: List[str], use_default: bool) -> List[str]: + """Return a list of rules dirs.""" + default_ruledirs = [DEFAULT_RULESDIR] + default_custom_rulesdir = os.environ.get( + CUSTOM_RULESDIR_ENVVAR, os.path.join(DEFAULT_RULESDIR, "custom") + ) + custom_ruledirs = sorted( + str(rdir.resolve()) + for rdir in Path(default_custom_rulesdir).iterdir() + if rdir.is_dir() and (rdir / "__init__.py").exists() + ) + if use_default: + return rulesdir + custom_ruledirs + default_ruledirs + + return rulesdir or custom_ruledirs + default_ruledirs |