From 43a97878ce14b72f0981164f87f2e35e14151312 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 11:22:09 +0200 Subject: Adding upstream version 110.0.1. Signed-off-by: Daniel Baumann --- .../web-platform/tests/tools/manifest/XMLParser.py | 151 ++ .../web-platform/tests/tools/manifest/__init__.py | 1 + .../tests/tools/manifest/catalog/xhtml.dtd | 2125 ++++++++++++++++++++ .../tests/tools/manifest/commands.json | 23 + .../web-platform/tests/tools/manifest/download.py | 207 ++ testing/web-platform/tests/tools/manifest/item.py | 385 ++++ .../web-platform/tests/tools/manifest/jsonlib.py | 139 ++ testing/web-platform/tests/tools/manifest/log.py | 11 + .../web-platform/tests/tools/manifest/manifest.py | 449 +++++ .../tests/tools/manifest/requirements.txt | 1 + .../tests/tools/manifest/sourcefile.py | 1144 +++++++++++ .../web-platform/tests/tools/manifest/testpaths.py | 112 ++ .../tests/tools/manifest/tests/__init__.py | 0 .../tests/tools/manifest/tests/test_XMLParser.py | 56 + .../tests/tools/manifest/tests/test_item.py | 160 ++ .../tests/tools/manifest/tests/test_manifest.py | 310 +++ .../tests/tools/manifest/tests/test_sourcefile.py | 911 +++++++++ .../tests/tools/manifest/tests/test_utils.py | 15 + .../web-platform/tests/tools/manifest/typedata.py | 336 ++++ .../web-platform/tests/tools/manifest/update.py | 105 + testing/web-platform/tests/tools/manifest/utils.py | 93 + testing/web-platform/tests/tools/manifest/vcs.py | 319 +++ 22 files changed, 7053 insertions(+) create mode 100644 testing/web-platform/tests/tools/manifest/XMLParser.py create mode 100644 testing/web-platform/tests/tools/manifest/__init__.py create mode 100644 testing/web-platform/tests/tools/manifest/catalog/xhtml.dtd create mode 100644 testing/web-platform/tests/tools/manifest/commands.json create mode 100644 testing/web-platform/tests/tools/manifest/download.py create mode 100644 testing/web-platform/tests/tools/manifest/item.py create mode 100644 
testing/web-platform/tests/tools/manifest/jsonlib.py create mode 100644 testing/web-platform/tests/tools/manifest/log.py create mode 100644 testing/web-platform/tests/tools/manifest/manifest.py create mode 100644 testing/web-platform/tests/tools/manifest/requirements.txt create mode 100644 testing/web-platform/tests/tools/manifest/sourcefile.py create mode 100644 testing/web-platform/tests/tools/manifest/testpaths.py create mode 100644 testing/web-platform/tests/tools/manifest/tests/__init__.py create mode 100644 testing/web-platform/tests/tools/manifest/tests/test_XMLParser.py create mode 100644 testing/web-platform/tests/tools/manifest/tests/test_item.py create mode 100644 testing/web-platform/tests/tools/manifest/tests/test_manifest.py create mode 100644 testing/web-platform/tests/tools/manifest/tests/test_sourcefile.py create mode 100644 testing/web-platform/tests/tools/manifest/tests/test_utils.py create mode 100644 testing/web-platform/tests/tools/manifest/typedata.py create mode 100755 testing/web-platform/tests/tools/manifest/update.py create mode 100644 testing/web-platform/tests/tools/manifest/utils.py create mode 100644 testing/web-platform/tests/tools/manifest/vcs.py (limited to 'testing/web-platform/tests/tools/manifest') diff --git a/testing/web-platform/tests/tools/manifest/XMLParser.py b/testing/web-platform/tests/tools/manifest/XMLParser.py new file mode 100644 index 0000000000..689533421d --- /dev/null +++ b/testing/web-platform/tests/tools/manifest/XMLParser.py @@ -0,0 +1,151 @@ +from os.path import dirname, join + +from collections import OrderedDict + +from xml.parsers import expat +import xml.etree.ElementTree as etree # noqa: N813 + +MYPY = False +if MYPY: + # MYPY is set to True when run under Mypy. 
class XMLParser:
    """
    An XML parser with support for XHTML DTDs and all Python-supported encodings

    This implements the API defined by
    xml.etree.ElementTree.XMLParser, but supports XHTML DTDs
    (therefore allowing XHTML entities) and supports all encodings
    Python does, rather than just those supported by expat.
    """
    def __init__(self, encoding=None):
        # type: (Optional[Text]) -> None
        """Create the underlying expat parser and the tree builder.

        :param encoding: optional encoding override handed to expat.
        """
        # "}" is the separator expat inserts between a namespace URI and the
        # local name; _fixname later prepends "{" to build "{uri}local".
        self._parser = expat.ParserCreate(encoding, "}")
        self._target = etree.TreeBuilder()
        # parser settings
        self._parser.buffer_text = True
        self._parser.ordered_attributes = True
        self._parser.SetParamEntityParsing(expat.XML_PARAM_ENTITY_PARSING_UNLESS_STANDALONE)
        # parser callbacks
        self._parser.XmlDeclHandler = self._xml_decl
        self._parser.StartElementHandler = self._start
        self._parser.EndElementHandler = self._end
        self._parser.CharacterDataHandler = self._data
        self._parser.ExternalEntityRefHandler = self._external
        self._parser.SkippedEntityHandler = self._skipped  # type: ignore
        # used for our horrible re-encoding hack: every chunk fed before the
        # first start tag is kept so it can be re-decoded if expat rejects
        # the document's encoding.
        self._fed_data = []  # type: Optional[List[bytes]]
        self._read_encoding = None  # type: Optional[Text]

    def _xml_decl(self, version, encoding, standalone):
        # type: (Text, Optional[Text], int) -> None
        """Remember the encoding named in the XML declaration."""
        self._read_encoding = encoding

    def _start(self, tag, attrib_in):
        # type: (Text, List[str]) -> etree.Element
        """Handle a start tag and forward it to the tree builder."""
        assert isinstance(tag, str)
        # Once an element has started the encoding is known to be usable,
        # so the buffered input is no longer needed.
        self._fed_data = None
        tag = _fixname(tag)
        attrib = OrderedDict()  # type: Dict[Union[bytes, Text], Union[bytes, Text]]
        if attrib_in:
            # ordered_attributes delivers a flat [name, value, name, value, ...] list
            for i in range(0, len(attrib_in), 2):
                attrib[_fixname(attrib_in[i])] = attrib_in[i+1]
        return self._target.start(tag, attrib)

    def _data(self, text):
        # type: (Text) -> None
        """Forward character data to the tree builder."""
        self._target.data(text)

    def _end(self, tag):
        # type: (Text) -> etree.Element
        """Handle an end tag and forward it to the tree builder."""
        return self._target.end(_fixname(tag))

    def _external(self, context, base, system_id, public_id):
        # type: (Text, Optional[Text], Optional[Text], Optional[Text]) -> bool
        """Resolve an external entity reference.

        For the known XHTML/MathML public identifiers the bundled
        ``xhtml.dtd`` from the catalog directory is parsed; any other
        identifier is accepted without loading anything.

        :returns: False if the bundled DTD fails to parse, True otherwise.
        """
        if public_id in {
                "-//W3C//DTD XHTML 1.0 Transitional//EN",
                "-//W3C//DTD XHTML 1.1//EN",
                "-//W3C//DTD XHTML 1.0 Strict//EN",
                "-//W3C//DTD XHTML 1.0 Frameset//EN",
                "-//W3C//DTD XHTML Basic 1.0//EN",
                "-//W3C//DTD XHTML 1.1 plus MathML 2.0//EN",
                "-//W3C//DTD XHTML 1.1 plus MathML 2.0 plus SVG 1.1//EN",
                "-//W3C//DTD MathML 2.0//EN",
                "-//WAPFORUM//DTD XHTML Mobile 1.0//EN"
        }:
            parser = self._parser.ExternalEntityParserCreate(context)
            with open(join(_catalog, "xhtml.dtd"), "rb") as fp:
                try:
                    parser.ParseFile(fp)
                except expat.error:
                    return False

        return True

    def _skipped(self, name, is_parameter_entity):
        # type: (Text, bool) -> None
        """Turn a skipped (undefined) entity into an expat error."""
        err = expat.error("undefined entity %s: line %d, column %d" %
                          (name, self._parser.ErrorLineNumber,
                           self._parser.ErrorColumnNumber))
        err.code = _undefined_entity_code
        err.lineno = self._parser.ErrorLineNumber
        err.offset = self._parser.ErrorColumnNumber
        raise err

    def feed(self, data):
        # type: (bytes) -> None
        """Feed a chunk of the document to the parser.

        If expat rejects the document because it uses a multi-byte encoding,
        everything fed so far is decoded with the declared encoding,
        re-encoded as UTF-8 and parsed again with a fresh parser.

        :raises etree.ParseError: if expat reports a parse error.
        :raises ValueError: for any ValueError other than the multi-byte
            encoding failure handled here.
        """
        if self._fed_data is not None:
            self._fed_data.append(data)
        try:
            self._parser.Parse(data, False)
        except expat.error as v:
            _wrap_error(v)
        except ValueError as e:
            if not e.args or e.args[0] != 'multi-byte encodings are not supported':
                # Not the failure this method knows how to recover from;
                # propagate it instead of silently dropping it.
                raise
            assert self._read_encoding is not None
            assert self._fed_data is not None
            xml = b"".join(self._fed_data).decode(self._read_encoding).encode("utf-8")
            # Swap in a parser that is told the input is UTF-8, then replay.
            new_parser = XMLParser("utf-8")
            self._parser = new_parser._parser
            self._target = new_parser._target
            self._fed_data = None
            self.feed(xml)

    def close(self):
        # type: () -> etree.Element
        """Finish parsing and return the root element built by the target.

        :raises etree.ParseError: if expat reports an error at end of input.
        """
        try:
            self._parser.Parse("", True)
        except expat.error as v:
            _wrap_error(v)
        tree = self._target.close()
        return tree
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/testing/web-platform/tests/tools/manifest/commands.json b/testing/web-platform/tests/tools/manifest/commands.json new file mode 100644 index 0000000000..769675e0ee --- /dev/null +++ b/testing/web-platform/tests/tools/manifest/commands.json @@ -0,0 +1,23 @@ +{ + "manifest": { + "path": "update.py", + "script": "run", + "parser": "create_parser", + "help": "Update the MANIFEST.json file", + "virtualenv": false + }, + "manifest-download": { + "path": "download.py", + "script": "run", + "parser": "create_parser", + "help": "Download recent pregenerated MANIFEST.json file", + "virtualenv": false + }, + "test-paths": { + "path": "testpaths.py", + "script": "run", + "parser": "create_parser", + "help": "Print test paths given a set 
def should_download(manifest_path, rebuild_time=timedelta(days=5)):
    # type: (Text, timedelta) -> bool
    """Decide whether a fresh manifest should be fetched.

    :param manifest_path: path of the local manifest file.
    :param rebuild_time: maximum age the existing file may have before it
        is considered stale.
    :returns: True if the file is missing or older than ``rebuild_time``,
        False (after logging) if it is recent enough to keep.
    """
    if os.path.exists(manifest_path):
        last_modified = datetime.fromtimestamp(os.path.getmtime(manifest_path))
        cutoff = datetime.now() - rebuild_time
        if not last_modified < cutoff:
            logger.info("Skipping manifest download because existing file is recent")
            return False
    return True
def download_manifest(
        manifest_path,  # type: Text
        tags_func,  # type: Callable[[], List[Text]]
        url_func,  # type: Callable[[List[Text]], Optional[List[Text]]]
        force=False  # type: bool
):
    # type: (...) -> bool
    """Download a pregenerated manifest and write it to ``manifest_path``.

    Candidate URLs are tried in the order ``url_func`` returns them; the
    first one that downloads and decompresses successfully is used.

    :param manifest_path: destination file for the decompressed manifest.
    :param tags_func: callable returning the tags to look up.
    :param url_func: callable mapping those tags to candidate download URLs.
    :param force: download even if the existing manifest is recent.
    :returns: True if a manifest was written, False otherwise.
    """
    # Local import so this function stands alone; only needed for zlib.error.
    import zlib

    if not force and not should_download(manifest_path):
        return False

    tags = tags_func()

    urls = url_func(tags)
    if not urls:
        logger.warning("No generated manifest found")
        return False

    for url in urls:
        logger.info("Downloading manifest from %s" % url)
        try:
            resp = urlopen(url)
        except Exception:
            logger.warning("Downloading pregenerated manifest failed")
            continue

        if resp.code != 200:
            logger.warning("Downloading pregenerated manifest failed; got HTTP status %d" %
                           resp.code)
            continue

        if url.endswith(".zst"):
            if not zstandard:
                continue
            try:
                dctx = zstandard.ZstdDecompressor()
                decompressed = dctx.decompress(resp.read())
            except (OSError, zstandard.ZstdError):
                # zstandard reports bad input via ZstdError, which is not an
                # OSError; catch it so the next candidate URL is still tried.
                logger.warning("Failed to decompress downloaded file")
                continue
        elif url.endswith(".bz2"):
            try:
                decompressed = bz2.decompress(resp.read())
            except (OSError, ValueError):
                # bz2 raises OSError for invalid data and ValueError when the
                # stream is truncated.
                logger.warning("Failed to decompress downloaded file")
                continue
        elif url.endswith(".gz"):
            fileobj = io.BytesIO(resp.read())
            try:
                with gzip.GzipFile(fileobj=fileobj) as gzf:
                    decompressed = gzf.read()
            except (OSError, EOFError, zlib.error):
                # Invalid gzip input may surface as BadGzipFile (an OSError),
                # EOFError or zlib.error.
                logger.warning("Failed to decompress downloaded file")
                continue
        else:
            logger.warning("Unknown file extension: %s" % url)
            continue
        break
    else:
        # Every candidate failed.
        return False

    try:
        with open(manifest_path, "wb") as f:
            f.write(decompressed)
    except Exception:
        logger.warning("Failed to write manifest")
        return False
    logger.info("Manifest downloaded")
    return True
class ManifestItem(metaclass=ManifestItemMeta):
    """Base class for entries stored in the manifest.

    Concrete subclasses provide ``id`` and ``item_type``; identity,
    hashing and equality are all derived from ``key()``.
    """
    __slots__ = ("_tests_root", "path")

    def __init__(self, tests_root, path):
        # type: (Text, Text) -> None
        self._tests_root, self.path = tests_root, path

    @abstractproperty
    def id(self):
        # type: () -> Text
        """The test's id (usually its url)"""
        pass

    @abstractproperty
    def item_type(self):
        # type: () -> str
        """The item's type"""
        pass

    @property
    def path_parts(self):
        # type: () -> Tuple[Text, ...]
        """``path`` split on the OS path separator."""
        pieces = self.path.split(os.path.sep)
        return tuple(pieces)

    def key(self):
        # type: () -> Hashable
        """A unique identifier for the test"""
        return (self.item_type, self.id)

    def __eq__(self, other):
        # type: (Any) -> bool
        # Anything exposing a ``key`` attribute is compared by key.
        if hasattr(other, "key"):
            return bool(self.key() == other.key())
        return False

    def __hash__(self):
        # type: () -> int
        return hash(self.key())

    def __repr__(self):
        # type: () -> str
        return f"<{self.__module__}.{self.__class__.__name__} id={self.id!r}, path={self.path!r}>"

    def to_json(self):
        # type: () -> Tuple[Any, ...]
        """Serialised form of the item; the base class stores nothing."""
        return tuple()

    @classmethod
    def from_json(cls,
                  manifest,  # type: Manifest
                  path,  # type: Text
                  obj  # type: Any
                  ):
        # type: (...) -> ManifestItem
        """Build an item for ``path`` using the manifest's tests root."""
        os_path = to_os_path(path)
        root = manifest.tests_root
        assert root is not None
        return cls(root, os_path)
@property
def https(self):
    # type: () -> bool
    """Whether any of the flags parsed from the URL requires HTTPS."""
    secure_flags = ("https", "serviceworker", "serviceworker-module")
    return any(flag in self._flags for flag in secure_flags)
class TestharnessTest(URLManifestItem):
    """Manifest item for a testharness test.

    All optional settings are read from the extras dictionary.
    """
    __slots__ = ()

    item_type = "testharness"

    @property
    def timeout(self):
        # type: () -> Optional[Text]
        return self._extras.get("timeout")

    @property
    def pac(self):
        # type: () -> Optional[Text]
        return self._extras.get("pac")

    @property
    def testdriver(self):
        # type: () -> Optional[Text]
        return self._extras.get("testdriver")

    @property
    def jsshell(self):
        # type: () -> Optional[Text]
        return self._extras.get("jsshell")

    @property
    def script_metadata(self):
        # type: () -> Optional[List[Tuple[Text, Text]]]
        return self._extras.get("script_metadata")

    def to_json(self):
        # type: () -> Tuple[Optional[Text], Dict[Text, Any]]
        """Serialise the item, emitting only the extras that are set."""
        serialised = super().to_json()
        # The last element is the extras dictionary, filled in place.
        extras = serialised[-1]
        if self.timeout is not None:
            extras["timeout"] = self.timeout
        if self.pac is not None:
            extras["pac"] = self.pac
        if self.testdriver:
            extras["testdriver"] = self.testdriver
        if self.jsshell:
            extras["jsshell"] = True
        if self.script_metadata:
            extras["script_metadata"] = [(name, value) for name, value in self.script_metadata]
        return serialised
def to_json(self):  # type: ignore
    # type: () -> Tuple[Optional[Text], List[Tuple[Text, Text]], Dict[Text, Any]]
    """Serialise the reftest as ``(rel_url, references, extras)``.

    ``rel_url`` is None when the URL equals the item's path, and
    ``extras`` only contains the settings that are actually set.
    """
    # self.path uses the OS separator while URLs always use "/". Normalise
    # before comparing, as URLManifestItem.to_json does, so that the result
    # does not depend on the platform.
    rel_url = None if self._url == self.path.replace(os.path.sep, "/") else self._url
    extras = {}  # type: Dict[Text, Any]
    if self.timeout is not None:
        extras["timeout"] = self.timeout
    if self.viewport_size is not None:
        extras["viewport_size"] = self.viewport_size
    if self.dpi is not None:
        extras["dpi"] = self.dpi
    if self.fuzzy:
        # Stored as a list of pairs because the keys may be None or tuples.
        extras["fuzzy"] = list(self.fuzzy.items())
    return (rel_url, self.references, extras)
class PrintRefTest(RefTest):
    """Reftest variant for print output, with optional page ranges."""

    # "references" is already a slot on RefTest. Redeclaring it here would
    # allocate a second, shadowing slot, so no new slots are added.
    __slots__ = ()

    item_type = "print-reftest"

    @property
    def page_ranges(self):
        # type: () -> PageRanges
        """The "page_ranges" extra, or an empty dict when absent."""
        return self._extras.get("page_ranges", {})

    def to_json(self):  # type: ignore
        """Serialise as RefTest does, adding ``page_ranges`` when non-empty."""
        rv = super().to_json()
        if self.page_ranges:
            rv[-1]["page_ranges"] = self.page_ranges
        return rv
# Public API of this module. "dumps_local" was previously missing because
# "dump_local" had been listed twice.
__all__ = ["load", "dump_local", "dumps_local", "dump_dist", "dumps_dist"]
# Single logger shared by the whole manifest package.
logger = logging.getLogger("manifest")


def enable_debug_logging():
    # type: () -> None
    """Lower the manifest logger's threshold to DEBUG."""
    debug_level = logging.DEBUG
    logger.setLevel(debug_level)


def get_logger():
    # type: () -> logging.Logger
    """Return the shared "manifest" logger."""
    return logger
def compute_manifest_items(source_file):
    # type: (SourceFile) -> Tuple[Tuple[Text, ...], Text, Set[ManifestItem], Text]
    """Collect the manifest data of one source file into a single tuple.

    Returns ``(rel_path_parts, item_type, items, file_hash)`` where ``items``
    is the de-duplicated set of what ``source_file.manifest_items()`` yields.
    """
    parts = source_file.rel_path_parts
    item_type, items = source_file.manifest_items()
    digest = source_file.hash
    return (parts, item_type, set(items), digest)
class Manifest:
    """Test manifest: a ManifestData mapping of test type -> per-path items,
    together with the tests root and the URL base the items are relative to."""

    def __init__(self, tests_root, url_base="/"):
        # type: (Text, Text) -> None
        assert url_base is not None
        self._data = ManifestData(self)  # type: ManifestData
        self.tests_root = tests_root  # type: Text
        self.url_base = url_base  # type: Text

    def __iter__(self):
        # type: () -> Iterator[Tuple[Text, Text, Set[ManifestItem]]]
        """Iterate over every type; equivalent to ``itertypes()`` with no arguments."""
        return self.itertypes()

    def itertypes(self, *types):
        # type: (*Text) -> Iterator[Tuple[Text, Text, Set[ManifestItem]]]
        """Yield ``(item_type, rel_path, tests)`` for the given types.

        With no arguments all types are visited, in sorted order."""
        for item_type in (types or sorted(self._data.keys())):
            for path in self._data[item_type]:
                rel_path = os.sep.join(path)
                tests = self._data[item_type][path]
                yield item_type, rel_path, tests

    def iterpath(self, path):
        # type: (Text) -> Iterable[ManifestItem]
        """Yield the items of every type recorded for exactly `path`."""
        tpath = tuple(path.split(os.path.sep))

        for type_tests in self._data.values():
            i = type_tests.get(tpath, set())
            assert i is not None
            yield from i

    def iterdir(self, dir_name):
        # type: (Text) -> Iterable[ManifestItem]
        """Yield the items of every path whose leading components equal `dir_name`."""
        tpath = tuple(dir_name.split(os.path.sep))
        tpath_len = len(tpath)

        for type_tests in self._data.values():
            for path, tests in type_tests.items():
                if path[:tpath_len] == tpath:
                    yield from tests

    def update(self, tree, parallel=True):
        # type: (Iterable[Tuple[Text, Optional[Text], bool]], bool) -> bool
        """Update the manifest given an iterable of items that make up the updated manifest.

        The iterable must generate tuples of the form (path, file_hash, updated).
        `updated` is True for paths that are to be (re)examined and False for
        paths that are known to be unchanged. `file_hash` may be None, in which
        case the hash is computed from the file when it is needed. This
        unusual API is designed as an optimisation meaning that SourceFile items need not be
        constructed in the case we are not updating a path, but the absence of an item from
        the iterator may be used to remove defunct entries from the manifest.

        Returns True if any entry was added, recomputed or removed.

        Raises InvalidCacheError if a path is reported as not updated although
        the manifest has no entry for it."""

        logger = get_logger()

        changed = False

        # Create local variable references to these dicts so we avoid the
        # attribute access in the hot loop below
        data = self._data

        types = data.type_by_path()
        remaining_manifest_paths = set(types)

        to_update = []

        for path, file_hash, updated in tree:
            path_parts = tuple(path.split(os.path.sep))
            is_new = path_parts not in remaining_manifest_paths

            if not updated and is_new:
                # This is kind of a bandaid; if we ended up here the cache
                # was invalid but we've been using it anyway. That's obviously
                # bad; we should fix the underlying issue that we sometimes
                # use an invalid cache. But at least this fixes the immediate
                # problem
                raise InvalidCacheError

            if not updated:
                remaining_manifest_paths.remove(path_parts)
            else:
                assert self.tests_root is not None
                source_file = SourceFile(self.tests_root,
                                         path,
                                         self.url_base,
                                         file_hash)

                hash_changed = False  # type: bool

                if not is_new:
                    if file_hash is None:
                        file_hash = source_file.hash
                    remaining_manifest_paths.remove(path_parts)
                    old_type = types[path_parts]
                    old_hash = data[old_type].hashes[path_parts]
                    if old_hash != file_hash:
                        hash_changed = True
                        del data[old_type][path_parts]

                if is_new or hash_changed:
                    to_update.append(source_file)

        if to_update:
            logger.debug("Computing manifest update for %s items", len(to_update))
            changed = True

        # 25 items was derived experimentally (2020-01) to be approximately the
        # point at which it is quicker to create a Pool and parallelize update.
        pool = None
        try:
            if parallel and len(to_update) > 25 and cpu_count() > 1:
                # On Python 3 on Windows, using >= MAXIMUM_WAIT_OBJECTS processes
                # causes a crash in the multiprocessing module. Whilst this enum
                # can technically have any value, it is usually 64. For safety,
                # restrict manifest regeneration to 48 processes on Windows.
                #
                # See https://bugs.python.org/issue26903 and https://bugs.python.org/issue40263
                processes = cpu_count()
                if sys.platform == "win32" and processes > 48:
                    processes = 48
                pool = Pool(processes)

                # chunksize set > 1 when more than 10000 tests, because
                # chunking is a net-gain once we get to very large numbers
                # of items (again, experimentally, 2020-01)
                chunksize = max(1, len(to_update) // 10000)
                logger.debug("Doing a multiprocessed update. CPU count: %s, "
                             "processes: %s, chunksize: %s",
                             cpu_count(), processes, chunksize)
                results = pool.imap_unordered(compute_manifest_items,
                                              to_update,
                                              chunksize=chunksize
                                              )  # type: Iterator[Tuple[Tuple[Text, ...], Text, Set[ManifestItem], Text]]
            else:
                results = map(compute_manifest_items, to_update)

            for result in results:
                rel_path_parts, new_type, manifest_items, file_hash = result
                data[new_type][rel_path_parts] = manifest_items
                data[new_type].hashes[rel_path_parts] = file_hash
        finally:
            # Make sure to terminate the Pool, to avoid hangs on Python 3.
            # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
            # Doing this in a finally block also covers the case where
            # consuming the results raises.
            if pool is not None:
                pool.terminate()

        if remaining_manifest_paths:
            changed = True
            for rel_path_parts in remaining_manifest_paths:
                for test_data in data.values():
                    if rel_path_parts in test_data:
                        del test_data[rel_path_parts]

        return changed

    def to_json(self, caller_owns_obj=True):
        # type: (bool) -> Dict[Text, Any]
        """Dump a manifest into a object which can be serialized as JSON

        If caller_owns_obj is False, then the return value remains
        owned by the manifest; it is _vitally important_ that _no_
        (even read) operation is done on the manifest, as otherwise
        objects within the object graph rooted at the return value can
        be mutated. This essentially makes this mode very dangerous
        and only to be used under extreme care.

        """
        # Types with no entries are left out of the output entirely.
        out_items = {
            test_type: type_paths.to_json()
            for test_type, type_paths in self._data.items() if type_paths
        }

        if caller_owns_obj:
            out_items = deepcopy(out_items)

        rv = {"url_base": self.url_base,
              "items": out_items,
              "version": CURRENT_VERSION}  # type: Dict[Text, Any]
        return rv

    @classmethod
    def from_json(cls, tests_root, obj, types=None, callee_owns_obj=False):
        # type: (Text, Dict[Text, Any], Optional[Container[Text]], bool) -> Manifest
        """Load a manifest from a JSON object

        This loads a manifest for a given local tests_root path from an
        object obj, potentially partially loading it to only load the
        types given by types.

        If callee_owns_obj is True, then ownership of obj transfers
        to this function when called, and the caller must never mutate
        the obj or anything referred to in the object graph rooted at
        obj.

        Raises ManifestVersionMismatch if obj's "version" is not
        CURRENT_VERSION, and ManifestError if obj has no usable "items"
        mapping or names a test type that is not in item_classes.

        """
        version = obj.get("version")
        if version != CURRENT_VERSION:
            raise ManifestVersionMismatch

        self = cls(tests_root, url_base=obj.get("url_base", "/"))

        # Check the "items" *entry* of obj, not the dict method of the same
        # name: hasattr(obj, "items") is true for every dict, which let a
        # missing key surface later as a KeyError.
        items_by_type = obj.get("items")
        if not hasattr(items_by_type, "items"):
            raise ManifestError

        for test_type, type_paths in items_by_type.items():
            if test_type not in item_classes:
                raise ManifestError

            if types and test_type not in types:
                continue

            if not callee_owns_obj:
                type_paths = deepcopy(type_paths)

            self._data[test_type].set_json(type_paths)

        return self
def load_and_update(tests_root,  # type: Text
                    manifest_path,  # type: Text
                    url_base,  # type: Text
                    update=True,  # type: bool
                    rebuild=False,  # type: bool
                    metadata_path=None,  # type: Optional[Text]
                    cache_root=None,  # type: Optional[Text]
                    working_copy=True,  # type: bool
                    types=None,  # type: Optional[Container[Text]]
                    write_manifest=True,  # type: bool
                    allow_cached=True,  # type: bool
                    parallel=True  # type: bool
                    ):
    # type: (...) -> Manifest
    """Load the manifest at `manifest_path` and optionally bring it up to date.

    Unless `rebuild` is set, the existing manifest is loaded first. It is
    discarded (forcing a rebuild and update) when it cannot be loaded, when
    its version does not match, or when its url_base differs from `url_base`.

    When an update runs and the manifest changed, it is written back to
    `manifest_path` if `write_manifest` is true; the tree's caches are dumped
    after every update.

    `metadata_path` is accepted but not used by this function.

    Raises InvalidCacheError if the update still fails with an invalid cache
    after retrying with a complete rebuild.
    """

    logger = get_logger()

    manifest = None
    if not rebuild:
        try:
            manifest = _load(logger,
                             tests_root,
                             manifest_path,
                             types=types,
                             allow_cached=allow_cached)
        except ManifestVersionMismatch:
            logger.info("Manifest version changed, rebuilding")
        except ManifestError:
            logger.warning("Failed to load manifest, rebuilding")

    if manifest is not None and manifest.url_base != url_base:
        logger.info("Manifest url base did not match, rebuilding")
        manifest = None

    if manifest is None:
        manifest = Manifest(tests_root, url_base)
        rebuild = True
        update = True

    if rebuild or update:
        logger.info("Updating manifest")
        # Remember the failure so it can be re-raised after the loop: a bare
        # `raise` in the for/else clause has no active exception and would
        # itself fail with RuntimeError.
        last_error = None  # type: Optional[InvalidCacheError]
        for retry in range(2):
            try:
                tree = vcs.get_tree(tests_root, manifest, manifest_path, cache_root,
                                    working_copy, rebuild)
                changed = manifest.update(tree, parallel)
                break
            except InvalidCacheError as e:
                logger.warning("Manifest cache was invalid, doing a complete rebuild")
                rebuild = True
                last_error = e
        else:
            # If we didn't break, both attempts failed
            assert last_error is not None
            raise last_error
        if write_manifest and changed:
            write(manifest, manifest_path)
        tree.dump_caches()

    return manifest


def write(manifest, manifest_path):
    # type: (Manifest, Text) -> None
    """Atomically write `manifest` as JSON to `manifest_path`.

    The parent directory is created when `manifest_path` names one and it
    does not exist yet. A trailing newline is written after the JSON.
    """
    dir_name = os.path.dirname(manifest_path)
    # dirname is "" for a bare file name, and os.makedirs("") raises;
    # exist_ok avoids failing when the directory appears concurrently.
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)
    with atomic_write(manifest_path, overwrite=True) as f:
        # Separator and indentation choices live in jsonlib.dump_dist.
        jsonlib.dump_dist(manifest.to_json(caller_owns_obj=True), f)
        f.write("\n")
def replace_end(s, old, new):
    # type: (Text, Text, Text) -> Text
    """
    Given a string `s` that ends with `old`, replace that occurrence of `old`
    with `new`.

    An empty `old` simply appends `new` to `s`.
    """
    assert s.endswith(old)
    # Slice with an explicit end index: s[:-len(old)] is s[:0] (the empty
    # string) when `old` is empty, which silently discarded all of `s`.
    return s[:len(s) - len(old)] + new
_any_variants = {
    "window": {"suffix": ".any.html"},
    "serviceworker": {"force_https": True},
    "serviceworker-module": {"force_https": True},
    "sharedworker": {},
    "sharedworker-module": {},
    "dedicatedworker": {"suffix": ".any.worker.html"},
    "dedicatedworker-module": {"suffix": ".any.worker-module.html"},
    "worker": {"longhand": {"dedicatedworker", "sharedworker", "serviceworker"}},
    "worker-module": {},
    "shadowrealm": {},
    "jsshell": {"suffix": ".any.js"},
}  # type: Dict[Text, Dict[Text, Any]]


def get_any_variants(item):
    # type: (Text) -> Set[Text]
    """
    Expand one keyword into the set of variant names it stands for.

    An unknown keyword expands to the empty set; a known keyword expands to
    its "longhand" set if it has one, otherwise to itself.
    """
    assert isinstance(item, str), item

    try:
        spec = _any_variants[item]
    except KeyError:
        return set()

    return spec.get("longhand", {item})


def get_default_any_variants():
    # type: () -> Set[Text]
    """
    Return the variant names used when none are requested explicitly.
    """
    return {"window", "dedicatedworker"}


def parse_variants(value):
    # type: (Text) -> Set[Text]
    """
    Turn a comma-separated list of keywords into a set of variant names.

    The empty string selects the default variants.
    """
    assert isinstance(value, str), value

    if not value:
        return get_default_any_variants()

    selected = set()
    for keyword in value.split(","):
        selected.update(get_any_variants(keyword.strip()))
    return selected
def global_variant_url(url, suffix):
    # type: (Text, Text) -> Text
    """
    Build the url of one variant from the url of a ``.js`` test and the
    variant's filename suffix (all strings).
    """
    stripped = url.replace(".any.", ".")
    # When the suffix itself starts with the .https. / .h2. flag, remove the
    # flag from the url so that it only appears once in the result. At most
    # one of the two flags is handled, .https. taking precedence.
    for flag in (".https.", ".h2."):
        if flag in stripped and suffix.startswith(flag):
            stripped = stripped.replace(flag, ".")
            break
    # Swap the trailing ".js" for the variant suffix.
    assert stripped.endswith(".js")
    return stripped[:-len(".js")] + suffix
def is_dir(self):
    # type: () -> bool
    """Return whether this file represents a directory.

    A file constructed with explicit contents is never a directory."""
    if self.contents is not None:
        return False

    # rel_path is relative to tests_root, so resolve it against tests_root
    # (the same location `path` denotes) rather than against whatever the
    # current working directory happens to be.
    return os.path.isdir(os.path.join(self.tests_root, self.rel_path))
def in_non_test_dir(self):
    # type: () -> bool
    """Return whether the file's location marks it as a non-test file.

    That is the case when the file has no directory part, when its first
    path component is in root_dir_non_test, when any path component is in
    dir_non_test, or when the path starts with an entry of
    dir_path_non_test."""
    if self.dir_path == "":
        return True

    parts = self.rel_path_parts

    if parts[0] in self.root_dir_non_test:
        return True

    for component in parts:
        if component in self.dir_non_test:
            return True

    for prefix in self.dir_path_non_test:
        if parts[:len(prefix)] == prefix:
            return True

    return False
@property
def name_is_multi_global(self):
    # type: () -> bool
    """Whether the file name marks a multi-global js test: a ``.js``
    extension together with an ``any`` flag among the meta flags"""
    if self.ext != ".js":
        return False
    return "any" in self.meta_flags
@property
def markup_type(self):
    # type: () -> Optional[Text]
    """Name of the markup family ("html", "xhtml" or "svg") implied by the
    file extension, or None when the extension is not a markup one"""
    families = {
        "html": "html",
        "htm": "html",
        "xhtml": "xhtml",
        "xht": "xhtml",
        "xml": "xhtml",
        "svg": "svg",
    }

    suffix = self.ext
    if not suffix:
        return None

    # Only a single leading dot is dropped before the lookup.
    if suffix.startswith("."):
        suffix = suffix[1:]

    return families.get(suffix)
@cached_property
def timeout(self):
    # type: () -> Optional[Text]
    """Timeout class of a test or reference file: "long" when the file asks
    for an extended timeout, None in every other case"""
    # Script metadata wins: a ("timeout", "long") entry is enough.
    metadata = self.script_metadata
    if metadata and ("timeout", "long") in metadata:
        return "long"

    if self.root is None:
        return None

    nodes = self.timeout_nodes
    if not nodes:
        return None

    # Only the first timeout <meta> is consulted; its value is compared
    # case-insensitively.
    content = nodes[0].attrib.get("content", None)  # type: Optional[Text]
    if content and content.lower() == "long":
        return "long"

    return None
def parse_ref_keyed_meta(self, node):
    # type: (ElementTree.Element) -> Tuple[Optional[Tuple[Text, Text, Text]], Text]
    """Split the "content" attribute of `node` into a reference key and a value.

    The content is split at its last ":". Without a ":" the key is None and
    the whole content is the value. Otherwise the part before the ":" is
    resolved against this file's url and looked up in self.references; the
    key is then (self.url, resolved url, reference type).

    Raises ValueError when the resolved url is not one of the references
    with type "==" or "!="."""
    content = node.attrib.get("content", "")  # type: Text

    ref_part, colon, value = content.rpartition(":")
    if not colon:
        return None, value

    ref_url = urljoin(self.url, ref_part)
    # Type of the first reference whose url matches, if there is one.
    reftype = next((ref[1] for ref in self.references if ref[0] == ref_url), None)
    if reftype not in ("==", "!="):
        raise ValueError("Key %s doesn't correspond to a reference" % ref_url)

    return (self.url, ref_url, reftype), value
@cached_property
def page_ranges(self):
    # type: () -> Dict[Text, List[List[Optional[int]]]]
    """Page ranges requested by the file's ``reftest-pages`` meta elements.

    Returns a dict keyed by URL: the reference URL when the meta content is
    prefixed with a reference, otherwise ``self.url``.  Each value is a list
    of ranges; a range is ``[page]`` for a single page or ``[first, last]``
    for a span, where an omitted bound is None (e.g. "6-" gives
    ``[6, None]``).

    Raises ValueError for a duplicated key, a non-integer page, page 0, or
    a span that does not consist of exactly two "-"-separated parts.
    """
    rv = {}  # type: Dict[Text, List[List[Optional[int]]]]
    for node in self.page_ranges_nodes:
        key_data, value = self.parse_ref_keyed_meta(node)
        # Just key by url
        key = self.url if key_data is None else key_data[1]
        if key in rv:
            raise ValueError("Duplicate page-ranges value")
        ranges = rv[key] = []
        for range_str in value.split(","):
            range_str = range_str.strip()
            if "-" in range_str:
                range_parts_str = [item.strip() for item in range_str.split("-")]
                # A span has exactly two bounds; "1-2-3" or "1--2" would
                # otherwise be stored as a meaningless 3-element range.
                if len(range_parts_str) != 2:
                    raise ValueError("Malformed page-range value %s" % range_str)
                try:
                    range_parts = [int(item) if item else None for item in range_parts_str]
                except ValueError:
                    raise ValueError("Malformed page-range value %s" % range_str)
                # Page 0 is rejected as a bound.
                if any(item == 0 for item in range_parts):
                    raise ValueError("Malformed page-range value %s" % range_str)
            else:
                try:
                    range_parts = [int(range_str)]
                except ValueError:
                    raise ValueError("Malformed page-range value %s" % range_str)
            ranges.append(range_parts)
    return rv
@cached_property
def test_variants(self):
    # type: () -> List[Text]
    """URL suffixes (query and/or fragment) under which the test is run.

    For ``.js`` files the variants are the values of the "variant" entries
    in the script metadata; for other files they are the ``content``
    attributes of the variant meta nodes.  A file declaring no variant
    gets the single empty variant ``""``.

    Raises ValueError for a non-empty variant that does not start with "?"
    or "#", or whose query or fragment is empty.
    """
    if self.ext == ".js":
        metadata = self.script_metadata
        assert metadata is not None
        variants = [value for (key, value) in metadata
                    if key == "variant"]  # type: List[Text]
    else:
        variants = [node.attrib["content"]
                    for node in self.variant_nodes
                    if "content" in node.attrib]

    for candidate in variants:
        if candidate == "":
            # The empty variant is always acceptable.
            continue
        if candidate[0] not in ("#", "?"):
            raise ValueError("Non-empty variant must start with either a ? or a #")
        # A lone "?" / "#", or "?#...", has an empty query or fragment.
        if len(candidate) == 1 or candidate.startswith("?#"):
            raise ValueError("Variants must not have empty fragment or query " +
                             "(omit the empty part instead)")

    return variants or [""]
@cached_property
def css_flags(self):
    # type: () -> Set[Text]
    """Set of whitespace-separated flags collected from the ``content``
    attribute of every flags meta node; nodes without ``content`` are
    ignored."""
    return {flag
            for node in self.css_flag_nodes
            if "content" in node.attrib
            for flag in node.attrib["content"].split()}
def manifest_items(self):
    # type: () -> Tuple[Text, List[ManifestItem]]
    """List of manifest items corresponding to the file. There is typically one
    per test, but in the case of reftests a node may have corresponding manifest
    items without being a test itself.

    Returns a ``(item_type, items)`` tuple.  The result is stored in
    ``self.items_cache`` and returned directly on later calls.  The branches
    below are tried in order: the name-based checks come first, the
    content-based checks (which inspect the parsed document) come last, and
    anything left over is a support file.

    Raises ValueError if the name marks the file as a print reftest but it
    declares no references.
    """

    if self.items_cache:
        return self.items_cache

    # Remember whether ``root`` was already cached on the instance before
    # this call; this decides whether cached properties are dropped at the end.
    drop_cached = "root" not in self.__dict__

    if self.name_is_non_test:
        rv = "support", [
            SupportFile(
                self.tests_root,
                self.rel_path
            )]  # type: Tuple[Text, List[ManifestItem]]

    elif self.name_is_manual:
        rv = ManualTest.item_type, [
            ManualTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                self.rel_url
            )]

    elif self.name_is_conformance:
        rv = ConformanceCheckerTest.item_type, [
            ConformanceCheckerTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                self.rel_url
            )]

    elif self.name_is_conformance_support:
        rv = "support", [
            SupportFile(
                self.tests_root,
                self.rel_path
            )]

    elif self.name_is_webdriver:
        rv = WebDriverSpecTest.item_type, [
            WebDriverSpecTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                self.rel_url,
                timeout=self.timeout
            )]

    elif self.name_is_visual:
        rv = VisualTest.item_type, [
            VisualTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                self.rel_url
            )]

    elif self.name_is_crashtest:
        rv = CrashTest.item_type, [
            CrashTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                self.rel_url
            )]

    elif self.name_is_print_reftest:
        # A print reftest without any reference cannot be run.
        references = self.references
        if not references:
            raise ValueError("%s detected as print reftest but doesn't have any refs" %
                             self.path)
        rv = PrintRefTest.item_type, [
            PrintRefTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                self.rel_url,
                references=references,
                timeout=self.timeout,
                viewport_size=self.viewport_size,
                fuzzy=self.fuzzy,
                page_ranges=self.page_ranges,
            )]

    elif self.name_is_multi_global:
        # Use the first "global" entry of the script metadata, or "" when
        # there is none.
        globals = ""
        script_metadata = self.script_metadata
        assert script_metadata is not None
        for (key, value) in script_metadata:
            if key == "global":
                globals = value
                break

        # One item per (global suffix, variant) combination.
        tests = [
            TestharnessTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                global_variant_url(self.rel_url, suffix) + variant,
                timeout=self.timeout,
                pac=self.pac,
                jsshell=jsshell,
                script_metadata=self.script_metadata
            )
            for (suffix, jsshell) in sorted(global_suffixes(globals))
            for variant in self.test_variants
        ]  # type: List[ManifestItem]
        rv = TestharnessTest.item_type, tests

    elif self.name_is_worker:
        # The test URL is the .worker.html counterpart of the .worker.js file.
        test_url = replace_end(self.rel_url, ".worker.js", ".worker.html")
        tests = [
            TestharnessTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                test_url + variant,
                timeout=self.timeout,
                pac=self.pac,
                script_metadata=self.script_metadata
            )
            for variant in self.test_variants
        ]
        rv = TestharnessTest.item_type, tests

    elif self.name_is_window:
        # The test URL is the .window.html counterpart of the .window.js file.
        test_url = replace_end(self.rel_url, ".window.js", ".window.html")
        tests = [
            TestharnessTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                test_url + variant,
                timeout=self.timeout,
                pac=self.pac,
                script_metadata=self.script_metadata
            )
            for variant in self.test_variants
        ]
        rv = TestharnessTest.item_type, tests

    elif self.content_is_css_manual and not self.name_is_reference:
        rv = ManualTest.item_type, [
            ManualTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                self.rel_url
            )]

    elif self.content_is_testharness:
        rv = TestharnessTest.item_type, []
        testdriver = self.has_testdriver
        # One item per declared variant.
        for variant in self.test_variants:
            url = self.rel_url + variant
            rv[1].append(TestharnessTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                url,
                timeout=self.timeout,
                pac=self.pac,
                testdriver=testdriver,
                script_metadata=self.script_metadata
            ))

    elif self.content_is_ref_node:
        rv = RefTest.item_type, [
            RefTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                self.rel_url,
                references=self.references,
                timeout=self.timeout,
                viewport_size=self.viewport_size,
                dpi=self.dpi,
                fuzzy=self.fuzzy
            )]

    elif self.content_is_css_visual and not self.name_is_reference:
        rv = VisualTest.item_type, [
            VisualTest(
                self.tests_root,
                self.rel_path,
                self.url_base,
                self.rel_url
            )]

    else:
        rv = "support", [
            SupportFile(
                self.tests_root,
                self.rel_path
            )]

    # Sanity checks: the chosen type must be one that possible_types allows
    # (items_cache is still unset here), and no item may appear twice.
    assert rv[0] in self.possible_types
    assert len(rv[1]) == len(set(rv[1]))

    self.items_cache = rv

    # If ``root`` was not cached before this call, delete every cached value
    # named in ``__cached_properties__`` and the bookkeeping entry itself.
    # NOTE(review): presumably this releases the parsed tree once the items
    # are known -- confirm against the cached_property helper.
    if drop_cached and "__cached_properties__" in self.__dict__:
        cached_properties = self.__dict__["__cached_properties__"]
        for prop in cached_properties:
            if prop in self.__dict__:
                del self.__dict__[prop]
        del self.__dict__["__cached_properties__"]

    return rv
