Diffstat (limited to 'src/ansiblelint/file_utils.py')
-rw-r--r-- | src/ansiblelint/file_utils.py | 523 |
1 file changed, 523 insertions, 0 deletions
diff --git a/src/ansiblelint/file_utils.py b/src/ansiblelint/file_utils.py
new file mode 100644
index 0000000..500dff7
--- /dev/null
+++ b/src/ansiblelint/file_utils.py
@@ -0,0 +1,523 @@
+"""Utility functions related to file operations."""
+from __future__ import annotations
+
+import copy
+import logging
+import os
+import pathlib
+import subprocess
+import sys
+from argparse import Namespace
+from collections import OrderedDict, defaultdict
+from contextlib import contextmanager
+from pathlib import Path
+from tempfile import NamedTemporaryFile
+from typing import TYPE_CHECKING, Any, Iterator, cast
+
+import wcmatch.pathlib
+from wcmatch.wcmatch import RECURSIVE, WcMatch
+from yaml.error import YAMLError
+
+from ansiblelint.config import BASE_KINDS, options
+from ansiblelint.constants import GIT_CMD, FileType, States
+
+if TYPE_CHECKING:
+    # https://github.com/PyCQA/pylint/issues/3979
+    BasePathLike = os.PathLike[Any]  # pylint: disable=unsubscriptable-object
+else:
+    BasePathLike = os.PathLike
+
+_logger = logging.getLogger(__package__)
+
+
+def abspath(path: str, base_dir: str) -> str:
+    """Make relative path absolute relative to given directory.
+
+    Args:
+        path (str): the path to make absolute
+        base_dir (str): the directory from which to make \
+            relative paths absolute
+    """
+    if not os.path.isabs(path):
+        # Don't use abspath as it assumes path is relative to cwd.
+        # We want it relative to base_dir.
+        path = os.path.join(base_dir, path)
+
+    return os.path.normpath(path)
+
+
+def normpath(path: str | BasePathLike) -> str:
+    """
+    Normalize a path in order to provide a more consistent output.
+
+    Currently it generates a relative path but in the future we may want to
+    make this user configurable.
+    """
+    # prevent possible ValueError with relpath(), when input is an empty string
+    if not path:
+        path = "."
+    # conversion to string in order to allow receiving non string objects
+    relpath = os.path.relpath(str(path))
+    path_absolute = os.path.abspath(str(path))
+    if path_absolute.startswith(os.getcwd()):
+        return relpath
+    if path_absolute.startswith(os.path.expanduser("~")):
+        return path_absolute.replace(os.path.expanduser("~"), "~")
+    # we avoid returning relative paths that end-up at root level
+    if path_absolute in relpath:
+        return path_absolute
+    if relpath.startswith("../"):
+        return path_absolute
+    return relpath
+
+
+# Needed for compatibility with py38; this was later added to the Path class
+def is_relative_to(path: Path, *other: Any) -> bool:
+    """Return True if the path is relative to another path or False."""
+    try:
+        path.resolve().absolute().relative_to(*other)
+        return True
+    except ValueError:
+        return False
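
As an illustrative aside (not part of the patch), the behaviour of abspath() and normpath() above is easiest to see with concrete values; the paths and results in the comments are assumptions for a checkout living in /home/user/project:

    import os
    from ansiblelint.file_utils import abspath, normpath

    abspath("roles/foo", "/home/user/project")   # '/home/user/project/roles/foo'
    normpath("/home/user/project/site.yml")      # 'site.yml' (below cwd, so relative)
    normpath(os.path.expanduser("~/other.yml"))  # '~/other.yml' (home collapsed to ~)
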
+ """ + if not isinstance(path, Path): + path = Path(path) + + is_relative = is_relative_to(path, path.cwd()) + path = path.resolve() + if is_relative: + path = path.relative_to(path.cwd()) + + # Compress any absolute path within current user home directory + if path.is_absolute(): + home = Path.home() + if is_relative_to(path, home): + path = Path("~") / path.relative_to(home) + + return path + + +@contextmanager +def cwd(path: str | BasePathLike) -> Iterator[None]: + """Context manager for temporary changing current working directory.""" + old_pwd = os.getcwd() + os.chdir(path) + try: + yield + finally: + os.chdir(old_pwd) + + +def expand_path_vars(path: str) -> str: + """Expand the environment or ~ variables in a path string.""" + # It may be possible for function to be called with a Path object + path = str(path).strip() + path = os.path.expanduser(path) + path = os.path.expandvars(path) + return path + + +def expand_paths_vars(paths: list[str]) -> list[str]: + """Expand the environment or ~ variables in a list.""" + paths = [expand_path_vars(p) for p in paths] + return paths + + +def kind_from_path(path: Path, base: bool = False) -> FileType: + """Determine the file kind based on its name. + + When called with base=True, it will return the base file type instead + of the explicit one. That is expected to return 'yaml' for any yaml files. + """ + # pathlib.Path.match patterns are very limited, they do not support *a*.yml + # glob.glob supports **/foo.yml but not multiple extensions + pathex = wcmatch.pathlib.PurePath(str(path.absolute().resolve())) + kinds = options.kinds if not base else BASE_KINDS + for entry in kinds: + for k, v in entry.items(): + if pathex.globmatch( + v, + flags=( + wcmatch.pathlib.GLOBSTAR + | wcmatch.pathlib.BRACE + | wcmatch.pathlib.DOTGLOB + ), + ): + return str(k) # type: ignore + + if base: + # Unknown base file type is default + return "" + + if path.is_dir(): + return "role" + + if str(path) == "/dev/stdin": + return "playbook" + + # Unknown file types report a empty string (evaluated as False) + return "" + + +# pylint: disable=too-many-instance-attributes +class Lintable: + """Defines a file/folder that can be linted. + + Providing file content when creating the object allow creation of in-memory + instances that do not need files to be present on disk. + + When symlinks are given, they will always be resolved to their target. 
+ """ + + def __init__( + self, + name: str | Path, + content: str | None = None, + kind: FileType | None = None, + base_kind: str = "", + ): + """Create a Lintable instance.""" + self.dir: str = "" + self.kind: FileType | None = None + self.stop_processing = False # Set to stop other rules from running + self._data: Any = States.NOT_LOADED + self.line_skips: dict[int, set[str]] = defaultdict(set) + self.exc: Exception | None = None # Stores data loading exceptions + + if isinstance(name, str): + name = Path(name) + is_relative = is_relative_to(name, str(name.cwd())) + name = name.resolve() + if is_relative: + name = name.relative_to(name.cwd()) + name = normpath_path(name) + self.path = name + # Filename is effective file on disk, for stdin is a namedtempfile + self.name = self.filename = str(name) + + self._content = self._original_content = content + self.updated = False + + # if the lintable is part of a role, we save role folder name + self.role = "" + parts = self.path.parent.parts + if "roles" in parts: + role = self.path + while role.parent.name != "roles" and role.name: + role = role.parent + if role.exists(): + self.role = role.name + + if str(self.path) in ["/dev/stdin", "-"]: + # pylint: disable=consider-using-with + self.file = NamedTemporaryFile(mode="w+", suffix="playbook.yml") + self.filename = self.file.name + self._content = sys.stdin.read() + self.file.write(self._content) + self.file.flush() + self.path = Path(self.file.name) + self.name = "stdin" + self.kind = "playbook" + self.dir = "/" + else: + self.kind = kind or kind_from_path(self.path) + # We store absolute directory in dir + if not self.dir: + if self.kind == "role": + self.dir = str(self.path.resolve()) + else: + self.dir = str(self.path.parent.resolve()) + + # determine base file kind (yaml, xml, ini, ...) + self.base_kind = base_kind or kind_from_path(self.path, base=True) + self.abspath = self.path.expanduser().absolute() + + if self.kind == "yaml": + self.data # pylint: disable=pointless-statement + + def _guess_kind(self) -> None: + if self.kind == "yaml": + if isinstance(self.data, list) and "hosts" in self.data[0]: + if "rules" not in self.data[0]: + self.kind = "playbook" + else: + self.kind = "rulebook" + # we we failed to guess the more specific kind, we warn user + if self.kind == "yaml": + _logger.debug( + "Passed '%s' positional argument was identified as generic '%s' file kind.", + self.name, + self.kind, + ) + + def __getitem__(self, key: Any) -> Any: + """Provide compatibility subscriptable support.""" + if key == "path": + return str(self.path) + if key == "type": + return str(self.kind) + raise NotImplementedError() + + def get(self, key: Any, default: Any = None) -> Any: + """Provide compatibility subscriptable support.""" + try: + return self[key] + except NotImplementedError: + return default + + def _populate_content_cache_from_disk(self) -> None: + # Can raise UnicodeDecodeError + try: + self._content = self.path.expanduser().resolve().read_text(encoding="utf-8") + except FileNotFoundError as ex: + if vars(options).get("progressive"): + self._content = "" + else: + raise ex + if self._original_content is None: + self._original_content = self._content + + @property + def content(self) -> str: + """Retrieve file content, from internal cache or disk.""" + if self._content is None: + self._populate_content_cache_from_disk() + return cast(str, self._content) + + @content.setter + def content(self, value: str) -> None: + """Update ``content`` and calculate ``updated``. 
+
+    @content.setter
+    def content(self, value: str) -> None:
+        """Update ``content`` and calculate ``updated``.
+
+        To calculate ``updated`` this will read the file from disk if the cache
+        has not already been populated.
+        """
+        if not isinstance(value, str):
+            raise TypeError(f"Expected str but got {type(value)}")
+        if self._original_content is None:
+            if self._content is not None:
+                self._original_content = self._content
+            elif self.path.exists():
+                self._populate_content_cache_from_disk()
+            else:
+                # new file
+                self._original_content = ""
+        self.updated = self._original_content != value
+        self._content = value
+
+    @content.deleter
+    def content(self) -> None:
+        """Reset the internal content cache."""
+        self._content = None
+
+    def write(self, force: bool = False) -> None:
+        """Write the value of ``Lintable.content`` to disk.
+
+        This only writes to disk if the content has been updated (``Lintable.updated``).
+        For example, you can update the content, and then write it to disk like this:
+
+        .. code:: python
+
+            lintable.content = new_content
+            lintable.write()
+
+        Use ``force=True`` when you want to force a content rewrite even if the
+        content has not changed. For example:
+
+        .. code:: python
+
+            lintable.write(force=True)
+        """
+        if not force and not self.updated:
+            # No changes to write.
+            return
+        self.path.expanduser().resolve().write_text(
+            self._content or "", encoding="utf-8"
+        )
+
+    def __hash__(self) -> int:
+        """Return a hash value of the lintable."""
+        return hash((self.name, self.kind, self.abspath))
+
+    def __eq__(self, other: object) -> bool:
+        """Identify whether the other object represents the same lintable."""
+        if isinstance(other, Lintable):
+            return bool(self.name == other.name and self.kind == other.kind)
+        return False
+
+    def __repr__(self) -> str:
+        """Return user friendly representation of a lintable."""
+        return f"{self.name} ({self.kind})"
+
+    @property
+    def data(self) -> Any:
+        """Return loaded data representation for current file, if possible."""
+        if self._data == States.NOT_LOADED:
+            if self.path.is_dir():
+                self._data = None
+                return self._data
+            try:
+                if str(self.base_kind) == "text/yaml":
+                    from ansiblelint.utils import (  # pylint: disable=import-outside-toplevel
+                        parse_yaml_linenumbers,
+                    )
+
+                    self._data = parse_yaml_linenumbers(self)
+                    # now that _data is not empty, we can try guessing if playbook or rulebook
+                    # it has to be done before append_skipped_rules() call as it's relying
+                    # on self.kind.
+                    if self.kind == "yaml":
+                        self._guess_kind()
+                    # Lazy import to avoid delays and cyclic-imports
+                    if "append_skipped_rules" not in globals():
+                        # pylint: disable=import-outside-toplevel
+                        from ansiblelint.skip_utils import append_skipped_rules
+
+                    self._data = append_skipped_rules(self._data, self)
+                else:
+                    logging.debug(
+                        "data set to None for %s due to being of %s kind.",
+                        self.path,
+                        self.base_kind,
+                    )
+                    self._data = States.UNKNOWN_DATA
+
+            except (RuntimeError, FileNotFoundError, YAMLError) as exc:
+                self._data = States.LOAD_FAILED
+                self.exc = exc
+        return self._data
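
One more aside, not part of the patch, showing the content setter and write() flow documented above; site.yml is a hypothetical file name:

    from ansiblelint.file_utils import Lintable

    lintable = Lintable("site.yml", content="---\n- hosts: all\n")
    lintable.content = "---\n- hosts: web\n"  # setter compares against the original content
    lintable.updated                          # True, because the content changed
    lintable.write()                          # writes to disk only because updated is True
    lintable.write(force=True)                # would rewrite even without changes
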
+ """ + # git is preferred as it also considers .gitignore + git_command_present = [ + *GIT_CMD, + "ls-files", + "--cached", + "--others", + "--exclude-standard", + "-z", + ] + git_command_absent = [*GIT_CMD, "ls-files", "--deleted", "-z"] + out = None + + try: + out_present = subprocess.check_output( + git_command_present, stderr=subprocess.STDOUT, text=True + ).split("\x00")[:-1] + _logger.info( + "Discovered files to lint using: %s", " ".join(git_command_present) + ) + + out_absent = subprocess.check_output( + git_command_absent, stderr=subprocess.STDOUT, text=True + ).split("\x00")[:-1] + _logger.info("Excluded removed files using: %s", " ".join(git_command_absent)) + + out = set(out_present) - set(out_absent) + except subprocess.CalledProcessError as exc: + if not (exc.returncode == 128 and "fatal: not a git repository" in exc.output): + _logger.warning( + "Failed to discover lintable files using git: %s", + exc.output.rstrip("\n"), + ) + except FileNotFoundError as exc: + if options.verbosity: + _logger.warning("Failed to locate command: %s", exc) + + if out is None: + exclude_pattern = "|".join(str(x) for x in options.exclude_paths) + _logger.info("Looking up for files, excluding %s ...", exclude_pattern) + # remove './' prefix from output of WcMatch + out = { + strip_dotslash_prefix(fname) + for fname in WcMatch( + ".", exclude_pattern=exclude_pattern, flags=RECURSIVE, limit=256 + ).match() + } + + return OrderedDict.fromkeys(sorted(out)) + + +def strip_dotslash_prefix(fname: str) -> str: + """Remove ./ leading from filenames.""" + return fname[2:] if fname.startswith("./") else fname + + +def guess_project_dir(config_file: str | None) -> str: + """Return detected project dir or current working directory.""" + path = None + if config_file is not None and config_file != "/dev/null": + target = pathlib.Path(config_file) + if target.exists(): + # for config inside .config, we return the parent dir as project dir + cfg_path = target.parent + if cfg_path.parts[-1] == ".config": + path = str(cfg_path.parent.absolute()) + else: + path = str(cfg_path.absolute()) + + if path is None: + try: + result = subprocess.run( + [*GIT_CMD, "rev-parse", "--show-toplevel"], + capture_output=True, + text=True, + check=True, + ) + + path = result.stdout.splitlines()[0] + except subprocess.CalledProcessError as exc: + if not ( + exc.returncode == 128 and "fatal: not a git repository" in exc.stderr + ): + _logger.warning( + "Failed to guess project directory using git: %s", + exc.stderr.rstrip("\n"), + ) + except FileNotFoundError as exc: + _logger.warning("Failed to locate command: %s", exc) + + if path is None: + path = os.getcwd() + + _logger.info( + "Guessed %s as project root directory", + path, + ) + + return path + + +def expand_dirs_in_lintables(lintables: set[Lintable]) -> None: + """Return all recognized lintables within given directory.""" + should_expand = False + + for item in lintables: + if item.path.is_dir(): + should_expand = True + break + + if should_expand: + # this relies on git and we do not want to call unless needed + all_files = discover_lintables(options) + + for item in copy.copy(lintables): + if item.path.is_dir(): + for filename in all_files: + if filename.startswith(str(item.path)): + lintables.add(Lintable(filename)) |