Diffstat (limited to 'src/ansiblelint/file_utils.py')
-rw-r--r--  src/ansiblelint/file_utils.py  523
1 file changed, 523 insertions, 0 deletions
diff --git a/src/ansiblelint/file_utils.py b/src/ansiblelint/file_utils.py
new file mode 100644
index 0000000..500dff7
--- /dev/null
+++ b/src/ansiblelint/file_utils.py
@@ -0,0 +1,523 @@
+"""Utility functions related to file operations."""
+from __future__ import annotations
+
+import copy
+import logging
+import os
+import pathlib
+import subprocess
+import sys
+from argparse import Namespace
+from collections import OrderedDict, defaultdict
+from contextlib import contextmanager
+from pathlib import Path
+from tempfile import NamedTemporaryFile
+from typing import TYPE_CHECKING, Any, Iterator, cast
+
+import wcmatch.pathlib
+from wcmatch.wcmatch import RECURSIVE, WcMatch
+from yaml.error import YAMLError
+
+from ansiblelint.config import BASE_KINDS, options
+from ansiblelint.constants import GIT_CMD, FileType, States
+
+if TYPE_CHECKING:
+ # https://github.com/PyCQA/pylint/issues/3979
+ BasePathLike = os.PathLike[Any] # pylint: disable=unsubscriptable-object
+else:
+ BasePathLike = os.PathLike
+
+_logger = logging.getLogger(__package__)
+
+
+def abspath(path: str, base_dir: str) -> str:
+    """Make a relative path absolute, relative to the given base directory.
+
+    Args:
+        path (str): the path to make absolute
+        base_dir (str): the directory relative to which the path is made absolute
+ """
+ if not os.path.isabs(path):
+ # Don't use abspath as it assumes path is relative to cwd.
+ # We want it relative to base_dir.
+ path = os.path.join(base_dir, path)
+
+ return os.path.normpath(path)
+
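+# Illustrative usage (paths are hypothetical), assuming a POSIX filesystem:
+#   abspath("roles/common", "/srv/playbooks") -> "/srv/playbooks/roles/common"
+#   abspath("/etc/hosts", "/srv/playbooks")   -> "/etc/hosts"
+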
+
+def normpath(path: str | BasePathLike) -> str:
+    """Normalize a path in order to provide a more consistent output.
+
+    Currently it generates a relative path, but in the future we may want to
+    make this user configurable.
+    """
+ # prevent possible ValueError with relpath(), when input is an empty string
+ if not path:
+ path = "."
+    # convert to string so that non-string objects (e.g. Path) are accepted
+ relpath = os.path.relpath(str(path))
+ path_absolute = os.path.abspath(str(path))
+ if path_absolute.startswith(os.getcwd()):
+ return relpath
+ if path_absolute.startswith(os.path.expanduser("~")):
+ return path_absolute.replace(os.path.expanduser("~"), "~")
+    # we avoid returning relative paths that end up at the root level
+ if path_absolute in relpath:
+ return path_absolute
+ if relpath.startswith("../"):
+ return path_absolute
+ return relpath
+
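+# Illustrative behaviour (paths are hypothetical), assuming the current working
+# directory is /home/user/project and the home directory is /home/user:
+#   normpath("/home/user/project/site.yml") -> "site.yml"
+#   normpath("/home/user/other/site.yml")   -> "~/other/site.yml"
+#   normpath("")                            -> "."
+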
+
+# Needed for compatibility with Python 3.8; Path.is_relative_to() was added in Python 3.9
+def is_relative_to(path: Path, *other: Any) -> bool:
+ """Return True if the path is relative to another path or False."""
+ try:
+ path.resolve().absolute().relative_to(*other)
+ return True
+ except ValueError:
+ return False
+
+
+def normpath_path(path: str | BasePathLike) -> Path:
+ """Normalize a path in order to provide a more consistent output.
+
+ - Any symlinks are resolved.
+ - Any paths outside the CWD are resolved to their absolute path.
+ - Any absolute path within current user home directory is compressed to
+ make use of '~', so it is easier to read and more portable.
+ """
+ if not isinstance(path, Path):
+ path = Path(path)
+
+ is_relative = is_relative_to(path, path.cwd())
+ path = path.resolve()
+ if is_relative:
+ path = path.relative_to(path.cwd())
+
+ # Compress any absolute path within current user home directory
+ if path.is_absolute():
+ home = Path.home()
+ if is_relative_to(path, home):
+ path = Path("~") / path.relative_to(home)
+
+ return path
+
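+# Illustrative behaviour (paths are hypothetical), assuming no symlinks, a
+# current working directory of /home/user/project and a home of /home/user:
+#   normpath_path("roles/../site.yml")       -> Path("site.yml")
+#   normpath_path("/home/user/.ansible.cfg") -> Path("~/.ansible.cfg")
+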
+
+@contextmanager
+def cwd(path: str | BasePathLike) -> Iterator[None]:
+    """Context manager for temporarily changing the current working directory."""
+ old_pwd = os.getcwd()
+ os.chdir(path)
+ try:
+ yield
+ finally:
+ os.chdir(old_pwd)
+
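+# Illustrative usage: run a block of code from another directory and restore
+# the previous working directory afterwards, even if an exception is raised:
+#   with cwd("/tmp"):
+#       ...  # code here runs with /tmp as the working directory
+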
+
+def expand_path_vars(path: str) -> str:
+ """Expand the environment or ~ variables in a path string."""
+    # the function may be called with a Path object, so coerce it to str first
+ path = str(path).strip()
+ path = os.path.expanduser(path)
+ path = os.path.expandvars(path)
+ return path
+
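+# Illustrative behaviour, assuming the environment provides HOME=/home/user:
+#   expand_path_vars(" ~/playbooks ") -> "/home/user/playbooks"
+#   expand_path_vars("$HOME/roles")   -> "/home/user/roles"
+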
+
+def expand_paths_vars(paths: list[str]) -> list[str]:
+ """Expand the environment or ~ variables in a list."""
+ paths = [expand_path_vars(p) for p in paths]
+ return paths
+
+
+def kind_from_path(path: Path, base: bool = False) -> FileType:
+ """Determine the file kind based on its name.
+
+ When called with base=True, it will return the base file type instead
+ of the explicit one. That is expected to return 'yaml' for any yaml files.
+ """
+ # pathlib.Path.match patterns are very limited, they do not support *a*.yml
+ # glob.glob supports **/foo.yml but not multiple extensions
+ pathex = wcmatch.pathlib.PurePath(str(path.absolute().resolve()))
+ kinds = options.kinds if not base else BASE_KINDS
+ for entry in kinds:
+ for k, v in entry.items():
+ if pathex.globmatch(
+ v,
+ flags=(
+ wcmatch.pathlib.GLOBSTAR
+ | wcmatch.pathlib.BRACE
+ | wcmatch.pathlib.DOTGLOB
+ ),
+ ):
+ return str(k) # type: ignore
+
+ if base:
+ # Unknown base file type is default
+ return ""
+
+ if path.is_dir():
+ return "role"
+
+ if str(path) == "/dev/stdin":
+ return "playbook"
+
+    # Unknown file types report an empty string (which evaluates to False)
+ return ""
+
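+# Illustrative behaviour; results depend on the glob patterns configured in
+# options.kinds and BASE_KINDS (see ansiblelint.config). With the default
+# patterns one would typically expect:
+#   kind_from_path(Path("playbooks/site.yml"))            -> "playbook"
+#   kind_from_path(Path("playbooks/site.yml"), base=True) -> "text/yaml"
+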
+
+# pylint: disable=too-many-instance-attributes
+class Lintable:
+ """Defines a file/folder that can be linted.
+
+    Providing file content when creating the object allows the creation of
+    in-memory instances that do not need files to be present on disk.
+
+ When symlinks are given, they will always be resolved to their target.
+ """
+
+ def __init__(
+ self,
+ name: str | Path,
+ content: str | None = None,
+ kind: FileType | None = None,
+ base_kind: str = "",
+ ):
+ """Create a Lintable instance."""
+ self.dir: str = ""
+ self.kind: FileType | None = None
+ self.stop_processing = False # Set to stop other rules from running
+ self._data: Any = States.NOT_LOADED
+ self.line_skips: dict[int, set[str]] = defaultdict(set)
+ self.exc: Exception | None = None # Stores data loading exceptions
+
+ if isinstance(name, str):
+ name = Path(name)
+ is_relative = is_relative_to(name, str(name.cwd()))
+ name = name.resolve()
+ if is_relative:
+ name = name.relative_to(name.cwd())
+ name = normpath_path(name)
+ self.path = name
+        # Filename is the effective file on disk; for stdin it is a NamedTemporaryFile
+ self.name = self.filename = str(name)
+
+ self._content = self._original_content = content
+ self.updated = False
+
+        # if the lintable is part of a role, we save the role folder name
+ self.role = ""
+ parts = self.path.parent.parts
+ if "roles" in parts:
+ role = self.path
+ while role.parent.name != "roles" and role.name:
+ role = role.parent
+ if role.exists():
+ self.role = role.name
+
+ if str(self.path) in ["/dev/stdin", "-"]:
+ # pylint: disable=consider-using-with
+ self.file = NamedTemporaryFile(mode="w+", suffix="playbook.yml")
+ self.filename = self.file.name
+ self._content = sys.stdin.read()
+ self.file.write(self._content)
+ self.file.flush()
+ self.path = Path(self.file.name)
+ self.name = "stdin"
+ self.kind = "playbook"
+ self.dir = "/"
+ else:
+ self.kind = kind or kind_from_path(self.path)
+ # We store absolute directory in dir
+ if not self.dir:
+ if self.kind == "role":
+ self.dir = str(self.path.resolve())
+ else:
+ self.dir = str(self.path.parent.resolve())
+
+ # determine base file kind (yaml, xml, ini, ...)
+ self.base_kind = base_kind or kind_from_path(self.path, base=True)
+ self.abspath = self.path.expanduser().absolute()
+
+ if self.kind == "yaml":
+ self.data # pylint: disable=pointless-statement
+
+ def _guess_kind(self) -> None:
+ if self.kind == "yaml":
+ if isinstance(self.data, list) and "hosts" in self.data[0]:
+ if "rules" not in self.data[0]:
+ self.kind = "playbook"
+ else:
+ self.kind = "rulebook"
+            # if we failed to guess a more specific kind, we log it for the user
+ if self.kind == "yaml":
+ _logger.debug(
+ "Passed '%s' positional argument was identified as generic '%s' file kind.",
+ self.name,
+ self.kind,
+ )
+
+ def __getitem__(self, key: Any) -> Any:
+ """Provide compatibility subscriptable support."""
+ if key == "path":
+ return str(self.path)
+ if key == "type":
+ return str(self.kind)
+ raise NotImplementedError()
+
+ def get(self, key: Any, default: Any = None) -> Any:
+ """Provide compatibility subscriptable support."""
+ try:
+ return self[key]
+ except NotImplementedError:
+ return default
+
+ def _populate_content_cache_from_disk(self) -> None:
+ # Can raise UnicodeDecodeError
+ try:
+ self._content = self.path.expanduser().resolve().read_text(encoding="utf-8")
+ except FileNotFoundError as ex:
+ if vars(options).get("progressive"):
+ self._content = ""
+ else:
+ raise ex
+ if self._original_content is None:
+ self._original_content = self._content
+
+ @property
+ def content(self) -> str:
+ """Retrieve file content, from internal cache or disk."""
+ if self._content is None:
+ self._populate_content_cache_from_disk()
+ return cast(str, self._content)
+
+ @content.setter
+ def content(self, value: str) -> None:
+ """Update ``content`` and calculate ``updated``.
+
+ To calculate ``updated`` this will read the file from disk if the cache
+ has not already been populated.
+ """
+ if not isinstance(value, str):
+ raise TypeError(f"Expected str but got {type(value)}")
+ if self._original_content is None:
+ if self._content is not None:
+ self._original_content = self._content
+ elif self.path.exists():
+ self._populate_content_cache_from_disk()
+ else:
+ # new file
+ self._original_content = ""
+ self.updated = self._original_content != value
+ self._content = value
+
+ @content.deleter
+ def content(self) -> None:
+ """Reset the internal content cache."""
+ self._content = None
+
+ def write(self, force: bool = False) -> None:
+ """Write the value of ``Lintable.content`` to disk.
+
+ This only writes to disk if the content has been updated (``Lintable.updated``).
+ For example, you can update the content, and then write it to disk like this:
+
+ .. code:: python
+
+ lintable.content = new_content
+ lintable.write()
+
+ Use ``force=True`` when you want to force a content rewrite even if the
+ content has not changed. For example:
+
+ .. code:: python
+
+ lintable.write(force=True)
+ """
+ if not force and not self.updated:
+ # No changes to write.
+ return
+ self.path.expanduser().resolve().write_text(
+ self._content or "", encoding="utf-8"
+ )
+
+ def __hash__(self) -> int:
+        """Return a hash value of the lintable."""
+ return hash((self.name, self.kind, self.abspath))
+
+ def __eq__(self, other: object) -> bool:
+        """Identify whether the other object represents the same lintable."""
+ if isinstance(other, Lintable):
+ return bool(self.name == other.name and self.kind == other.kind)
+ return False
+
+ def __repr__(self) -> str:
+        """Return a user-friendly representation of a lintable."""
+ return f"{self.name} ({self.kind})"
+
+ @property
+ def data(self) -> Any:
+ """Return loaded data representation for current file, if possible."""
+ if self._data == States.NOT_LOADED:
+ if self.path.is_dir():
+ self._data = None
+ return self._data
+ try:
+ if str(self.base_kind) == "text/yaml":
+ from ansiblelint.utils import ( # pylint: disable=import-outside-toplevel
+ parse_yaml_linenumbers,
+ )
+
+ self._data = parse_yaml_linenumbers(self)
+                    # Now that _data is populated we can try to guess whether this
+                    # is a playbook or a rulebook. This must happen before the
+                    # append_skipped_rules() call, as that relies on self.kind.
+ if self.kind == "yaml":
+ self._guess_kind()
+ # Lazy import to avoid delays and cyclic-imports
+ if "append_skipped_rules" not in globals():
+ # pylint: disable=import-outside-toplevel
+ from ansiblelint.skip_utils import append_skipped_rules
+
+ self._data = append_skipped_rules(self._data, self)
+ else:
+ logging.debug(
+ "data set to None for %s due to being of %s kind.",
+ self.path,
+ self.base_kind,
+ )
+ self._data = States.UNKNOWN_DATA
+
+ except (RuntimeError, FileNotFoundError, YAMLError) as exc:
+ self._data = States.LOAD_FAILED
+ self.exc = exc
+ return self._data
+
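+# Illustrative usage (the path below is hypothetical): providing content makes
+# an in-memory Lintable, so nothing needs to exist on disk:
+#   lintable = Lintable("playbooks/demo.yml", content="- hosts: all\n", kind="playbook")
+#   lintable.content  # returns the provided string without reading the disk
+#   lintable.kind     # "playbook"
+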
+
+# pylint: disable=redefined-outer-name
+def discover_lintables(options: Namespace) -> dict[str, Any]:
+ """Find all files that we know how to lint.
+
+    The return format is normalized: relative for paths below the current
+    working directory, '~/'-prefixed for content under the current user's home
+    directory, and absolute for everything else.
+ """
+ # git is preferred as it also considers .gitignore
+ git_command_present = [
+ *GIT_CMD,
+ "ls-files",
+ "--cached",
+ "--others",
+ "--exclude-standard",
+ "-z",
+ ]
+ git_command_absent = [*GIT_CMD, "ls-files", "--deleted", "-z"]
+ out = None
+
+ try:
+ out_present = subprocess.check_output(
+ git_command_present, stderr=subprocess.STDOUT, text=True
+ ).split("\x00")[:-1]
+ _logger.info(
+ "Discovered files to lint using: %s", " ".join(git_command_present)
+ )
+
+ out_absent = subprocess.check_output(
+ git_command_absent, stderr=subprocess.STDOUT, text=True
+ ).split("\x00")[:-1]
+ _logger.info("Excluded removed files using: %s", " ".join(git_command_absent))
+
+ out = set(out_present) - set(out_absent)
+ except subprocess.CalledProcessError as exc:
+ if not (exc.returncode == 128 and "fatal: not a git repository" in exc.output):
+ _logger.warning(
+ "Failed to discover lintable files using git: %s",
+ exc.output.rstrip("\n"),
+ )
+ except FileNotFoundError as exc:
+ if options.verbosity:
+ _logger.warning("Failed to locate command: %s", exc)
+
+ if out is None:
+ exclude_pattern = "|".join(str(x) for x in options.exclude_paths)
+ _logger.info("Looking up for files, excluding %s ...", exclude_pattern)
+ # remove './' prefix from output of WcMatch
+ out = {
+ strip_dotslash_prefix(fname)
+ for fname in WcMatch(
+ ".", exclude_pattern=exclude_pattern, flags=RECURSIVE, limit=256
+ ).match()
+ }
+
+ return OrderedDict.fromkeys(sorted(out))
+
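+# Illustrative usage, with ``options`` being the Namespace from ansiblelint.config:
+#   files = discover_lintables(options)
+#   list(files)  # e.g. ["playbook.yml", "roles/common/tasks/main.yml", ...]
+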
+
+def strip_dotslash_prefix(fname: str) -> str:
+    """Remove a leading ./ from filenames."""
+ return fname[2:] if fname.startswith("./") else fname
+
+
+def guess_project_dir(config_file: str | None) -> str:
+ """Return detected project dir or current working directory."""
+ path = None
+ if config_file is not None and config_file != "/dev/null":
+ target = pathlib.Path(config_file)
+ if target.exists():
+ # for config inside .config, we return the parent dir as project dir
+ cfg_path = target.parent
+ if cfg_path.parts[-1] == ".config":
+ path = str(cfg_path.parent.absolute())
+ else:
+ path = str(cfg_path.absolute())
+
+ if path is None:
+ try:
+ result = subprocess.run(
+ [*GIT_CMD, "rev-parse", "--show-toplevel"],
+ capture_output=True,
+ text=True,
+ check=True,
+ )
+
+ path = result.stdout.splitlines()[0]
+ except subprocess.CalledProcessError as exc:
+ if not (
+ exc.returncode == 128 and "fatal: not a git repository" in exc.stderr
+ ):
+ _logger.warning(
+ "Failed to guess project directory using git: %s",
+ exc.stderr.rstrip("\n"),
+ )
+ except FileNotFoundError as exc:
+ _logger.warning("Failed to locate command: %s", exc)
+
+ if path is None:
+ path = os.getcwd()
+
+ _logger.info(
+ "Guessed %s as project root directory",
+ path,
+ )
+
+ return path
+
+
+def expand_dirs_in_lintables(lintables: set[Lintable]) -> None:
+    """Expand directory lintables in place by adding the recognized files within them."""
+ should_expand = False
+
+ for item in lintables:
+ if item.path.is_dir():
+ should_expand = True
+ break
+
+ if should_expand:
+        # this relies on git and we do not want to call it unless needed
+ all_files = discover_lintables(options)
+
+ for item in copy.copy(lintables):
+ if item.path.is_dir():
+ for filename in all_files:
+ if filename.startswith(str(item.path)):
+ lintables.add(Lintable(filename))