def get_path_id_map(src_root, tests_root, manifest_file, test_ids):
    # type: (Text, Text, Manifest, Iterable[Text]) -> Dict[Text, List[Text]]
    """Group the requested test ids by the path of the file defining them.

    ``manifest_file`` is iterated as ``(item_type, path, tests)`` triples.
    Paths are reported as stored in the manifest when ``src_root`` equals
    ``tests_root``; otherwise they are re-expressed relative to
    ``src_root``.  Files containing none of the requested ids do not appear
    in the result.
    """
    wanted = set(test_ids)
    same_root = src_root == tests_root
    ids_by_path = defaultdict(list)  # type: Dict[Text, List[Text]]

    for _item_type, path, tests in manifest_file:
        matching = [test.id for test in tests if test.id in wanted]
        if not matching:
            continue
        if same_root:
            key = path
        else:
            key = os.path.relpath(os.path.join(tests_root, path), src_root)
        ids_by_path[key].extend(matching)
    return ids_by_path
def test_pi():
    """A processing instruction inside the root element adds no child nodes."""
    p = XMLParser()
    # The document needs a <foo> root whose only content is a processing
    # instruction; an empty document has no root element to assert on.
    p.feed('<foo><?foo bar?></foo>')
    d = p.close()
    assert d.tag == "foo"
    assert len(d) == 0
