Diffstat (limited to 'testing/web-platform/tests/tools/manifest/vcs.py')
-rw-r--r-- | testing/web-platform/tests/tools/manifest/vcs.py | 305 |
1 file changed, 305 insertions, 0 deletions
diff --git a/testing/web-platform/tests/tools/manifest/vcs.py b/testing/web-platform/tests/tools/manifest/vcs.py
new file mode 100644
index 0000000000..7b6b73d877
--- /dev/null
+++ b/testing/web-platform/tests/tools/manifest/vcs.py
@@ -0,0 +1,305 @@
+import abc
+import os
+import stat
+from collections import deque
+from os import stat_result
+from typing import (Any, Dict, Iterable, Iterator, List, MutableMapping, Optional, Set, Text, Tuple,
+                    TYPE_CHECKING)
+
+from . import jsonlib
+from .utils import git
+
+# Cannot do `from ..gitignore import gitignore` because
+# relative import beyond toplevel throws *ImportError*!
+from gitignore import gitignore  # type: ignore
+
+
+if TYPE_CHECKING:
+    from .manifest import Manifest  # avoid cyclic import
+
+GitIgnoreCacheType = MutableMapping[bytes, bool]
+
+
+def get_tree(tests_root: Text,
+             manifest: "Manifest",
+             manifest_path: Optional[Text],
+             cache_root: Optional[Text],
+             working_copy: bool = True,
+             rebuild: bool = False) -> "FileSystem":
+    tree = None
+    if cache_root is None:
+        cache_root = os.path.join(tests_root, ".wptcache")
+    if not os.path.exists(cache_root):
+        try:
+            os.makedirs(cache_root)
+        except OSError:
+            cache_root = None
+
+    if not working_copy:
+        raise ValueError("working_copy=False unsupported")
+
+    if tree is None:
+        tree = FileSystem(tests_root,
+                          manifest.url_base,
+                          manifest_path=manifest_path,
+                          cache_path=cache_root,
+                          rebuild=rebuild)
+    return tree
+
+
+class GitHasher:
+    def __init__(self, path: Text) -> None:
+        self.git = git(path)
+
+    def _local_changes(self) -> Set[Text]:
+        """get a set of files which have changed between HEAD and working copy"""
+        assert self.git is not None
+        # note that git runs the command with tests_root as the cwd, which may
+        # not be the root of the git repo (e.g., within a browser repo)
+        #
+        # `git diff-index --relative` without a path still compares all tracked
+        # files before non-WPT files are filtered out, which can be slow in
+        # vendor repos. Explicitly pass the CWD (i.e., `tests_root`) as a path
+        # argument to avoid unnecessary diffing.
+ cmd = ["diff-index", "--relative", "--no-renames", "--name-only", "-z", "HEAD", os.curdir] + data = self.git(*cmd) + return set(data.split("\0")) + + def hash_cache(self) -> Dict[Text, Optional[Text]]: + """ + A dict of rel_path -> current git object id if the working tree matches HEAD else None + """ + hash_cache: Dict[Text, Optional[Text]] = {} + + if self.git is None: + return hash_cache + + # note that git runs the command with tests_root as the cwd, which may + # not be the root of the git repo (e.g., within a browser repo) + cmd = ["ls-tree", "-r", "-z", "HEAD"] + local_changes = self._local_changes() + for result in self.git(*cmd).split("\0")[:-1]: # type: Text + data, rel_path = result.rsplit("\t", 1) + hash_cache[rel_path] = None if rel_path in local_changes else data.split(" ", 3)[2] + + return hash_cache + + + +class FileSystem: + def __init__(self, + tests_root: Text, + url_base: Text, + cache_path: Optional[Text], + manifest_path: Optional[Text] = None, + rebuild: bool = False) -> None: + self.tests_root = tests_root + self.url_base = url_base + self.ignore_cache = None + self.mtime_cache = None + tests_root_bytes = tests_root.encode("utf8") + if cache_path is not None: + if manifest_path is not None: + self.mtime_cache = MtimeCache(cache_path, tests_root, manifest_path, rebuild) + if gitignore.has_ignore(tests_root_bytes): + self.ignore_cache = GitIgnoreCache(cache_path, tests_root, rebuild) + self.path_filter = gitignore.PathFilter(tests_root_bytes, + extras=[b".git/"], + cache=self.ignore_cache) + git = GitHasher(tests_root) + self.hash_cache = git.hash_cache() + + def __iter__(self) -> Iterator[Tuple[Text, Optional[Text], bool]]: + mtime_cache = self.mtime_cache + for dirpath, dirnames, filenames in self.path_filter( + walk(self.tests_root.encode("utf8"))): + for filename, path_stat in filenames: + path = os.path.join(dirpath, filename).decode("utf8") + if mtime_cache is None or mtime_cache.updated(path, path_stat): + file_hash = self.hash_cache.get(path, None) + yield path, file_hash, True + else: + yield path, None, False + + def dump_caches(self) -> None: + for cache in [self.mtime_cache, self.ignore_cache]: + if cache is not None: + cache.dump() + + +class CacheFile(metaclass=abc.ABCMeta): + def __init__(self, cache_root: Text, tests_root: Text, rebuild: bool = False) -> None: + self.tests_root = tests_root + if not os.path.exists(cache_root): + os.makedirs(cache_root) + self.path = os.path.join(cache_root, self.file_name) + self.modified = False + self.data = self.load(rebuild) + + @abc.abstractproperty + def file_name(self) -> Text: + pass + + def dump(self) -> None: + if not self.modified: + return + with open(self.path, 'w') as f: + jsonlib.dump_local(self.data, f) + + def load(self, rebuild: bool = False) -> Dict[Text, Any]: + data: Dict[Text, Any] = {} + try: + if not rebuild: + with open(self.path) as f: + try: + data = jsonlib.load(f) + except ValueError: + pass + data = self.check_valid(data) + except OSError: + pass + return data + + def check_valid(self, data: Dict[Text, Any]) -> Dict[Text, Any]: + """Check if the cached data is valid and return an updated copy of the + cache containing only data that can be used.""" + return data + + +class MtimeCache(CacheFile): + file_name = "mtime.json" + + def __init__(self, cache_root: Text, tests_root: Text, manifest_path: Text, rebuild: bool = False) -> None: + self.manifest_path = manifest_path + super().__init__(cache_root, tests_root, rebuild) + + def updated(self, rel_path: Text, stat: stat_result) -> bool: + 
"""Return a boolean indicating whether the file changed since the cache was last updated. + + This implicitly updates the cache with the new mtime data.""" + mtime = stat.st_mtime + if mtime != self.data.get(rel_path): + self.modified = True + self.data[rel_path] = mtime + return True + return False + + def check_valid(self, data: Dict[Any, Any]) -> Dict[Any, Any]: + if data.get("/tests_root") != self.tests_root: + self.modified = True + else: + if self.manifest_path is not None and os.path.exists(self.manifest_path): + mtime = os.path.getmtime(self.manifest_path) + if data.get("/manifest_path") != [self.manifest_path, mtime]: + self.modified = True + else: + self.modified = True + if self.modified: + data = {} + data["/tests_root"] = self.tests_root + return data + + def dump(self) -> None: + if self.manifest_path is None: + raise ValueError + if not os.path.exists(self.manifest_path): + return + mtime = os.path.getmtime(self.manifest_path) + self.data["/manifest_path"] = [self.manifest_path, mtime] + self.data["/tests_root"] = self.tests_root + super().dump() + + +class GitIgnoreCache(CacheFile, GitIgnoreCacheType): + file_name = "gitignore2.json" + + def check_valid(self, data: Dict[Any, Any]) -> Dict[Any, Any]: + ignore_path = os.path.join(self.tests_root, ".gitignore") + mtime = os.path.getmtime(ignore_path) + if data.get("/gitignore_file") != [ignore_path, mtime]: + self.modified = True + data = {} + data["/gitignore_file"] = [ignore_path, mtime] + return data + + def __contains__(self, key: Any) -> bool: + try: + key = key.decode("utf-8") + except Exception: + return False + + return key in self.data + + def __getitem__(self, key: bytes) -> bool: + real_key = key.decode("utf-8") + v = self.data[real_key] + assert isinstance(v, bool) + return v + + def __setitem__(self, key: bytes, value: bool) -> None: + real_key = key.decode("utf-8") + if self.data.get(real_key) != value: + self.modified = True + self.data[real_key] = value + + def __delitem__(self, key: bytes) -> None: + real_key = key.decode("utf-8") + del self.data[real_key] + + def __iter__(self) -> Iterator[bytes]: + return (key.encode("utf-8") for key in self.data) + + def __len__(self) -> int: + return len(self.data) + + +def walk(root: bytes) -> Iterable[Tuple[bytes, List[Tuple[bytes, stat_result]], List[Tuple[bytes, stat_result]]]]: + """Re-implementation of os.walk. Returns an iterator over + (dirpath, dirnames, filenames), with some semantic differences + to os.walk. + + This has a similar interface to os.walk, with the important difference + that instead of lists of filenames and directory names, it yields + lists of tuples of the form [(name, stat)] where stat is the result of + os.stat for the file. That allows reusing the same stat data in the + caller. It also always returns the dirpath relative to the root, with + the root iself being returned as the empty string. + + Unlike os.walk the implementation is not recursive.""" + + get_stat = os.stat + is_dir = stat.S_ISDIR + is_link = stat.S_ISLNK + join = os.path.join + listdir = os.listdir + relpath = os.path.relpath + + root = os.path.abspath(root) + stack = deque([(root, b"")]) + + while stack: + dir_path, rel_path = stack.popleft() + try: + # Note that listdir and error are globals in this module due + # to earlier import-*. 
+            names = listdir(dir_path)
+        except OSError:
+            continue
+
+        dirs, non_dirs = [], []
+        for name in names:
+            path = join(dir_path, name)
+            try:
+                path_stat = get_stat(path)
+            except OSError:
+                continue
+            if is_dir(path_stat.st_mode):
+                dirs.append((name, path_stat))
+            else:
+                non_dirs.append((name, path_stat))
+
+        yield rel_path, dirs, non_dirs
+        for name, path_stat in dirs:
+            new_path = join(dir_path, name)
+            if not is_link(path_stat.st_mode):
+                stack.append((new_path, relpath(new_path, root)))
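
The walk() helper added above yields each entry's stat result alongside its name and reports directory paths relative to the passed root, so callers can reuse the stat data instead of issuing a second os.stat() per file. The sketch below is a minimal usage example only, not part of this commit; the sys.path setup, the import path, and the total_size() helper are assumptions about how the module might be consumed from a web-platform-tests checkout.

    import os
    import sys

    # Assumption: the web-platform-tests "tools" directory is on sys.path so
    # that the manifest package (and its `from gitignore import gitignore`
    # import) resolve; replace the placeholder with a real checkout path.
    sys.path.insert(0, "/path/to/wpt/tools")

    from manifest.vcs import walk  # assumed import path


    def total_size(root: str) -> int:
        # Reuse the stat results that walk() already collected rather than
        # calling os.stat() a second time for every file.
        size = 0
        for rel_dir, _dirs, files in walk(os.fsencode(root)):
            # rel_dir is a bytes path relative to root; b"" is the root itself.
            for _name, file_stat in files:
                size += file_stat.st_size
        return size


    if __name__ == "__main__":
        print(total_size("."))

Yielding (name, stat) pairs this way is also what lets FileSystem.__iter__ pass the same stat result into MtimeCache.updated() without touching the filesystem again.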