Diffstat (limited to 'src/ansiblelint/file_utils.py')
-rw-r--r--  src/ansiblelint/file_utils.py  584
1 files changed, 584 insertions, 0 deletions
diff --git a/src/ansiblelint/file_utils.py b/src/ansiblelint/file_utils.py
new file mode 100644
index 0000000..15c92d2
--- /dev/null
+++ b/src/ansiblelint/file_utils.py
@@ -0,0 +1,584 @@
+"""Utility functions related to file operations."""
+from __future__ import annotations
+
+import copy
+import logging
+import os
+import sys
+from collections import defaultdict
+from contextlib import contextmanager
+from pathlib import Path
+from tempfile import NamedTemporaryFile
+from typing import TYPE_CHECKING, Any, cast
+
+import pathspec
+import wcmatch.pathlib
+import wcmatch.wcmatch
+from yaml.error import YAMLError
+
+from ansiblelint.config import BASE_KINDS, Options, options
+from ansiblelint.constants import CONFIG_FILENAMES, FileType, States
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator, Sequence
+
+
+_logger = logging.getLogger(__package__)
+
+
+def abspath(path: str, base_dir: str) -> str:
+ """Make relative path absolute relative to given directory.
+
+ path (str): the path to make absolute
+ base_dir (str): the directory from which make relative paths absolute.
+ """
+ if not os.path.isabs(path):
+ # Don't use abspath as it assumes path is relative to cwd.
+ # We want it relative to base_dir.
+ path = os.path.join(base_dir, path)
+
+ return os.path.normpath(path)
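+# Illustrative behavior sketch (hypothetical paths, not from the test suite):
+#   abspath("tasks/main.yml", "/opt/project/roles/web")
+#       -> "/opt/project/roles/web/tasks/main.yml"
+#   abspath("/etc/ansible/hosts", "/opt/project")
+#       -> "/etc/ansible/hosts"  (already absolute, only normalized)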
+
+
+def normpath(path: str | Path) -> str:
+ """Normalize a path in order to provide a more consistent output.
+
+ Currently it generates a relative path but in the future we may want to
+ make this user configurable.
+ """
+ # prevent possible ValueError with relpath(), when input is an empty string
+ if not path:
+ path = "."
+    # conversion to string in order to allow receiving non-string objects
+ relpath = os.path.relpath(str(path))
+ path_absolute = os.path.abspath(str(path))
+ if path_absolute.startswith(os.getcwd()):
+ return relpath
+ if path_absolute.startswith(os.path.expanduser("~")):
+ return path_absolute.replace(os.path.expanduser("~"), "~")
+    # we avoid returning relative paths that end up at root level
+ if path_absolute in relpath:
+ return path_absolute
+ if relpath.startswith("../"):
+ return path_absolute
+ return relpath
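+# Illustrative behavior sketch, assuming cwd is /home/user/project and the
+# home directory is /home/user (hypothetical values):
+#   normpath("/home/user/project/tasks/../tasks/main.yml")  -> "tasks/main.yml"
+#   normpath("/home/user/other/file.yml")                   -> "~/other/file.yml"
+#   normpath("/etc/hosts")                                  -> "/etc/hosts"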
+
+
+# Needed for compatibility with py38; Path.is_relative_to() was added to the Path class in later Python versions
+def is_relative_to(path: Path, *other: Any) -> bool:
+ """Return True if the path is relative to another path or False."""
+ try:
+ path.resolve().absolute().relative_to(*other)
+ return True
+ except ValueError:
+ return False
+
+
+def normpath_path(path: str | Path) -> Path:
+ """Normalize a path in order to provide a more consistent output.
+
+ - Any symlinks are resolved.
+ - Any paths outside the CWD are resolved to their absolute path.
+ - Any absolute path within current user home directory is compressed to
+ make use of '~', so it is easier to read and more portable.
+ """
+ if not isinstance(path, Path):
+ path = Path(path)
+
+ is_relative = is_relative_to(path, path.cwd())
+ path = path.resolve()
+ if is_relative:
+ path = path.relative_to(path.cwd())
+
+ # Compress any absolute path within current user home directory
+ if path.is_absolute():
+ home = Path.home()
+ if is_relative_to(path, home):
+ path = Path("~") / path.relative_to(home)
+
+ return path
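+# Illustrative behavior sketch, assuming cwd is /home/user/project and no
+# symlinks are involved (hypothetical paths):
+#   normpath_path("src/./playbook.yml")          -> Path("src/playbook.yml")
+#   normpath_path("/home/user/.ansible/roles")   -> Path("~/.ansible/roles")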
+
+
+@contextmanager
+def cwd(path: Path) -> Iterator[None]:
+ """Context manager for temporary changing current working directory."""
+ old_pwd = Path.cwd()
+ os.chdir(path)
+ try:
+ yield
+ finally:
+ os.chdir(old_pwd)
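+# Usage sketch (hypothetical path):
+#   with cwd(Path("/tmp")):
+#       ...  # code here runs with /tmp as the current working directory
+#   # the previous working directory is restored on exit, even on exceptions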
+
+
+def expand_path_vars(path: str) -> str:
+ """Expand the environment or ~ variables in a path string."""
+    # It may be possible for the function to be called with a Path object
+ path = str(path).strip()
+ path = os.path.expanduser(path)
+ path = os.path.expandvars(path)
+ return path
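+# Illustrative behavior sketch, assuming HOME=/home/user and ENV_NAME=prod
+# (hypothetical environment):
+#   expand_path_vars("~/playbooks/$ENV_NAME/site.yml")
+#       -> "/home/user/playbooks/prod/site.yml"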
+
+
+def expand_paths_vars(paths: list[str]) -> list[str]:
+ """Expand the environment or ~ variables in a list."""
+ paths = [expand_path_vars(p) for p in paths]
+ return paths
+
+
+def kind_from_path(path: Path, *, base: bool = False) -> FileType:
+ """Determine the file kind based on its name.
+
+    When called with base=True, it will return the base file type instead
+    of the explicit one. That is expected to return 'yaml' for any YAML file.
+ """
+ # pathlib.Path.match patterns are very limited, they do not support *a*.yml
+ # glob.glob supports **/foo.yml but not multiple extensions
+ pathex = wcmatch.pathlib.PurePath(str(path.absolute().resolve()))
+ kinds = options.kinds if not base else BASE_KINDS
+ for entry in kinds:
+ for k, v in entry.items():
+ if pathex.globmatch(
+ v,
+ flags=(
+ wcmatch.pathlib.GLOBSTAR
+ | wcmatch.pathlib.BRACE
+ | wcmatch.pathlib.DOTGLOB
+ ),
+ ):
+ return str(k) # type: ignore[return-value]
+
+ if base:
+ # Unknown base file type is default
+ return ""
+
+ if path.is_dir():
+ known_role_subfolders = ("tasks", "meta", "vars", "defaults", "handlers")
+ for filename in known_role_subfolders:
+ if (path / filename).is_dir():
+ return "role"
+ _logger.debug(
+ "Folder `%s` does not look like a role due to missing any of the common subfolders such: %s.",
+ path,
+ ", ".join(known_role_subfolders),
+ )
+
+ if str(path) == "/dev/stdin":
+ return "playbook"
+
+    # Unknown file types report an empty string (evaluated as False)
+ return ""
+
+
+# pylint: disable=too-many-instance-attributes
+class Lintable:
+ """Defines a file/folder that can be linted.
+
+    Providing file content when creating the object allows creation of in-memory
+ instances that do not need files to be present on disk.
+
+ When symlinks are given, they will always be resolved to their target.
+ """
+
+ # pylint: disable=too-many-arguments
+ def __init__(
+ self,
+ name: str | Path,
+ content: str | None = None,
+ kind: FileType | None = None,
+ base_kind: str = "",
+ parent: Lintable | None = None,
+ ):
+ """Create a Lintable instance."""
+ self.dir: str = ""
+ self.kind: FileType | None = None
+ self.stop_processing = False # Set to stop other rules from running
+ self.state: Any = States.NOT_LOADED
+ self.line_skips: dict[int, set[str]] = defaultdict(set)
+ self.exc: Exception | None = None # Stores data loading exceptions
+ self.parent = parent
+ self.explicit = False # Indicates if the file was explicitly provided or was indirectly included.
+
+ if isinstance(name, str):
+ name = Path(name)
+ is_relative = is_relative_to(name, str(name.cwd()))
+ name = name.resolve()
+ if is_relative:
+ name = name.relative_to(name.cwd())
+ name = normpath_path(name)
+ # we need to be sure that we expanduser() because otherwise a simple
+ # test like .path.exists() will return unexpected results.
+ self.path = name.expanduser()
+        # Filename is the effective file on disk; for stdin it is a NamedTemporaryFile
+ self.name = self.filename = str(name)
+
+ self._content = self._original_content = content
+ self.updated = False
+
+        # if the lintable is part of a role, we save the role folder name
+ self.role = ""
+ parts = self.path.parent.parts
+ if "roles" in parts:
+ role = self.path
+ while role.parent.name != "roles" and role.name:
+ role = role.parent
+ if role.exists():
+ self.role = role.name
+
+ if str(self.path) in ["/dev/stdin", "-"]:
+ # pylint: disable=consider-using-with
+ self.file = NamedTemporaryFile(mode="w+", suffix="playbook.yml")
+ self.filename = self.file.name
+ self._content = sys.stdin.read()
+ self.file.write(self._content)
+ self.file.flush()
+ self.path = Path(self.file.name)
+ self.name = "stdin"
+ self.kind = "playbook"
+ self.dir = "/"
+ else:
+ self.kind = kind or kind_from_path(self.path)
+ # We store absolute directory in dir
+ if not self.dir:
+ if self.kind == "role":
+ self.dir = str(self.path.resolve())
+ else:
+ self.dir = str(self.path.parent.resolve())
+
+ # determine base file kind (yaml, xml, ini, ...)
+ self.base_kind = base_kind or kind_from_path(self.path, base=True)
+ self.abspath = self.path.expanduser().absolute()
+
+ if self.kind == "tasks":
+ self.parent = _guess_parent(self)
+
+ if self.kind == "yaml":
+ _ = self.data # pylint: disable=pointless-statement
+
+ def _guess_kind(self) -> None:
+ if self.kind == "yaml":
+ if (
+ isinstance(self.data, list)
+ and len(self.data) > 0
+ and (
+ "hosts" in self.data[0]
+ or "import_playbook" in self.data[0]
+ or "ansible.builtin.import_playbook" in self.data[0]
+ )
+ ):
+ if "rules" not in self.data[0]:
+ self.kind = "playbook"
+ else:
+ self.kind = "rulebook"
+            # if we failed to guess a more specific kind, we warn the user
+ if self.kind == "yaml":
+ _logger.debug(
+ "Passed '%s' positional argument was identified as generic '%s' file kind.",
+ self.name,
+ self.kind,
+ )
+
+ def __getitem__(self, key: Any) -> Any:
+ """Provide compatibility subscriptable support."""
+ if key == "path":
+ return str(self.path)
+ if key == "type":
+ return str(self.kind)
+ raise NotImplementedError
+
+ def get(self, key: Any, default: Any = None) -> Any:
+ """Provide compatibility subscriptable support."""
+ try:
+ return self[key]
+ except NotImplementedError:
+ return default
+
+ def _populate_content_cache_from_disk(self) -> None:
+ # Can raise UnicodeDecodeError
+ self._content = self.path.expanduser().resolve().read_text(encoding="utf-8")
+
+ if self._original_content is None:
+ self._original_content = self._content
+
+ @property
+ def content(self) -> str:
+ """Retrieve file content, from internal cache or disk."""
+ if self._content is None:
+ self._populate_content_cache_from_disk()
+ return cast(str, self._content)
+
+ @content.setter
+ def content(self, value: str) -> None:
+ """Update ``content`` and calculate ``updated``.
+
+ To calculate ``updated`` this will read the file from disk if the cache
+ has not already been populated.
+ """
+ if not isinstance(value, str):
+ msg = f"Expected str but got {type(value)}"
+ raise TypeError(msg)
+ if self._original_content is None:
+ if self._content is not None:
+ self._original_content = self._content
+ elif self.path.exists():
+ self._populate_content_cache_from_disk()
+ else:
+ # new file
+ self._original_content = ""
+ self.updated = self._original_content != value
+ self._content = value
+
+ @content.deleter
+ def content(self) -> None:
+ """Reset the internal content cache."""
+ self._content = None
+
+ def write(self, *, force: bool = False) -> None:
+ """Write the value of ``Lintable.content`` to disk.
+
+ This only writes to disk if the content has been updated (``Lintable.updated``).
+ For example, you can update the content, and then write it to disk like this:
+
+ .. code:: python
+
+ lintable.content = new_content
+ lintable.write()
+
+ Use ``force=True`` when you want to force a content rewrite even if the
+ content has not changed. For example:
+
+ .. code:: python
+
+ lintable.write(force=True)
+ """
+ if not force and not self.updated:
+ # No changes to write.
+ return
+ self.path.expanduser().resolve().write_text(
+ self._content or "",
+ encoding="utf-8",
+ )
+
+ def __hash__(self) -> int:
+ """Return a hash value of the lintables."""
+ return hash((self.name, self.kind, self.abspath))
+
+ def __eq__(self, other: object) -> bool:
+ """Identify whether the other object represents the same rule match."""
+ if isinstance(other, Lintable):
+ return bool(self.name == other.name and self.kind == other.kind)
+ return False
+
+ def __repr__(self) -> str:
+ """Return user friendly representation of a lintable."""
+ return f"{self.name} ({self.kind})"
+
+ @property
+ def data(self) -> Any:
+ """Return loaded data representation for current file, if possible."""
+ if self.state == States.NOT_LOADED:
+ if self.path.is_dir():
+ self.state = None
+ return self.state
+ try:
+ if str(self.base_kind) == "text/yaml":
+ from ansiblelint.utils import ( # pylint: disable=import-outside-toplevel
+ parse_yaml_linenumbers,
+ )
+
+ self.state = parse_yaml_linenumbers(self)
+                # now that _data is not empty, we can try guessing if this is a playbook or a rulebook;
+                # it has to be done before the append_skipped_rules() call as that relies
+                # on self.kind.
+ if self.kind == "yaml":
+ self._guess_kind()
+ # Lazy import to avoid delays and cyclic-imports
+ if "append_skipped_rules" not in globals():
+ # pylint: disable=import-outside-toplevel
+ from ansiblelint.skip_utils import append_skipped_rules
+
+ self.state = append_skipped_rules(self.state, self)
+ else:
+ logging.debug(
+ "data set to None for %s due to being '%s' (%s) kind.",
+ self.path,
+ self.kind,
+ self.base_kind or "unknown",
+ )
+ self.state = States.UNKNOWN_DATA
+
+ except (
+ RuntimeError,
+ FileNotFoundError,
+ YAMLError,
+ UnicodeDecodeError,
+ ) as exc:
+ self.state = States.LOAD_FAILED
+ self.exc = exc
+ return self.state
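+# Usage sketch for the class above (hypothetical file names):
+#   lintable = Lintable("site.yml", kind="playbook")
+#   lintable.content   # file content, read lazily from disk
+#   lintable.data      # parsed YAML, or a States value when loading fails
+# In-memory instances can be created without a file on disk:
+#   lintable = Lintable("draft.yml", content="- hosts: all\n", kind="playbook")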
+
+
+# pylint: disable=redefined-outer-name
+def discover_lintables(options: Options) -> list[str]:
+ """Find all files that we know how to lint.
+
+    Return format is normalized: relative for paths below the cwd, ~/ for content
+    under the current user's home directory, and absolute for everything else.
+ """
+ if not options.lintables:
+ options.lintables = ["."]
+
+ return [
+ str(filename)
+ for filename in get_all_files(
+ *[Path(s) for s in options.lintables],
+ exclude_paths=options.exclude_paths,
+ )
+ ]
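+# Usage sketch (hypothetical project layout; assumes Options exposes
+# `lintables` and `exclude_paths` as regular dataclass fields):
+#   opts = Options(lintables=["playbooks"], exclude_paths=[".cache"])
+#   discover_lintables(opts)
+#       # e.g. ["playbooks/site.yml", "playbooks/group_vars/all.yml"]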
+
+
+def strip_dotslash_prefix(fname: str) -> str:
+ """Remove ./ leading from filenames."""
+ return fname[2:] if fname.startswith("./") else fname
+
+
+def find_project_root(
+ srcs: Sequence[str],
+ config_file: str | None = None,
+) -> tuple[Path, str]:
+ """Return a directory containing .git or ansible-lint config files.
+
+ That directory will be a common parent of all files and directories
+ passed in `srcs`.
+
+ If no directory in the tree contains a marker that would specify it's the
+ project root, the root of the file system is returned.
+
+ Returns a two-tuple with the first element as the project root path and
+ the second element as a string describing the method by which the
+ project root was discovered.
+ """
+ directory = None
+ if not srcs:
+ srcs = [str(Path.cwd().resolve().absolute())]
+ path_srcs = [Path(Path.cwd(), src).resolve() for src in srcs]
+
+ cfg_files = [config_file] if config_file else CONFIG_FILENAMES
+
+ # A list of lists of parents for each 'src'. 'src' is included as a
+ # "parent" of itself if it is a directory
+ src_parents = [
+ list(path.parents) + ([path] if path.is_dir() else []) for path in path_srcs
+ ]
+
+ common_base = max(
+ set.intersection(*(set(parents) for parents in src_parents)),
+ key=lambda path: path.parts,
+ )
+
+ for directory in (common_base, *common_base.parents):
+ if (directory / ".git").exists():
+ return directory, ".git directory"
+
+ if (directory / ".hg").is_dir():
+ return directory, ".hg directory"
+
+ for cfg_file in cfg_files:
+ # note that if cfg_file is already absolute, 'directory' is ignored
+ resolved_cfg_path = directory / cfg_file
+ if resolved_cfg_path.is_file():
+ if os.path.isabs(cfg_file):
+ directory = Path(cfg_file).parent
+ if directory.name == ".config":
+ directory = directory.parent
+ return directory, f"config file {resolved_cfg_path}"
+
+ if not directory:
+ return Path.cwd(), "current working directory"
+ return directory, "file system root"
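+# Illustrative behavior sketch (hypothetical layout): with a .git directory at
+# /repo, find_project_root(["/repo/roles/web/tasks/main.yml"]) is expected to
+# return (Path("/repo"), ".git directory").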
+
+
+def expand_dirs_in_lintables(lintables: set[Lintable]) -> None:
+ """Return all recognized lintables within given directory."""
+ should_expand = False
+
+ for item in lintables:
+ if item.path.is_dir():
+ should_expand = True
+ break
+
+ if should_expand:
+        # this relies on git and we do not want to call it unless needed
+ all_files = discover_lintables(options)
+
+ for item in copy.copy(lintables):
+ if item.path.is_dir():
+ for filename in all_files:
+ if filename.startswith(str(item.path)):
+ lintables.add(Lintable(filename))
+
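+# Usage sketch (hypothetical): given a set containing Lintable("roles"), the
+# call expand_dirs_in_lintables(lintables) adds a Lintable for every file
+# discovered under roles/ (using the module-level options for discovery),
+# mutating the set in place.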
+
+def _guess_parent(lintable: Lintable) -> Lintable | None:
+ """Return a parent directory for a lintable."""
+ try:
+ if lintable.path.parents[2].name == "roles":
+ # role_name = lintable.parents[1].name
+ return Lintable(lintable.path.parents[1], kind="role")
+ except IndexError:
+ pass
+ return None
+
+
+def get_all_files(
+ *paths: Path,
+ exclude_paths: list[str] | None = None,
+) -> list[Path]:
+ """Recursively retrieve all files from given folders."""
+ all_files: list[Path] = []
+ exclude_paths = [] if exclude_paths is None else exclude_paths
+
+ def is_excluded(path_to_check: Path) -> bool:
+ """Check if a file is exclude by current specs."""
+ return any(
+ spec.match_file(pathspec.util.append_dir_sep(path_to_check))
+ for spec in pathspecs
+ )
+
+ for path in paths:
+ pathspecs = [
+ pathspec.GitIgnoreSpec.from_lines(
+ [
+ ".git",
+ ".tox",
+ ".mypy_cache",
+ "__pycache__",
+ ".DS_Store",
+ ".coverage",
+ ".pytest_cache",
+ ".ruff_cache",
+ *exclude_paths,
+ ],
+ ),
+ ]
+ gitignore = path / ".gitignore"
+ if gitignore.exists():
+ with gitignore.open(encoding="UTF-8") as f:
+ _logger.info("Loading ignores from %s", gitignore)
+ pathspecs.append(
+ pathspec.GitIgnoreSpec.from_lines(f.read().splitlines()),
+ )
+
+        # A single file is added directly; for directories, iterate over all items
+ if path.is_file():
+ all_files.append(path)
+ else:
+ for item in sorted(path.iterdir()):
+ if is_excluded(item):
+ _logger.info("Excluded: %s", item)
+ continue
+ if item.is_file():
+ all_files.append(item)
+ # If it's a directory, recursively call the function
+ elif item.is_dir():
+ all_files.extend(get_all_files(item, exclude_paths=exclude_paths))
+
+ return all_files
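+# Usage sketch (hypothetical tree): collect every non-ignored file under the
+# current directory, honoring .gitignore plus the built-in excludes, while
+# also skipping anything matching "molecule":
+#   get_all_files(Path("."), exclude_paths=["molecule"])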