def test_wpt_flags():
    """``wpt_flags`` query parameters set the subdomain / https / h2 flags."""
    # (query string, expected subdomain, expected https, expected h2)
    expectations = [
        ("?wpt_flags=www", True, False, False),
        ("?wpt_flags=https", False, True, False),
        ("?wpt_flags=h2", False, False, True),
        ("?wpt_flags=https&wpt_flags=www", True, True, False),
    ]
    for query, subdomain, https, h2 in expectations:
        harness_test = HarnessTest("/foo", "bar", "/", "bar" + query)
        assert harness_test.subdomain is subdomain
        assert harness_test.https is https
        assert harness_test.h2 is h2
def tree_and_sourcefile_mocks(source_files):
    """Build the inputs for a mocked manifest update.

    ``source_files`` yields ``(source_file, file_hash, updated)`` triples.
    Returns ``(tree, factory)``: ``tree`` is a list of
    ``[rel_path, file_hash, updated]`` entries, and ``factory`` takes the
    same arguments as a source-file constructor but simply returns the
    prepared object registered under the given path.
    """
    entries = list(source_files)
    by_path = {}
    tree = []
    for source_file, file_hash, updated in entries:
        by_path[source_file.rel_path] = source_file
        tree.append([source_file.rel_path, file_hash, updated])

    def MockSourceFile(tests_root, path, url_base, file_hash):
        # Only the path is needed to look up the prepared object.
        return by_path[path]

    return tree, MockSourceFile
@hs.composite
def manifest_tree(draw):
    """Hypothesis strategy producing a flat list of mocked source files.

    A random nested dict of source-file mocks is drawn and walked
    depth-first.  Each leaf gets a ``rel_path`` / ``rel_path_parts`` that
    reflects its position in the tree, and every RefTest item on it receives
    a fresh, non-empty list of references.  Inputs whose root is a single
    leaf rather than a dict are discarded.
    """
    names = hs.text(alphabet=hs.characters(blacklist_characters="\0/\\:*\"?<>|"), min_size=1)
    tree = hs.recursive(sourcefile_strategy(),
                        lambda children: hs.dictionaries(names, children, min_size=1),
                        max_leaves=10)

    generated_root = draw(tree)
    h.assume(isinstance(generated_root, dict))

    reftest_urls = []
    output = []
    # Explicit stack of (path components, node) pairs still to visit.
    stack = [((k,), v) for k, v in generated_root.items()]
    while stack:
        path, node = stack.pop()
        if isinstance(node, dict):
            stack.extend((path + (k,), v) for k, v in node.items())
        else:
            rel_path = os.path.sep.join(path)
            node.rel_path = rel_path
            node.rel_path_parts = tuple(path)
            for test_item in node.manifest_items.return_value[1]:
                test_item.path = rel_path
                if isinstance(test_item, item.RefTest):
                    # References may point at reftests generated earlier or
                    # at arbitrary names.
                    if reftest_urls:
                        possible_urls = hs.sampled_from(reftest_urls) | names
                    else:
                        possible_urls = names
                    # (url, relation) order, matching the reference tuples
                    # built by sourcefile_strategy.
                    reference = hs.tuples(possible_urls,
                                          hs.sampled_from(["==", "!="]))
                    references = hs.lists(reference, min_size=1, unique=True)
                    test_item.references = draw(references)
                    reftest_urls.append(test_item.url)
            output.append(node)

    return output
def test_manifest_to_json_forwardslash():
    """A path built with the OS separator is serialized as nested path keys."""
    source = SourceFileWithTest(os.path.join("a", "b"), "0" * 40, item.TestharnessTest)
    tree, sourcefile_mock = tree_and_sourcefile_mocks([(source, None, True)])

    m = manifest.Manifest("")
    with mock.patch("tools.manifest.manifest.SourceFile", side_effect=sourcefile_mock):
        changed = m.update(tree)
    assert changed is True

    expected = {
        'version': 8,
        'url_base': '/',
        'items': {
            'testharness': {'a': {'b': [
                '0000000000000000000000000000000000000000',
                (None, {})
            ]}},
        }
    }
    assert m.to_json() == expected
def test_iterpath():
    """``iterpath`` yields exactly the items of one path, or nothing at all."""
    reftest_source = SourceFileWithTest("test1", "0"*40, item.RefTest,
                                        references=[("/test1-ref", "==")])
    multi_source = SourceFileWithTests("test2", "1"*40, item.TestharnessTest,
                                       [("test2-1.html", {}),
                                        ("test2-2.html", {})])
    harness_source = SourceFileWithTest("test3", "0"*40, item.TestharnessTest)
    sources = [reftest_source, multi_source, harness_source]

    tree, sourcefile_mock = tree_and_sourcefile_mocks(
        (source, None, True) for source in sources)
    assert len(tree) == len(sources)

    m = manifest.Manifest("")
    with mock.patch("tools.manifest.manifest.SourceFile", side_effect=sourcefile_mock):
        m.update(tree)

    found_urls = {entry.url for entry in m.iterpath("test2")}
    assert found_urls == {"/test2-1.html",
                          "/test2-2.html"}
    assert set(m.iterpath("missing")) == set()
def test_update_from_json():
    """A manifest reloaded from JSON drops files missing from the next update."""
    patch_target = "tools.manifest.manifest.SourceFile"
    kept = SourceFileWithTest("test1", "0"*40, item.TestharnessTest)
    removed = SourceFileWithTest("test2", "0"*40, item.TestharnessTest)

    # Build a manifest containing both files and serialize it.
    original = manifest.Manifest("")
    tree, sourcefile_mock = tree_and_sourcefile_mocks([(kept, None, True), (removed, None, True)])
    with mock.patch(patch_target, side_effect=sourcefile_mock):
        original.update(tree)
    serialized = original.to_json()

    # Reload it and update with a tree that only contains the first file.
    reloaded = manifest.Manifest.from_json("/", serialized)
    tree, sourcefile_mock = tree_and_sourcefile_mocks([(kept, None, True)])
    with mock.patch(patch_target, side_effect=sourcefile_mock):
        reloaded.update(tree)

    kept_test = kept.manifest_items()[1][0]

    assert list(reloaded) == [("testharness", kept_test.path, {kept_test})]
'proxy.pac'}) + ]}}, + 'url_base': '/', + 'version': 8 + } diff --git a/testing/web-platform/tests/tools/manifest/tests/test_sourcefile.py b/testing/web-platform/tests/tools/manifest/tests/test_sourcefile.py new file mode 100644 index 0000000000..c0b281d244 --- /dev/null +++ b/testing/web-platform/tests/tools/manifest/tests/test_sourcefile.py @@ -0,0 +1,911 @@ +# mypy: allow-untyped-defs + +import os + +import pytest + +from io import BytesIO +from ...lint.lint import check_global_metadata +from ..sourcefile import SourceFile, read_script_metadata, js_meta_re, python_meta_re + + +def create(filename, contents=b""): + assert isinstance(contents, bytes) + return SourceFile("/", filename, "/", contents=contents) + + +def items(s): + item_type, items = s.manifest_items() + if item_type == "support": + return [] + else: + return [(item_type, item.url) for item in items] + + +@pytest.mark.parametrize("rel_path", [ + ".gitignore", + ".travis.yml", + "MANIFEST.json", + "tools/test.html", + "resources/test.html", + "common/test.html", + "support/test.html", + "css21/archive/test.html", + "conformance-checkers/test.html", + "conformance-checkers/README.md", + "conformance-checkers/html/Makefile", + "conformance-checkers/html/test.html", + "foo/tools/test.html", + "foo/resources/test.html", + "foo/support/test.html", + "foo/foo-manual.html.headers", + "crashtests/foo.html.ini", + "css/common/test.html", + "css/CSS2/archive/test.html", +]) +def test_name_is_non_test(rel_path): + s = create(rel_path) + assert s.name_is_non_test or s.name_is_conformance_support + + assert not s.content_is_testharness + + assert items(s) == [] + + +@pytest.mark.parametrize("rel_path", [ + "foo/common/test.html", + "foo/conformance-checkers/test.html", + "foo/_certs/test.html", + "foo/css21/archive/test.html", + "foo/CSS2/archive/test.html", + "css/css21/archive/test.html", + "foo/test-support.html", +]) +def test_not_name_is_non_test(rel_path): + s = create(rel_path) + assert not 
(s.name_is_non_test or s.name_is_conformance_support) + # We aren't actually asserting what type of test these are, just their + # name doesn't prohibit them from being tests. + + +@pytest.mark.parametrize("rel_path", [ + "foo/foo-manual.html", + "html/test-manual.html", + "html/test-manual.xhtml", + "html/test-manual.https.html", + "html/test-manual.https.xhtml" +]) +def test_name_is_manual(rel_path): + s = create(rel_path) + assert not s.name_is_non_test + assert s.name_is_manual + + assert not s.content_is_testharness + + assert items(s) == [("manual", "/" + rel_path)] + + +@pytest.mark.parametrize("rel_path", [ + "html/test-visual.html", + "html/test-visual.xhtml", +]) +def test_name_is_visual(rel_path): + s = create(rel_path) + assert not s.name_is_non_test + assert s.name_is_visual + + assert not s.content_is_testharness + + assert items(s) == [("visual", "/" + rel_path)] + + +@pytest.mark.parametrize("rel_path", [ + "css-namespaces-3/reftest/ref-lime-1.xml", + "css21/reference/pass_if_box_ahem.html", + "css21/csswg-issues/submitted/css2.1/reference/ref-green-box-100x100.xht", + "selectors-3/selectors-empty-001-ref.xml", + "css21/text/text-indent-wrap-001-notref-block-margin.xht", + "css21/text/text-indent-wrap-001-notref-block-margin.xht", + "css21/css-e-notation-ref-1.html", + "css21/floats/floats-placement-vertical-004-ref2.xht", + "css21/box/rtl-linebreak-notref1.xht", + "css21/box/rtl-linebreak-notref2.xht", + "html/canvas/element/drawing-images-to-the-canvas/drawimage_html_image_5_ref.html", + "html/canvas/element/line-styles/lineto_ref.html", + "html/rendering/non-replaced-elements/the-fieldset-element-0/ref.html" +]) +def test_name_is_reference(rel_path): + s = create(rel_path) + assert not s.name_is_non_test + assert s.name_is_reference + + assert not s.content_is_testharness + + assert items(s) == [] + + +def test_name_is_tentative(): + s = create("css/css-ui/appearance-revert-001.tentative.html") + assert s.name_is_tentative + + s = 
create("css/css-ui/tentative/appearance-revert-001.html") + assert s.name_is_tentative + + s = create("css/css-ui/appearance-revert-001.html") + assert not s.name_is_tentative + + +@pytest.mark.parametrize("rel_path", [ + "webdriver/tests/foo.py", + "webdriver/tests/print/foo.py", + "webdriver/tests/foo-crash.py", + "webdriver/tests/foo-visual.py", +]) +def test_name_is_webdriver(rel_path): + s = create(rel_path) + assert s.name_is_webdriver + + item_type, items = s.manifest_items() + assert item_type == "wdspec" + + +def test_worker(): + s = create("html/test.worker.js") + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert s.name_is_worker + assert not s.name_is_window + assert not s.name_is_reference + + assert not s.content_is_testharness + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + expected_urls = [ + "/html/test.worker.html", + ] + assert len(items) == len(expected_urls) + + for item, url in zip(items, expected_urls): + assert item.url == url + assert item.timeout is None + + +def test_window(): + s = create("html/test.window.js") + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert s.name_is_window + assert not s.name_is_reference + + assert not s.content_is_testharness + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + expected_urls = [ + "/html/test.window.html", + ] + assert len(items) == len(expected_urls) + + for item, url in zip(items, expected_urls): + assert item.url == url + assert item.timeout is None + + +def test_worker_long_timeout(): + contents = b"""// META: timeout=long +importScripts('/resources/testharness.js') +test()""" + + metadata = list(read_script_metadata(BytesIO(contents), js_meta_re)) + assert metadata == [("timeout", "long")] + + s = create("html/test.worker.js", 
contents=contents) + assert s.name_is_worker + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + for item in items: + assert item.timeout == "long" + + +def test_window_long_timeout(): + contents = b"""// META: timeout=long +test()""" + + metadata = list(read_script_metadata(BytesIO(contents), js_meta_re)) + assert metadata == [("timeout", "long")] + + s = create("html/test.window.js", contents=contents) + assert s.name_is_window + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + for item in items: + assert item.timeout == "long" + + +def test_worker_with_variants(): + contents = b"""// META: variant= +// META: variant=?wss +test()""" + + s = create("html/test.worker.js", contents=contents) + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert s.name_is_worker + assert not s.name_is_window + assert not s.name_is_reference + + assert not s.content_is_testharness + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + expected_urls = [ + "/html/test.worker.html" + suffix + for suffix in ["", "?wss"] + ] + assert len(items) == len(expected_urls) + + for item, url in zip(items, expected_urls): + assert item.url == url + assert item.timeout is None + + +def test_window_with_variants(): + contents = b"""// META: variant= +// META: variant=?wss +test()""" + + s = create("html/test.window.js", contents=contents) + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert s.name_is_window + assert not s.name_is_reference + + assert not s.content_is_testharness + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + expected_urls = [ + "/html/test.window.html" + suffix + for suffix in ["", "?wss"] + ] + assert len(items) == len(expected_urls) + + for item, url in zip(items, 
expected_urls): + assert item.url == url + assert item.timeout is None + + +def test_python_long_timeout(): + contents = b"""# META: timeout=long + +""" + + metadata = list(read_script_metadata(BytesIO(contents), + python_meta_re)) + assert metadata == [("timeout", "long")] + + s = create("webdriver/test.py", contents=contents) + assert s.name_is_webdriver + + item_type, items = s.manifest_items() + assert item_type == "wdspec" + + for item in items: + assert item.timeout == "long" + + +def test_multi_global(): + s = create("html/test.any.js") + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert not s.content_is_testharness + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + expected_urls = [ + "/html/test.any.html", + "/html/test.any.worker.html", + ] + assert len(items) == len(expected_urls) + + for item, url in zip(items, expected_urls): + assert item.url == url + assert item.timeout is None + + +def test_multi_global_long_timeout(): + contents = b"""// META: timeout=long +importScripts('/resources/testharness.js') +test()""" + + metadata = list(read_script_metadata(BytesIO(contents), js_meta_re)) + assert metadata == [("timeout", "long")] + + s = create("html/test.any.js", contents=contents) + assert s.name_is_multi_global + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + for item in items: + assert item.timeout == "long" + + +@pytest.mark.parametrize("input,expected", [ + (b"window", {"window"}), + (b"sharedworker", {"sharedworker"}), + (b"sharedworker,serviceworker", {"serviceworker", "sharedworker"}), + (b"worker", {"dedicatedworker", "serviceworker", "sharedworker"}), +]) +def test_multi_global_with_custom_globals(input, expected): + contents = b"""// META: global=%s +test()""" % input + + assert list(check_global_metadata(input)) == [] + + s = 
create("html/test.any.js", contents=contents) + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert not s.content_is_testharness + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + urls = { + "dedicatedworker": "/html/test.any.worker.html", + "serviceworker": "/html/test.any.serviceworker.html", + "sharedworker": "/html/test.any.sharedworker.html", + "window": "/html/test.any.html", + } + + expected_urls = sorted(urls[ty] for ty in expected) + assert len(items) == len(expected_urls) + + for item, url in zip(items, expected_urls): + assert item.url == url + assert item.jsshell is False + assert item.timeout is None + + +def test_multi_global_with_jsshell_globals(): + contents = b"""// META: global=window,dedicatedworker,jsshell +test()""" + + s = create("html/test.any.js", contents=contents) + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert not s.content_is_testharness + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + expected = [ + ("/html/test.any.html", False), + ("/html/test.any.js", True), + ("/html/test.any.worker.html", False), + ] + assert len(items) == len(expected) + + for item, (url, jsshell) in zip(items, expected): + assert item.url == url + assert item.jsshell == jsshell + assert item.timeout is None + + +def test_multi_global_with_variants(): + contents = b"""// META: global=window,worker +// META: variant= +// META: variant=?wss +test()""" + + s = create("html/test.any.js", contents=contents) + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert not 
s.content_is_testharness + + item_type, items = s.manifest_items() + assert item_type == "testharness" + + urls = { + "dedicatedworker": "/html/test.any.worker.html", + "serviceworker": "/html/test.any.serviceworker.html", + "sharedworker": "/html/test.any.sharedworker.html", + "window": "/html/test.any.html", + } + + expected_urls = sorted( + urls[ty] + suffix + for ty in ["dedicatedworker", "serviceworker", "sharedworker", "window"] + for suffix in ["", "?wss"] + ) + assert len(items) == len(expected_urls) + + for item, url in zip(items, expected_urls): + assert item.url == url + assert item.timeout is None + + +@pytest.mark.parametrize("input,expected", [ + (b"""//META: foo=bar\n""", [("foo", "bar")]), + (b"""// META: foo=bar\n""", [("foo", "bar")]), + (b"""// META: foo=bar\n""", [("foo", "bar")]), + (b"""\n// META: foo=bar\n""", []), + (b""" // META: foo=bar\n""", []), + (b"""// META: foo=bar\n// META: baz=quux\n""", [("foo", "bar"), ("baz", "quux")]), + (b"""// META: foo=bar\n\n// META: baz=quux\n""", [("foo", "bar")]), + (b"""// META: foo=bar\n// Start of the test\n// META: baz=quux\n""", [("foo", "bar")]), + (b"""// META:\n""", []), + (b"""// META: foobar\n""", []), +]) +def test_script_metadata(input, expected): + metadata = read_script_metadata(BytesIO(input), js_meta_re) + assert list(metadata) == expected + + +@pytest.mark.parametrize("ext", ["htm", "html"]) +def test_testharness(ext): + content = b"" + + filename = "html/test." 
+ ext + s = create(filename, content) + + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert s.content_is_testharness + + assert items(s) == [("testharness", "/" + filename)] + + +@pytest.mark.parametrize("variant", ["", "?foo", "#bar", "?foo#bar"]) +def test_testharness_variant(variant): + content = (b"" % variant.encode("utf-8") + + b"" + + b"") + + filename = "html/test.html" + s = create(filename, content) + + s.test_variants = [variant, "?fixed"] + + +@pytest.mark.parametrize("variant", ["?", "#", "?#bar"]) +def test_testharness_variant_invalid(variant): + content = (b"" % variant.encode("utf-8") + + b"" + + b"") + + filename = "html/test.html" + s = create(filename, content) + + with pytest.raises(ValueError): + s.test_variants + + +@pytest.mark.parametrize("ext", ["htm", "html"]) +def test_relative_testharness(ext): + content = b"" + + filename = "html/test." + ext + s = create(filename, content) + + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert not s.content_is_testharness + + assert items(s) == [] + + +@pytest.mark.parametrize("ext", ["xhtml", "xht", "xml"]) +def test_testharness_xhtml(ext): + content = b""" + + + + + + + +""" + + filename = "html/test." + ext + s = create(filename, content) + + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert s.content_is_testharness + + assert items(s) == [("testharness", "/" + filename)] + + +@pytest.mark.parametrize("ext", ["xhtml", "xht", "xml"]) +def test_relative_testharness_xhtml(ext): + content = b""" + + + + + + + +""" + + filename = "html/test." 
+ ext + s = create(filename, content) + + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert not s.content_is_testharness + + assert items(s) == [] + + +def test_testharness_svg(): + content = b"""\ + + +Null test + + + +""" + + filename = "html/test.svg" + s = create(filename, content) + + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert s.root is not None + assert s.content_is_testharness + + assert items(s) == [("testharness", "/" + filename)] + + +def test_relative_testharness_svg(): + content = b"""\ + + +Null test + + + +""" + + filename = "html/test.svg" + s = create(filename, content) + + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert s.root is not None + assert not s.content_is_testharness + + assert items(s) == [] + + +@pytest.mark.parametrize("filename", ["test", "test.test"]) +def test_testharness_ext(filename): + content = b"" + + s = create("html/" + filename, content) + + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + + assert not s.root + assert not s.content_is_testharness + + assert items(s) == [] + + +@pytest.mark.parametrize("ext", ["htm", "html"]) +def test_testdriver(ext): + content = b"" + + filename = "html/test." + ext + s = create(filename, content) + + assert s.has_testdriver + + +@pytest.mark.parametrize("ext", ["htm", "html"]) +def test_relative_testdriver(ext): + content = b"" + + filename = "html/test." 
+ ext + s = create(filename, content) + + assert not s.has_testdriver + + +@pytest.mark.parametrize("ext", ["htm", "html"]) +def test_reftest(ext): + content = b"" + + filename = "foo/test." + ext + s = create(filename, content) + + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + assert not s.content_is_testharness + + assert s.content_is_ref_node + + assert items(s) == [("reftest", "/" + filename)] + + +@pytest.mark.parametrize("ext", ["xht", "html", "xhtml", "htm", "xml", "svg"]) +def test_css_visual(ext): + content = b""" + + + + + + +""" + + filename = "html/test." + ext + s = create(filename, content) + + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + assert not s.content_is_testharness + assert not s.content_is_ref_node + + assert s.content_is_css_visual + + assert items(s) == [("visual", "/" + filename)] + + +@pytest.mark.parametrize("ext", ["xht", "xhtml", "xml"]) +def test_xhtml_with_entity(ext): + content = b""" + + +  + +""" + + filename = "html/test." 
+ ext + s = create(filename, content) + + assert s.root is not None + + assert items(s) == [] + + +def test_no_parse(): + s = create("foo/bar.xml", "\uFFFF".encode("utf-8")) + + assert not s.name_is_non_test + assert not s.name_is_manual + assert not s.name_is_visual + assert not s.name_is_multi_global + assert not s.name_is_worker + assert not s.name_is_reference + assert not s.content_is_testharness + assert not s.content_is_ref_node + assert not s.content_is_css_visual + + assert items(s) == [] + + +@pytest.mark.parametrize("input,expected", [ + ("aA", "aA"), + ("a/b", "a/b" if os.name != "nt" else "a\\b"), + ("a\\b", "a\\b") +]) +def test_relpath_normalized(input, expected): + s = create(input, b"") + assert s.rel_path == expected + + +@pytest.mark.parametrize("url", [b"ref.html", + b"\x20ref.html", + b"ref.html\x20", + b"\x09\x0a\x0c\x0d\x20ref.html\x09\x0a\x0c\x0d\x20"]) +def test_reftest_url_whitespace(url): + content = b"" % url + s = create("foo/test.html", content) + assert s.references == [("/foo/ref.html", "==")] + + +@pytest.mark.parametrize("url", [b"http://example.com/", + b"\x20http://example.com/", + b"http://example.com/\x20", + b"\x09\x0a\x0c\x0d\x20http://example.com/\x09\x0a\x0c\x0d\x20"]) +def test_spec_links_whitespace(url): + content = b"" % url + s = create("foo/test.html", content) + assert s.spec_links == {"http://example.com/"} + + +def test_url_base(): + contents = b"""// META: global=window,worker +// META: variant= +// META: variant=?wss +test()""" + + s = SourceFile("/", "html/test.any.js", "/_fake_base/", contents=contents) + item_type, items = s.manifest_items() + + assert item_type == "testharness" + + assert [item.url for item in items] == ['/_fake_base/html/test.any.html', + '/_fake_base/html/test.any.html?wss', + '/_fake_base/html/test.any.serviceworker.html', + '/_fake_base/html/test.any.serviceworker.html?wss', + '/_fake_base/html/test.any.sharedworker.html', + '/_fake_base/html/test.any.sharedworker.html?wss', + 
'/_fake_base/html/test.any.worker.html', + '/_fake_base/html/test.any.worker.html?wss'] + + assert items[0].url_base == "/_fake_base/" + + +@pytest.mark.parametrize("fuzzy, expected", [ + (b"ref.html:1;200", {("/foo/test.html", "/foo/ref.html", "=="): [[1, 1], [200, 200]]}), + (b"ref.html:0-1;100-200", {("/foo/test.html", "/foo/ref.html", "=="): [[0, 1], [100, 200]]}), + (b"0-1;100-200", {None: [[0,1], [100, 200]]}), + (b"maxDifference=1;totalPixels=200", {None: [[1, 1], [200, 200]]}), + (b"totalPixels=200;maxDifference=1", {None: [[1, 1], [200, 200]]}), + (b"totalPixels=200;1", {None: [[1, 1], [200, 200]]}), + (b"maxDifference=1;200", {None: [[1, 1], [200, 200]]}),]) +def test_reftest_fuzzy(fuzzy, expected): + content = b""" + +""" % fuzzy + + s = create("foo/test.html", content) + + assert s.content_is_ref_node + assert s.fuzzy == expected + +@pytest.mark.parametrize("fuzzy, expected", [ + ([b"1;200"], {None: [[1, 1], [200, 200]]}), + ([b"ref-2.html:0-1;100-200"], {("/foo/test.html", "/foo/ref-2.html", "=="): [[0, 1], [100, 200]]}), + ([b"1;200", b"ref-2.html:0-1;100-200"], + {None: [[1, 1], [200, 200]], + ("/foo/test.html", "/foo/ref-2.html", "=="): [[0,1], [100, 200]]})]) +def test_reftest_fuzzy_multi(fuzzy, expected): + content = b""" + +""" + for item in fuzzy: + content += b'\n' % item + + s = create("foo/test.html", content) + + assert s.content_is_ref_node + assert s.fuzzy == expected + +@pytest.mark.parametrize("pac, expected", [ + (b"proxy.pac", "proxy.pac")]) +def test_pac(pac, expected): + content = b""" + +""" % pac + + s = create("foo/test.html", content) + assert s.pac == expected + +@pytest.mark.parametrize("page_ranges, expected", [ + (b"1-2", [[1, 2]]), + (b"1-1,3-4", [[1, 1], [3, 4]]), + (b"1,3", [[1], [3]]), + (b"2-", [[2, None]]), + (b"-2", [[None, 2]]), + (b"-2,2-", [[None, 2], [2, None]]), + (b"1,6-7,8", [[1], [6, 7], [8]])]) +def test_page_ranges(page_ranges, expected): + content = b""" + +""" % page_ranges + + s = 
create("foo/test-print.html", content) + + assert s.page_ranges == {"/foo/test-print.html": expected} + + +@pytest.mark.parametrize("page_ranges", [b"a", b"1-a", b"1=2", b"1-2:2-3"]) +def test_page_ranges_invalid(page_ranges): + content = b""" + +""" % page_ranges + + s = create("foo/test-print.html", content) + with pytest.raises(ValueError): + s.page_ranges + + +def test_hash(): + s = SourceFile("/", "foo", "/", contents=b"Hello, World!") + assert "b45ef6fec89518d314f546fd6c3025367b721684" == s.hash diff --git a/testing/web-platform/tests/tools/manifest/tests/test_utils.py b/testing/web-platform/tests/tools/manifest/tests/test_utils.py new file mode 100644 index 0000000000..e8cf1ad689 --- /dev/null +++ b/testing/web-platform/tests/tools/manifest/tests/test_utils.py @@ -0,0 +1,15 @@ +# mypy: allow-untyped-defs + +import os +import subprocess +from unittest import mock + +from .. import utils + + +def test_git_for_path_no_git(): + this_dir = os.path.dirname(__file__) + with mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "foo")): + assert utils.git(this_dir) is None diff --git a/testing/web-platform/tests/tools/manifest/typedata.py b/testing/web-platform/tests/tools/manifest/typedata.py new file mode 100644 index 0000000000..4061c9e610 --- /dev/null +++ b/testing/web-platform/tests/tools/manifest/typedata.py @@ -0,0 +1,336 @@ +from collections.abc import MutableMapping + + +MYPY = False +if MYPY: + # MYPY is set to True when run under Mypy. + from typing import Any + from typing import Dict + from typing import Iterator + from typing import List + from typing import Optional + from typing import Set + from typing import Text + from typing import Tuple + from typing import Type + from typing import Union + + # avoid actually importing these, they're only used by type comments + from . import item + from . 
import manifest + + +if MYPY: + TypeDataType = MutableMapping[Tuple[str, ...], Set[item.ManifestItem]] + PathHashType = MutableMapping[Tuple[str, ...], str] +else: + TypeDataType = MutableMapping + PathHashType = MutableMapping + + +class TypeData(TypeDataType): + def __init__(self, m, type_cls): + # type: (manifest.Manifest, Type[item.ManifestItem]) -> None + """Dict-like object containing the TestItems for each test type. + + Loading an actual Item class for each test is unnecessarily + slow, so this class allows lazy-loading of the test + items. When the manifest is loaded we store the raw json + corresponding to the test type, and only create an Item + subclass when the test is accessed. In order to remain + API-compatible with consumers that depend on getting an Item + from iteration, we do eagerly load all items when iterating + over the class.""" + self._manifest = m + self._type_cls = type_cls # type: Type[item.ManifestItem] + self._json_data = {} # type: Dict[Text, Any] + self._data = {} # type: Dict[Text, Any] + self._hashes = {} # type: Dict[Tuple[Text, ...], Text] + self.hashes = PathHash(self) + + def _delete_node(self, data, key): + # type: (Dict[Text, Any], Tuple[Text, ...]) -> None + """delete a path from a Dict data with a given key""" + path = [] + node = data + for pathseg in key[:-1]: + path.append((node, pathseg)) + node = node[pathseg] + if not isinstance(node, dict): + raise KeyError(key) + + del node[key[-1]] + while path: + node, pathseg = path.pop() + if len(node[pathseg]) == 0: + del node[pathseg] + else: + break + + def __getitem__(self, key): + # type: (Tuple[Text, ...]) -> Set[item.ManifestItem] + node = self._data # type: Union[Dict[Text, Any], Set[item.ManifestItem], List[Any]] + for pathseg in key: + if isinstance(node, dict) and pathseg in node: + node = node[pathseg] + else: + break + else: + if isinstance(node, set): + return node + else: + raise KeyError(key) + + node = self._json_data + found = False + for pathseg in key: + if
isinstance(node, dict) and pathseg in node: + node = node[pathseg] + else: + break + else: + found = True + + if not found: + raise KeyError(key) + + if not isinstance(node, list): + raise KeyError(key) + + self._hashes[key] = node[0] + + data = set() + path = "/".join(key) + for test in node[1:]: + manifest_item = self._type_cls.from_json(self._manifest, path, test) + data.add(manifest_item) + + node = self._data + assert isinstance(node, dict) + for pathseg in key[:-1]: + node = node.setdefault(pathseg, {}) + assert isinstance(node, dict) + assert key[-1] not in node + node[key[-1]] = data + + self._delete_node(self._json_data, key) + + return data + + def __setitem__(self, key, value): + # type: (Tuple[Text, ...], Set[item.ManifestItem]) -> None + try: + self._delete_node(self._json_data, key) + except KeyError: + pass + + node = self._data + for i, pathseg in enumerate(key[:-1]): + node = node.setdefault(pathseg, {}) + if not isinstance(node, dict): + raise KeyError(f"{key!r} is a child of a test ({key[:i+1]!r})") + node[key[-1]] = value + + def __delitem__(self, key): + # type: (Tuple[Text, ...]) -> None + try: + self._delete_node(self._data, key) + except KeyError: + self._delete_node(self._json_data, key) + else: + try: + del self._hashes[key] + except KeyError: + pass + + def __iter__(self): + # type: () -> Iterator[Tuple[Text, ...]] + """Iterator over keys in the TypeData in codepoint order""" + data_node = self._data # type: Optional[Union[Dict[Text, Any], Set[item.ManifestItem]]] + json_node = self._json_data # type: Optional[Union[Dict[Text, Any], List[Any]]] + path = tuple() # type: Tuple[Text, ...] 
+ stack = [(data_node, json_node, path)] + while stack: + data_node, json_node, path = stack.pop() + if isinstance(data_node, set) or isinstance(json_node, list): + assert data_node is None or json_node is None + yield path + else: + assert data_node is None or isinstance(data_node, dict) + assert json_node is None or isinstance(json_node, dict) + + keys = set() # type: Set[Text] + if data_node is not None: + keys |= set(iter(data_node)) + if json_node is not None: + keys |= set(iter(json_node)) + + for key in sorted(keys, reverse=True): + stack.append((data_node.get(key) if data_node is not None else None, + json_node.get(key) if json_node is not None else None, + path + (key,))) + + def __len__(self): + # type: () -> int + count = 0 + + stack = [self._data] # type: List[Union[Dict[Text, Any], Set[item.ManifestItem]]] + while stack: + v = stack.pop() + if isinstance(v, set): + count += 1 + else: + stack.extend(v.values()) + + json_stack = [self._json_data] # type: List[Union[Dict[Text, Any], List[Any]]] + while json_stack: + json_v = json_stack.pop() + if isinstance(json_v, list): + count += 1 + else: + json_stack.extend(json_v.values()) + + return count + + def __nonzero__(self): + # type: () -> bool + return bool(self._data) or bool(self._json_data) + + __bool__ = __nonzero__ + + def __contains__(self, key): + # type: (Any) -> bool + # we provide our own impl of this to avoid calling __getitem__ and generating items for + # those in self._json_data + node = self._data + for pathseg in key: + if pathseg in node: + node = node[pathseg] + else: + break + else: + return bool(isinstance(node, set)) + + node = self._json_data + for pathseg in key: + if pathseg in node: + node = node[pathseg] + else: + break + else: + return bool(isinstance(node, list)) + + return False + + def clear(self): + # type: () -> None + # much, much simpler/quicker than that defined in MutableMapping + self._json_data.clear() + self._data.clear() + self._hashes.clear() + + def set_json(self, 
json_data): + # type: (Dict[Text, Any]) -> None + """Provide the object with a raw JSON blob + + Note that this object graph is assumed to be owned by the TypeData + object after the call, so the caller must not mutate any part of the + graph. + """ + if self._json_data: + raise ValueError("set_json call when JSON data is not empty") + + self._json_data = json_data + + def to_json(self): + # type: () -> Dict[Text, Any] + """Convert the current data to JSON + + Note that the returned object may contain references to the internal + data structures, and is only guaranteed to be valid until the next + __getitem__, __setitem__, __delitem__ call, so the caller must not + mutate any part of the returned object graph. + + """ + json_rv = self._json_data.copy() + + def safe_sorter(element): + # type: (Tuple[str,str]) -> Tuple[str,str] + """key function to sort lists with None values.""" + if element and not element[0]: + return ("", element[1]) + else: + return element + + stack = [(self._data, json_rv, tuple())] # type: List[Tuple[Dict[Text, Any], Dict[Text, Any], Tuple[Text, ...]]] + while stack: + data_node, json_node, par_full_key = stack.pop() + for k, v in data_node.items(): + full_key = par_full_key + (k,) + if isinstance(v, set): + assert k not in json_node + json_node[k] = [self._hashes.get( + full_key)] + [t for t in sorted((test.to_json() for test in v), key=safe_sorter)] + else: + json_node[k] = json_node.get(k, {}).copy() + stack.append((v, json_node[k], full_key)) + + return json_rv + + +class PathHash(PathHashType): + def __init__(self, data): + # type: (TypeData) -> None + self._data = data + + def __getitem__(self, k): + # type: (Tuple[Text, ...]) -> Text + if k not in self._data: + raise KeyError + + if k in self._data._hashes: + return self._data._hashes[k] + + node = self._data._json_data + for pathseg in k: + if pathseg in node: + node = node[pathseg] + else: + break + else: + return node[0] # type: ignore + + assert False, "unreachable" + raise 
def __setitem__(self, k, v):
    # type: (Tuple[Text, ...], Text) -> None
    """Store hash ``v`` for path ``k``.

    Raises KeyError if ``k`` is not contained in the wrapped data object.
    If the path resolves inside the raw JSON tree, the first element of
    the node found there is overwritten; otherwise the value is recorded
    in the wrapped object's ``_hashes`` mapping.
    """
    owner = self._data
    if k not in owner:
        raise KeyError

    hashes = owner._hashes
    if k in hashes:
        hashes[k] = v

    node = owner._json_data
    for pathseg in k:
        if pathseg not in node:
            # Path is not (fully) present in the JSON tree.
            hashes[k] = v
            return
        node = node[pathseg]

    # Every segment resolved: slot 0 of the JSON node holds the hash.
    node[0] = v  # type: ignore
def create_parser():
    # type: () -> argparse.ArgumentParser
    """Build the command-line parser for the manifest update command.

    Path-like options ``--path`` and ``--tests-root`` are normalised with
    ``abs_path``.  ``--no-download`` and ``--no-parallel`` are stored
    inverted under ``download`` and ``parallel`` (both default to True).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-v", "--verbose", dest="verbose", action="store_true", default=False,
        help="Turn on verbose logging")
    parser.add_argument(
        "-p", "--path", type=abs_path, help="Path to manifest file.")
    parser.add_argument(
        "--tests-root", type=abs_path, default=wpt_root, help="Path to root of tests.")
    parser.add_argument(
        "-r", "--rebuild", action="store_true", default=False,
        help="Force a full rebuild of the manifest.")
    parser.add_argument(
        "--url-base", action="store", default="/",
        help="Base url to use as the mount point for tests in this manifest.")
    parser.add_argument(
        "--no-download", dest="download", action="store_false", default=True,
        help="Never attempt to download the manifest.")
    # The help text names the real default (a directory under wpt_root)
    # rather than a path at the filesystem root.
    parser.add_argument(
        "--cache-root", action="store", default=os.path.join(wpt_root, ".wptcache"),
        help="Path in which to store any caches (default <wpt_root>/.wptcache/)")
    parser.add_argument(
        "--no-parallel", dest="parallel", action="store_false", default=True,
        help="Do not parallelize building the manifest")
    return parser
def rel_path_to_url(rel_path, url_base="/"):
    # type: (Text, Text) -> Text
    """Return the URL for ``rel_path`` mounted under ``url_base``.

    ``url_base`` is normalised to start and end with ``/`` (an empty
    ``url_base`` is treated as ``/``), and OS path separators in
    ``rel_path`` are converted to ``/``.

    ``rel_path`` must be a relative path; this is checked with an assert.
    """
    assert not os.path.isabs(rel_path), rel_path
    # startswith/endswith instead of indexing so that an empty url_base is
    # handled rather than raising IndexError.
    if not url_base.startswith("/"):
        url_base = "/" + url_base
    if not url_base.endswith("/"):
        url_base += "/"
    return url_base + rel_path.replace(os.sep, "/")
class cached_property(Generic[T]):
    """Descriptor that computes a value once and stores it on the instance.

    The computed value is written into the instance ``__dict__`` under the
    wrapped function's name, and that name is also added to a
    ``__cached_properties__`` set kept in the same ``__dict__``.
    """

    def __init__(self, func):
        # type: (Callable[[Any], T]) -> None
        self.func = func
        self.__doc__ = getattr(func, "__doc__")
        self.name = func.__name__

    def __get__(self, obj, cls=None):
        # type: (Any, Optional[type]) -> T
        # Accessed on the class rather than an instance: hand back the
        # descriptor itself.
        if obj is None:
            return self  # type: ignore

        attr = self.name
        # Once the value is stored in the instance dict this method should
        # not be reached again for the same instance.
        assert attr not in obj.__dict__
        value = self.func(obj)
        obj.__dict__[attr] = value
        obj.__dict__.setdefault("__cached_properties__", set()).add(attr)
        return value
def get_tree(tests_root, manifest, manifest_path, cache_root,
             working_copy=True, rebuild=False):
    # type: (Text, Manifest, Optional[Text], Optional[Text], bool, bool) -> FileSystem
    """Create the FileSystem tree used to update ``manifest``.

    :param tests_root: root directory of the tests.
    :param manifest: manifest object; only its ``url_base`` is read here.
    :param manifest_path: path of the manifest file, or None.
    :param cache_root: directory for cache files; when None,
                       ``<tests_root>/.wptcache`` is used.
    :param working_copy: must be true; any other value is rejected.
    :param rebuild: passed through to FileSystem.
    :raises ValueError: if ``working_copy`` is false.

    If the cache directory cannot be created the tree is built with
    ``cache_path=None``.
    """
    # Reject the unsupported mode before touching the filesystem.
    if not working_copy:
        raise ValueError("working_copy=False unsupported")

    if cache_root is None:
        cache_root = os.path.join(tests_root, ".wptcache")
    try:
        # exist_ok avoids failing when the directory already exists or is
        # created concurrently between a check and the call.
        os.makedirs(cache_root, exist_ok=True)
    except OSError:
        cache_root = None

    return FileSystem(tests_root,
                      manifest.url_base,
                      manifest_path=manifest_path,
                      cache_path=cache_root,
                      rebuild=rebuild)
class FileSystem:
    """Iterable view of the files under ``tests_root``.

    Iterating yields ``(rel_path, file_hash, updated)`` tuples for every
    file that passes the gitignore path filter.
    """

    def __init__(self, tests_root, url_base, cache_path, manifest_path=None, rebuild=False):
        # type: (Text, Text, Optional[Text], Optional[Text], bool) -> None
        self.tests_root = tests_root
        self.url_base = url_base
        self.ignore_cache = None
        self.mtime_cache = None

        root_bytes = tests_root.encode("utf8")
        caching = cache_path is not None
        # The mtime cache additionally needs a manifest path.
        if caching and manifest_path is not None:
            self.mtime_cache = MtimeCache(cache_path, tests_root, manifest_path, rebuild)
        # The ignore cache is only created when has_ignore() reports true
        # for the tests root.
        if caching and gitignore.has_ignore(root_bytes):
            self.ignore_cache = GitIgnoreCache(cache_path, tests_root, rebuild)

        self.path_filter = gitignore.PathFilter(root_bytes,
                                                extras=[b".git/"],
                                                cache=self.ignore_cache)
        self.hash_cache = GitHasher(tests_root).hash_cache()

    def __iter__(self):
        # type: () -> Iterator[Tuple[Text, Optional[Text], bool]]
        mtimes = self.mtime_cache
        root_bytes = self.tests_root.encode("utf8")
        for dir_rel, _subdirs, files in self.path_filter(walk(root_bytes)):
            for name, file_stat in files:
                rel = os.path.join(dir_rel, name).decode("utf8")
                if mtimes is not None and not mtimes.updated(rel, file_stat):
                    # mtime matches the cached value: report as unchanged
                    # and do not look up a hash.
                    yield rel, None, False
                    continue
                yield rel, self.hash_cache.get(rel, None), True

    def dump_caches(self):
        # type: () -> None
        """Call ``dump()`` on each cache that was created."""
        for cache in (self.mtime_cache, self.ignore_cache):
            if cache is None:
                continue
            cache.dump()
class MtimeCache(CacheFile):
    """Cache of file modification times, stored in ``mtime.json``.

    Besides one entry per relative path, the data holds two bookkeeping
    keys: ``/tests_root`` and ``/manifest_path`` (a ``[path, mtime]``
    pair).  Loaded data is discarded when either no longer matches.
    """
    file_name = "mtime.json"

    def __init__(self, cache_root, tests_root, manifest_path, rebuild=False):
        # type: (Text, Text, Text, bool) -> None
        # Must be set before the base initialiser runs, because
        # check_valid() reads it.
        self.manifest_path = manifest_path
        super().__init__(cache_root, tests_root, rebuild)

    def _manifest_mtime(self):
        # type: () -> Optional[float]
        """Return the manifest file's mtime, or None if it is unavailable."""
        if self.manifest_path is None:
            return None
        try:
            # Ask directly instead of exists()+getmtime(), so a manifest
            # removed in between cannot raise out of the caller.
            return os.path.getmtime(self.manifest_path)
        except (OSError, ValueError):
            return None

    def updated(self, rel_path, stat):
        # type: (Text, stat_result) -> bool
        """Return a boolean indicating whether the file changed since the cache was last updated.

        This implicitly updates the cache with the new mtime data."""
        mtime = stat.st_mtime
        if mtime != self.data.get(rel_path):
            self.modified = True
            self.data[rel_path] = mtime
            return True
        return False

    def check_valid(self, data):
        # type: (Dict[Any, Any]) -> Dict[Any, Any]
        """Return ``data`` if it matches the current tests root and manifest
        file, otherwise a fresh dict holding only ``/tests_root``."""
        if data.get("/tests_root") != self.tests_root:
            self.modified = True
        else:
            mtime = self._manifest_mtime()
            if mtime is None or data.get("/manifest_path") != [self.manifest_path, mtime]:
                self.modified = True
        if self.modified:
            data = {}
            data["/tests_root"] = self.tests_root
        return data

    def dump(self):
        # type: () -> None
        """Record the manifest's current mtime and write the cache.

        Nothing is written when the manifest file cannot be statted.

        :raises ValueError: if no manifest path was given.
        """
        if self.manifest_path is None:
            raise ValueError("cannot dump the mtime cache without a manifest path")
        mtime = self._manifest_mtime()
        if mtime is None:
            return
        self.data["/manifest_path"] = [self.manifest_path, mtime]
        self.data["/tests_root"] = self.tests_root
        super().dump()
def walk(root):
    # type: (bytes) -> Iterable[Tuple[bytes, List[Tuple[bytes, stat_result]], List[Tuple[bytes, stat_result]]]]
    """Re-implementation of os.walk. Returns an iterator over
    (dirpath, dirnames, filenames), with some semantic differences
    to os.walk.

    This has a similar interface to os.walk, with the important difference
    that instead of lists of filenames and directory names, it yields
    lists of tuples of the form [(name, stat)] where stat is the result of
    os.stat for the file. That allows reusing the same stat data in the
    caller. It also always returns the dirpath relative to the root, with
    the root itself being returned as the empty string.

    Directories that cannot be listed and entries that cannot be statted
    are silently skipped.

    Unlike os.walk the implementation is not recursive."""

    # Bind the helpers to locals once; they are used for every entry.
    get_stat = os.stat
    is_dir = stat.S_ISDIR
    is_link = stat.S_ISLNK
    join = os.path.join
    listdir = os.listdir

    root = os.path.abspath(root)
    # FIFO queue of (absolute path, path relative to root).
    stack = deque([(root, b"")])

    while stack:
        dir_path, rel_path = stack.popleft()
        try:
            names = listdir(dir_path)
        except OSError:
            continue

        dirs, non_dirs = [], []
        for name in names:
            path = join(dir_path, name)
            try:
                path_stat = get_stat(path)
            except OSError:
                continue
            if is_dir(path_stat.st_mode):
                dirs.append((name, path_stat))
            else:
                non_dirs.append((name, path_stat))

        yield rel_path, dirs, non_dirs
        # ``dirs`` is read only after the yield, so whatever the consumer
        # left in the list is what gets descended into.
        for name, path_stat in dirs:
            # NOTE(review): os.stat follows symlinks, so this check looks
            # like it can never be false for a link -- confirm whether
            # os.lstat was intended.
            if not is_link(path_stat.st_mode):
                # dir_path is always join(root, rel_path), so the child's
                # relative path can be built directly.
                stack.append((join(dir_path, name), join(rel_path, name)))