summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/tools/lint/lint.py
diff options
context:
space:
mode:
Diffstat (limited to 'testing/web-platform/tests/tools/lint/lint.py')
-rw-r--r--testing/web-platform/tests/tools/lint/lint.py996
1 files changed, 996 insertions, 0 deletions
diff --git a/testing/web-platform/tests/tools/lint/lint.py b/testing/web-platform/tests/tools/lint/lint.py
new file mode 100644
index 0000000000..9fc78d9b68
--- /dev/null
+++ b/testing/web-platform/tests/tools/lint/lint.py
@@ -0,0 +1,996 @@
+import abc
+import argparse
+import ast
+import json
+import logging
+import multiprocessing
+import os
+import re
+import subprocess
+import sys
+import tempfile
+from collections import defaultdict
+from typing import (Any, Callable, Dict, IO, Iterable, List, Optional, Sequence, Set, Text, Tuple,
+ Type, TypeVar)
+
+from urllib.parse import urlsplit, urljoin
+
+try:
+ from xml.etree import cElementTree as ElementTree
+except ImportError:
+ from xml.etree import ElementTree as ElementTree # type: ignore
+
+from . import fnmatch
+from . import rules
+from .. import localpaths
+from ..ci.tc.github_checks_output import get_gh_checks_outputter, GitHubChecksOutputter
+from ..gitignore.gitignore import PathFilter
+from ..wpt import testfiles
+from ..manifest.vcs import walk
+
+from ..manifest.sourcefile import SourceFile, js_meta_re, python_meta_re, space_chars, get_any_variants
+
+
+# The Ignorelist is a two level dictionary. The top level is indexed by
+# error names (e.g. 'TRAILING WHITESPACE'). Each of those then has a map of
+# file patterns (e.g. 'foo/*') to a set of specific line numbers for the
+# exception. The line numbers are optional; if missing the entire file
+# ignores the error.
+Ignorelist = Dict[str, Dict[str, Set[Optional[int]]]]
+
+# Define an arbitrary typevar
+T = TypeVar("T")
+
+
+logger: Optional[logging.Logger] = None
+
+
+def setup_logging(prefix: bool = False) -> None:
+ global logger
+ if logger is None:
+ logger = logging.getLogger(os.path.basename(os.path.splitext(__file__)[0]))
+ handler: logging.Handler = logging.StreamHandler(sys.stdout)
+ # Only add a handler if the parent logger is missing a handler
+ parent = logger.parent
+ assert isinstance(parent, logging.Logger)
+ if parent and len(parent.handlers) == 0:
+ handler = logging.StreamHandler(sys.stdout)
+ logger.addHandler(handler)
+ if prefix:
+ format = logging.BASIC_FORMAT
+ else:
+ format = "%(message)s"
+ formatter = logging.Formatter(format)
+ for handler in logger.handlers:
+ handler.setFormatter(formatter)
+ logger.setLevel(logging.DEBUG)
+
+
+setup_logging()
+
+
+ERROR_MSG = """You must fix all errors; for details on how to fix them, see
+https://web-platform-tests.org/writing-tests/lint-tool.html
+
+However, instead of fixing a particular error, it's sometimes
+OK to add a line to the lint.ignore file in the root of the
+web-platform-tests directory to make the lint tool ignore it.
+
+For example, to make the lint tool ignore all '%s'
+errors in the %s file,
+you could add the following line to the lint.ignore file.
+
+%s: %s"""
+
+
+def all_filesystem_paths(repo_root: Text, subdir: Optional[Text] = None) -> Iterable[Text]:
+ path_filter = PathFilter(repo_root.encode("utf8"),
+ extras=[b".git/"])
+ if subdir:
+ expanded_path = subdir.encode("utf8")
+ subdir_str = expanded_path
+ else:
+ expanded_path = repo_root.encode("utf8")
+ for dirpath, dirnames, filenames in path_filter(walk(expanded_path)):
+ for filename, _ in filenames:
+ path = os.path.join(dirpath, filename)
+ if subdir:
+ path = os.path.join(subdir_str, path)
+ assert not os.path.isabs(path), path
+ yield path.decode("utf8")
+
+
+def _all_files_equal(paths: Iterable[Text]) -> bool:
+ """
+ Checks all the paths are files that are byte-for-byte identical
+
+ :param paths: the list of paths to compare
+ :returns: True if they are all identical
+ """
+ paths = list(paths)
+ if len(paths) < 2:
+ return True
+
+ first = paths.pop()
+ size = os.path.getsize(first)
+ if any(os.path.getsize(path) != size for path in paths):
+ return False
+
+ # Chunk this to avoid eating up memory and file descriptors
+ bufsize = 4096*4 # 16KB, a "reasonable" number of disk sectors
+ groupsize = 8 # Hypothesised to be large enough in the common case that everything fits in one group
+ with open(first, "rb") as first_f:
+ for start in range(0, len(paths), groupsize):
+ path_group = paths[start:start+groupsize]
+ first_f.seek(0)
+ try:
+ files = [open(x, "rb") for x in path_group]
+ for _ in range(0, size, bufsize):
+ a = first_f.read(bufsize)
+ for f in files:
+ b = f.read(bufsize)
+ if a != b:
+ return False
+ finally:
+ for f in files:
+ f.close()
+
+ return True
+
+
+def check_path_length(repo_root: Text, path: Text) -> List[rules.Error]:
+ if len(path) + 1 > 150:
+ return [rules.PathLength.error(path, (path, len(path) + 1))]
+ return []
+
+
+def check_file_type(repo_root: Text, path: Text) -> List[rules.Error]:
+ if os.path.islink(path):
+ return [rules.FileType.error(path, (path, "symlink"))]
+ return []
+
+
+def check_worker_collision(repo_root: Text, path: Text) -> List[rules.Error]:
+ endings = [(".any.html", ".any.js"),
+ (".any.worker.html", ".any.js"),
+ (".worker.html", ".worker.js")]
+ for path_ending, generated in endings:
+ if path.endswith(path_ending):
+ return [rules.WorkerCollision.error(path, (path_ending, generated))]
+ return []
+
+
+def check_gitignore_file(repo_root: Text, path: Text) -> List[rules.Error]:
+ if not path.endswith(".gitignore"):
+ return []
+
+ path_parts = path.split(os.path.sep)
+ if len(path_parts) == 1:
+ return []
+
+ if path_parts[-1] != ".gitignore":
+ return []
+
+ if (path_parts[0] in ["tools", "docs"] or
+ path_parts[:2] == ["resources", "webidl2"]):
+ return []
+
+ return [rules.GitIgnoreFile.error(path)]
+
+
+def check_mojom_js(repo_root: Text, path: Text) -> List[rules.Error]:
+ if path.endswith(".mojom.js"):
+ return [rules.MojomJSFile.error(path)]
+ return []
+
+
+def check_ahem_copy(repo_root: Text, path: Text) -> List[rules.Error]:
+ lpath = path.lower()
+ if "ahem" in lpath and lpath.endswith(".ttf"):
+ return [rules.AhemCopy.error(path)]
+ return []
+
+
+def check_tentative_directories(repo_root: Text, path: Text) -> List[rules.Error]:
+ path_parts = path.split(os.path.sep)
+ for directory in path_parts[:-1]:
+ if "tentative" in directory and directory != "tentative":
+ return [rules.TentativeDirectoryName.error(path)]
+ return []
+
+
+def check_git_ignore(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
+ errors = []
+
+ with tempfile.TemporaryFile('w+', newline='') as f:
+ for path in paths:
+ f.write('%s\n' % os.path.join(repo_root, path))
+ f.seek(0)
+ try:
+ matches = subprocess.check_output(
+ ["git", "check-ignore", "--verbose", "--no-index", "--stdin"], stdin=f)
+ for match in matches.strip().split(b'\n'):
+ match_filter, path_bytes = match.split()
+ _, _, filter_string = match_filter.split(b':')
+ # If the matching filter reported by check-ignore is a special-case exception,
+ # that's fine. Otherwise, it requires a new special-case exception.
+ if filter_string[0:1] != b'!':
+ path = path_bytes.decode("utf8")
+ errors.append(rules.IgnoredPath.error(path, (path,)))
+ except subprocess.CalledProcessError:
+ # Nonzero return code means that no match exists.
+ pass
+ return errors
+
+
+drafts_csswg_re = re.compile(r"https?\:\/\/drafts\.csswg\.org\/([^/?#]+)")
+w3c_tr_re = re.compile(r"https?\:\/\/www\.w3c?\.org\/TR\/([^/?#]+)")
+w3c_dev_re = re.compile(r"https?\:\/\/dev\.w3c?\.org\/[^/?#]+\/([^/?#]+)")
+
+
+def check_unique_testharness_basenames(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
+ """
+ Checks that all testharness files have unique basename paths.
+
+ The 'basename path' refers to the entire path excluding the extension. For
+ example, 'foo/bar/baz.html' and 'foo/bar/baz.xhtml' have the same basename
+ path, but 'foo/bar/baz.html' and 'foo/qux/baz.html' do not.
+
+ Testharness files with identical basenames have caused issues in downstream
+ infrastructure (see https://github.com/web-platform-tests/wpt/issues/7570),
+ and may cause confusion in general.
+
+ :param repo_root: the repository root
+ :param paths: list of all paths
+ :returns: a list of errors found in ``paths``
+ """
+
+ errors = []
+ file_dict = defaultdict(list)
+ for path in paths:
+ source_file = SourceFile(repo_root, path, "/")
+ if "testharness" not in source_file.possible_types:
+ continue
+ file_name, file_extension = os.path.splitext(path)
+ file_dict[file_name].append(file_extension)
+ for k, v in file_dict.items():
+ if len(v) == 1:
+ continue
+ context = (', '.join(v),)
+ for extension in v:
+ errors.append(rules.DuplicateBasenamePath.error(k + extension, context))
+ return errors
+
+
+def check_unique_case_insensitive_paths(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
+ seen: Dict[Text, Text] = {}
+ errors = []
+ for path in paths:
+ lower_path = path.lower()
+ if lower_path in seen:
+ context = (seen[lower_path],)
+ errors.append(rules.DuplicatePathCaseInsensitive.error(path, context))
+ else:
+ seen[lower_path] = path
+ return errors
+
+
+def parse_ignorelist(f: IO[Text]) -> Tuple[Ignorelist, Set[Text]]:
+ """
+ Parse the ignorelist file given by `f`, and return the parsed structure.
+
+ :returns: a tuple of an Ignorelist and a set of files that are completely
+ skipped by the linter (i.e. have a '*' entry).
+ """
+
+ data: Ignorelist = defaultdict(lambda:defaultdict(set))
+ skipped_files: Set[Text] = set()
+
+ for line in f:
+ line = line.strip()
+ if not line or line.startswith("#"):
+ continue
+ parts = [item.strip() for item in line.split(":")]
+
+ if len(parts) == 2:
+ error_types_s, file_match = parts
+ line_number: Optional[int] = None
+ else:
+ error_types_s, file_match, line_number_s = parts
+ line_number = int(line_number_s)
+
+ error_types = {item.strip() for item in error_types_s.split(",")}
+ file_match = os.path.normcase(file_match)
+
+ if "*" in error_types:
+ skipped_files.add(file_match)
+ else:
+ for error_type in error_types:
+ data[error_type][file_match].add(line_number)
+
+ return data, skipped_files
+
+
+def filter_ignorelist_errors(data: Ignorelist, errors: Sequence[rules.Error]) -> List[rules.Error]:
+ """
+ Filter out those errors that are ignored in `data`.
+ """
+
+ if not errors:
+ return []
+
+ skipped = [False for item in range(len(errors))]
+
+ for i, (error_type, msg, path, line) in enumerate(errors):
+ normpath = os.path.normcase(path)
+ # Allow skipping all lint errors except the IGNORED PATH lint,
+ # which explains how to fix it correctly and shouldn't be skipped.
+ if error_type in data and error_type != "IGNORED PATH":
+ wl_files = data[error_type]
+ for file_match, allowed_lines in wl_files.items():
+ if None in allowed_lines or line in allowed_lines:
+ if fnmatch.fnmatchcase(normpath, file_match):
+ skipped[i] = True
+
+ return [item for i, item in enumerate(errors) if not skipped[i]]
+
+
+regexps = [item() for item in # type: ignore
+ [rules.TrailingWhitespaceRegexp,
+ rules.TabsRegexp,
+ rules.CRRegexp,
+ rules.SetTimeoutRegexp,
+ rules.W3CTestOrgRegexp,
+ rules.WebPlatformTestRegexp,
+ rules.Webidl2Regexp,
+ rules.ConsoleRegexp,
+ rules.GenerateTestsRegexp,
+ rules.PrintRegexp,
+ rules.LayoutTestsRegexp,
+ rules.MissingDepsRegexp,
+ rules.SpecialPowersRegexp,
+ rules.AssertThrowsRegexp,
+ rules.PromiseRejectsRegexp,
+ rules.AssertPreconditionRegexp]]
+
+
+def check_regexp_line(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
+ errors: List[rules.Error] = []
+
+ applicable_regexps = [regexp for regexp in regexps if regexp.applies(path)]
+
+ for i, line in enumerate(f):
+ for regexp in applicable_regexps:
+ if regexp.search(line):
+ errors.append((regexp.name, regexp.description, path, i+1))
+
+ return errors
+
+
+def check_parsed(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
+ source_file = SourceFile(repo_root, path, "/", contents=f.read())
+
+ errors: List[rules.Error] = []
+
+ if path.startswith("css/"):
+ if (source_file.type != "support" and
+ not source_file.name_is_reference and
+ not source_file.name_is_tentative and
+ not source_file.name_is_crashtest and
+ not source_file.spec_links):
+ return [rules.MissingLink.error(path)]
+
+ if source_file.name_is_non_test:
+ return []
+
+ if source_file.markup_type is None:
+ return []
+
+ if source_file.root is None:
+ return [rules.ParseFailed.error(path)]
+
+ if source_file.type == "manual" and not source_file.name_is_manual:
+ errors.append(rules.ContentManual.error(path))
+
+ if source_file.type == "visual" and not source_file.name_is_visual:
+ errors.append(rules.ContentVisual.error(path))
+
+ about_blank_parts = urlsplit("about:blank")
+ for reftest_node in source_file.reftest_nodes:
+ href = reftest_node.attrib.get("href", "").strip(space_chars)
+ parts = urlsplit(href)
+
+ if parts == about_blank_parts:
+ continue
+
+ if (parts.scheme or parts.netloc):
+ errors.append(rules.AbsoluteUrlRef.error(path, (href,)))
+ continue
+
+ ref_url = urljoin(source_file.url, href)
+ ref_parts = urlsplit(ref_url)
+
+ if source_file.url == ref_url:
+ errors.append(rules.SameFileRef.error(path))
+ continue
+
+ assert ref_parts.path != ""
+
+ reference_file = os.path.join(repo_root, ref_parts.path[1:])
+ reference_rel = reftest_node.attrib.get("rel", "")
+
+ if not os.path.isfile(reference_file):
+ errors.append(rules.NonexistentRef.error(path,
+ (reference_rel, href)))
+
+ if len(source_file.timeout_nodes) > 1:
+ errors.append(rules.MultipleTimeout.error(path))
+
+ for timeout_node in source_file.timeout_nodes:
+ timeout_value = timeout_node.attrib.get("content", "").lower()
+ if timeout_value != "long":
+ errors.append(rules.InvalidTimeout.error(path, (timeout_value,)))
+
+ required_elements: List[Text] = []
+
+ testharnessreport_nodes: List[ElementTree.Element] = []
+ if source_file.testharness_nodes:
+ test_type = source_file.manifest_items()[0]
+ if test_type not in ("testharness", "manual"):
+ errors.append(rules.TestharnessInOtherType.error(path, (test_type,)))
+ if len(source_file.testharness_nodes) > 1:
+ errors.append(rules.MultipleTestharness.error(path))
+
+ testharnessreport_nodes = source_file.root.findall(".//{http://www.w3.org/1999/xhtml}script[@src='/resources/testharnessreport.js']")
+ if not testharnessreport_nodes:
+ errors.append(rules.MissingTestharnessReport.error(path))
+ else:
+ if len(testharnessreport_nodes) > 1:
+ errors.append(rules.MultipleTestharnessReport.error(path))
+
+ for element in source_file.variant_nodes:
+ if "content" not in element.attrib:
+ errors.append(rules.VariantMissing.error(path))
+ else:
+ variant = element.attrib["content"]
+ if variant != "":
+ if (variant[0] not in ("?", "#") or
+ len(variant) == 1 or
+ (variant[0] == "?" and variant[1] == "#")):
+ errors.append(rules.MalformedVariant.error(path, (path,)))
+
+ required_elements.extend(key for key, value in {"testharness": True,
+ "testharnessreport": len(testharnessreport_nodes) > 0,
+ "timeout": len(source_file.timeout_nodes) > 0}.items()
+ if value)
+
+ testdriver_vendor_nodes: List[ElementTree.Element] = []
+ if source_file.testdriver_nodes:
+ if len(source_file.testdriver_nodes) > 1:
+ errors.append(rules.MultipleTestdriver.error(path))
+
+ testdriver_vendor_nodes = source_file.root.findall(".//{http://www.w3.org/1999/xhtml}script[@src='/resources/testdriver-vendor.js']")
+ if not testdriver_vendor_nodes:
+ errors.append(rules.MissingTestdriverVendor.error(path))
+ else:
+ if len(testdriver_vendor_nodes) > 1:
+ errors.append(rules.MultipleTestdriverVendor.error(path))
+
+ required_elements.append("testdriver")
+ if len(testdriver_vendor_nodes) > 0:
+ required_elements.append("testdriver-vendor")
+
+ if required_elements:
+ seen_elements = defaultdict(bool)
+
+ for elem in source_file.root.iter():
+ if source_file.timeout_nodes and elem == source_file.timeout_nodes[0]:
+ seen_elements["timeout"] = True
+ if seen_elements["testharness"]:
+ errors.append(rules.LateTimeout.error(path))
+
+ elif source_file.testharness_nodes and elem == source_file.testharness_nodes[0]:
+ seen_elements["testharness"] = True
+
+ elif testharnessreport_nodes and elem == testharnessreport_nodes[0]:
+ seen_elements["testharnessreport"] = True
+ if not seen_elements["testharness"]:
+ errors.append(rules.EarlyTestharnessReport.error(path))
+
+ elif source_file.testdriver_nodes and elem == source_file.testdriver_nodes[0]:
+ seen_elements["testdriver"] = True
+
+ elif testdriver_vendor_nodes and elem == testdriver_vendor_nodes[0]:
+ seen_elements["testdriver-vendor"] = True
+ if not seen_elements["testdriver"]:
+ errors.append(rules.EarlyTestdriverVendor.error(path))
+
+ if all(seen_elements[name] for name in required_elements):
+ break
+
+ for element in source_file.root.findall(".//{http://www.w3.org/1999/xhtml}script[@src]"):
+ src = element.attrib["src"]
+
+ def incorrect_path(script: Text, src: Text) -> bool:
+ return (script == src or
+ ("/%s" % script in src and src != "/resources/%s" % script))
+
+ if incorrect_path("testharness.js", src):
+ errors.append(rules.TestharnessPath.error(path))
+
+ if incorrect_path("testharnessreport.js", src):
+ errors.append(rules.TestharnessReportPath.error(path))
+
+ if incorrect_path("testdriver.js", src):
+ errors.append(rules.TestdriverPath.error(path))
+
+ if incorrect_path("testdriver-vendor.js", src):
+ errors.append(rules.TestdriverVendorPath.error(path))
+
+ script_path = None
+ try:
+ script_path = urlsplit(urljoin(source_file.url, src)).path
+ except ValueError:
+ # This happens if the contents of src isn't something that looks like a URL to Python
+ pass
+ if (script_path == "/common/reftest-wait.js" and
+ "reftest-wait" not in source_file.root.attrib.get("class", "").split()):
+ errors.append(rules.MissingReftestWait.error(path))
+
+ return errors
+
+class ASTCheck(metaclass=abc.ABCMeta):
+ @abc.abstractproperty
+ def rule(self) -> Type[rules.Rule]:
+ pass
+
+ @abc.abstractmethod
+ def check(self, root: ast.AST) -> List[int]:
+ pass
+
+class OpenModeCheck(ASTCheck):
+ rule = rules.OpenNoMode
+
+ def check(self, root: ast.AST) -> List[int]:
+ errors = []
+ for node in ast.walk(root):
+ if isinstance(node, ast.Call):
+ if hasattr(node.func, "id") and node.func.id in ("open", "file"):
+ if (len(node.args) < 2 and
+ all(item.arg != "mode" for item in node.keywords)):
+ errors.append(node.lineno)
+ return errors
+
+ast_checkers = [item() for item in [OpenModeCheck]]
+
+def check_python_ast(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
+ if not path.endswith(".py"):
+ return []
+
+ try:
+ root = ast.parse(f.read())
+ except SyntaxError as e:
+ return [rules.ParseFailed.error(path, line_no=e.lineno)]
+
+ errors = []
+ for checker in ast_checkers:
+ for lineno in checker.check(root):
+ errors.append(checker.rule.error(path, line_no=lineno))
+ return errors
+
+
+broken_js_metadata = re.compile(br"//\s*META:")
+broken_python_metadata = re.compile(br"#\s*META:")
+
+
+def check_global_metadata(value: bytes) -> Iterable[Tuple[Type[rules.Rule], Tuple[Any, ...]]]:
+ global_values = {item.strip().decode("utf8") for item in value.split(b",") if item.strip()}
+
+ # TODO: this could check for duplicates and such
+ for global_value in global_values:
+ if not get_any_variants(global_value):
+ yield (rules.UnknownGlobalMetadata, ())
+
+
+def check_script_metadata(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
+ if path.endswith((".worker.js", ".any.js")):
+ meta_re = js_meta_re
+ broken_metadata = broken_js_metadata
+ elif path.endswith(".py"):
+ meta_re = python_meta_re
+ broken_metadata = broken_python_metadata
+ else:
+ return []
+
+ done = False
+ errors = []
+ for idx, line in enumerate(f):
+ assert isinstance(line, bytes), line
+
+ m = meta_re.match(line)
+ if m:
+ key, value = m.groups()
+ if key == b"global":
+ for rule_class, context in check_global_metadata(value):
+ errors.append(rule_class.error(path, context, idx + 1))
+ elif key == b"timeout":
+ if value != b"long":
+ errors.append(rules.UnknownTimeoutMetadata.error(path,
+ line_no=idx + 1))
+ elif key not in (b"title", b"script", b"variant", b"quic"):
+ errors.append(rules.UnknownMetadata.error(path,
+ line_no=idx + 1))
+ else:
+ done = True
+
+ if done:
+ if meta_re.match(line):
+ errors.append(rules.StrayMetadata.error(path, line_no=idx + 1))
+ elif meta_re.search(line):
+ errors.append(rules.IndentedMetadata.error(path,
+ line_no=idx + 1))
+ elif broken_metadata.search(line):
+ errors.append(rules.BrokenMetadata.error(path, line_no=idx + 1))
+
+ return errors
+
+
+ahem_font_re = re.compile(br"font.*:.*ahem", flags=re.IGNORECASE)
+# Ahem can appear either in the global location or in the support
+# directory for legacy Mozilla imports
+ahem_stylesheet_re = re.compile(br"\/fonts\/ahem\.css|support\/ahem.css",
+ flags=re.IGNORECASE)
+
+
+def check_ahem_system_font(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
+ if not path.endswith((".html", ".htm", ".xht", ".xhtml")):
+ return []
+ contents = f.read()
+ errors = []
+ if ahem_font_re.search(contents) and not ahem_stylesheet_re.search(contents):
+ errors.append(rules.AhemSystemFont.error(path))
+ return errors
+
+
+def check_path(repo_root: Text, path: Text) -> List[rules.Error]:
+ """
+ Runs lints that check the file path.
+
+ :param repo_root: the repository root
+ :param path: the path of the file within the repository
+ :returns: a list of errors found in ``path``
+ """
+
+ errors = []
+ for path_fn in path_lints:
+ errors.extend(path_fn(repo_root, path))
+ return errors
+
+
+def check_all_paths(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
+ """
+ Runs lints that check all paths globally.
+
+ :param repo_root: the repository root
+ :param paths: a list of all the paths within the repository
+ :returns: a list of errors found in ``f``
+ """
+
+ errors = []
+ for paths_fn in all_paths_lints:
+ errors.extend(paths_fn(repo_root, paths))
+ return errors
+
+
+def check_file_contents(repo_root: Text, path: Text, f: Optional[IO[bytes]] = None) -> List[rules.Error]:
+ """
+ Runs lints that check the file contents.
+
+ :param repo_root: the repository root
+ :param path: the path of the file within the repository
+ :param f: a file-like object with the file contents
+ :returns: a list of errors found in ``f``
+ """
+ if f is None:
+ f = open(os.path.join(repo_root, path), 'rb')
+ with f:
+ errors = []
+ for file_fn in file_lints:
+ errors.extend(file_fn(repo_root, path, f))
+ f.seek(0)
+ return errors
+
+
+def check_file_contents_apply(args: Tuple[Text, Text]) -> List[rules.Error]:
+ return check_file_contents(*args)
+
+
+def output_errors_text(log: Callable[[Any], None], errors: List[rules.Error]) -> None:
+ for error_type, description, path, line_number in errors:
+ pos_string = path
+ if line_number:
+ pos_string += ":%s" % line_number
+ log(f"{pos_string}: {description} ({error_type})")
+
+
+def output_errors_markdown(log: Callable[[Any], None], errors: List[rules.Error]) -> None:
+ if not errors:
+ return
+ heading = """Got lint errors:
+
+| Error Type | Position | Message |
+|------------|----------|---------|"""
+ for line in heading.split("\n"):
+ log(line)
+ for error_type, description, path, line_number in errors:
+ pos_string = path
+ if line_number:
+ pos_string += ":%s" % line_number
+ log(f"{error_type} | {pos_string} | {description} |")
+
+
+def output_errors_json(log: Callable[[Any], None], errors: List[rules.Error]) -> None:
+ for error_type, error, path, line_number in errors:
+ # We use 'print' rather than the log function to ensure that the output
+ # is valid JSON (e.g. with no logger preamble).
+ print(json.dumps({"path": path, "lineno": line_number,
+ "rule": error_type, "message": error}))
+
+
+def output_errors_github_checks(outputter: GitHubChecksOutputter, errors: List[rules.Error], first_reported: bool) -> None:
+ """Output errors to the GitHub Checks output markdown format.
+
+ :param outputter: the GitHub Checks outputter
+ :param errors: a list of error tuples (error type, message, path, line number)
+ :param first_reported: True if these are the first reported errors
+ """
+ if first_reported:
+ outputter.output(
+ "\nChanges in this PR contain lint errors, listed below. These "
+ "errors must either be fixed or added to the list of ignored "
+ "errors; see [the documentation]("
+ "https://web-platform-tests.org/writing-tests/lint-tool.html). "
+ "For help, please tag `@web-platform-tests/wpt-core-team` in a "
+ "comment.\n")
+ outputter.output("```")
+ output_errors_text(outputter.output, errors)
+
+
+def output_error_count(error_count: Dict[Text, int]) -> None:
+ if not error_count:
+ return
+
+ assert logger is not None
+ by_type = " ".join("%s: %d" % item for item in error_count.items())
+ count = sum(error_count.values())
+ logger.info("")
+ if count == 1:
+ logger.info(f"There was 1 error ({by_type})")
+ else:
+ logger.info("There were %d errors (%s)" % (count, by_type))
+
+
+def changed_files(wpt_root: Text) -> List[Text]:
+ revish = testfiles.get_revish(revish=None)
+ changed, _ = testfiles.files_changed(revish, None, include_uncommitted=True, include_new=True)
+ return [os.path.relpath(item, wpt_root) for item in changed]
+
+
+def lint_paths(kwargs: Dict[Text, Any], wpt_root: Text) -> List[Text]:
+ if kwargs.get("paths"):
+ paths = []
+ for path in kwargs.get("paths", []):
+ if os.path.isdir(path):
+ path_dir = list(all_filesystem_paths(wpt_root, path))
+ paths.extend(path_dir)
+ elif os.path.isfile(path):
+ paths.append(os.path.relpath(os.path.abspath(path), wpt_root))
+ elif kwargs["all"]:
+ paths = list(all_filesystem_paths(wpt_root))
+ elif kwargs["paths_file"]:
+ paths = []
+ with open(kwargs["paths_file"], 'r', newline='') as f:
+ for line in f.readlines():
+ path = line.strip()
+ if os.path.isdir(path):
+ path_dir = list(all_filesystem_paths(wpt_root, path))
+ paths.extend(path_dir)
+ elif os.path.isfile(path):
+ paths.append(os.path.relpath(os.path.abspath(path), wpt_root))
+ else:
+ changed_paths = changed_files(wpt_root)
+ force_all = False
+ for path in changed_paths:
+ path = path.replace(os.path.sep, "/")
+ if path == "lint.ignore" or path.startswith("tools/lint/"):
+ force_all = True
+ break
+ paths = (list(changed_paths) if not force_all
+ else list(all_filesystem_paths(wpt_root)))
+
+ return paths
+
+
+def create_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser()
+ parser.add_argument("paths", nargs="*",
+ help="List of paths to lint")
+ parser.add_argument("--json", action="store_true",
+ help="Output machine-readable JSON format")
+ parser.add_argument("--markdown", action="store_true",
+ help="Output markdown")
+ parser.add_argument("--repo-root", type=str,
+ help="The WPT directory. Use this "
+ "option if the lint script exists outside the repository")
+ parser.add_argument("--ignore-glob", type=str, action="append",
+ help="Additional file glob to ignore (repeat to add more). "
+ "Globs are matched against paths relative to REPO_ROOT "
+ "using fnmatch, except that path separators are normalized.")
+ parser.add_argument("--all", action="store_true", help="If no paths are passed, try to lint the whole "
+ "working directory, not just files that changed")
+ parser.add_argument("--github-checks-text-file", type=str,
+ help="Path to GitHub checks output file for Taskcluster runs")
+ parser.add_argument("-j", "--jobs", type=int, default=0,
+ help="Level to parallelism to use (defaults to 0, which detects the number of CPUs)")
+ parser.add_argument("--paths-file", help="File containing a list of files to lint, one per line")
+ return parser
+
+
+def main(**kwargs: Any) -> int:
+
+ assert logger is not None
+ if kwargs.get("json") and kwargs.get("markdown"):
+ logger.critical("Cannot specify --json and --markdown")
+ sys.exit(2)
+
+ repo_root = kwargs.get('repo_root') or localpaths.repo_root
+ output_format = {(True, False): "json",
+ (False, True): "markdown",
+ (False, False): "normal"}[(kwargs.get("json", False),
+ kwargs.get("markdown", False))]
+
+ if output_format == "markdown":
+ setup_logging(True)
+
+ paths = lint_paths(kwargs, repo_root)
+
+ ignore_glob = kwargs.get("ignore_glob", [])
+
+ github_checks_outputter = get_gh_checks_outputter(kwargs["github_checks_text_file"])
+
+ jobs = kwargs.get("jobs", 0)
+
+ return lint(repo_root, paths, output_format, ignore_glob, github_checks_outputter, jobs)
+
+
+# best experimental guess at a decent cut-off for using the parallel path
+MIN_FILES_FOR_PARALLEL = 80
+
+
+def lint(repo_root: Text,
+ paths: List[Text],
+ output_format: Text,
+ ignore_glob: Optional[List[Text]] = None,
+ github_checks_outputter: Optional[GitHubChecksOutputter] = None,
+ jobs: int = 0) -> int:
+ error_count: Dict[Text, int] = defaultdict(int)
+ last = None
+
+ if jobs == 0:
+ jobs = multiprocessing.cpu_count()
+ if sys.platform == 'win32':
+ # Using too many child processes in Python 3 hits either hangs or a
+ # ValueError exception, and, has diminishing returns. Clamp to 56 to
+ # give margin for error.
+ jobs = min(jobs, 56)
+
+ with open(os.path.join(repo_root, "lint.ignore")) as f:
+ ignorelist, skipped_files = parse_ignorelist(f)
+
+ if ignore_glob:
+ skipped_files |= set(ignore_glob)
+
+ output_errors = {"json": output_errors_json,
+ "markdown": output_errors_markdown,
+ "normal": output_errors_text}[output_format]
+
+ def process_errors(errors: List[rules.Error]) -> Optional[Tuple[Text, Text]]:
+ """
+ Filters and prints the errors, and updates the ``error_count`` object.
+
+ :param errors: a list of error tuples (error type, message, path, line number)
+ :returns: ``None`` if there were no errors, or
+ a tuple of the error type and the path otherwise
+ """
+
+ errors = filter_ignorelist_errors(ignorelist, errors)
+ if not errors:
+ return None
+
+ assert logger is not None
+ output_errors(logger.error, errors)
+
+ if github_checks_outputter:
+ first_output = len(error_count) == 0
+ output_errors_github_checks(github_checks_outputter, errors, first_output)
+
+ for error_type, error, path, line in errors:
+ error_count[error_type] += 1
+
+ return (errors[-1][0], path)
+
+ to_check_content = []
+ skip = set()
+
+ for path in paths:
+ abs_path = os.path.join(repo_root, path)
+ if not os.path.exists(abs_path):
+ skip.add(path)
+ continue
+
+ if any(fnmatch.fnmatch(path, file_match) for file_match in skipped_files):
+ skip.add(path)
+ continue
+
+ errors = check_path(repo_root, path)
+ last = process_errors(errors) or last
+
+ if not os.path.isdir(abs_path):
+ to_check_content.append((repo_root, path))
+
+ paths = [p for p in paths if p not in skip]
+
+ if jobs > 1 and len(to_check_content) >= MIN_FILES_FOR_PARALLEL:
+ pool = multiprocessing.Pool(jobs)
+ # submit this job first, as it's the longest running
+ all_paths_result = pool.apply_async(check_all_paths, (repo_root, paths))
+ # each item tends to be quick, so pass things in large chunks to avoid too much IPC overhead
+ errors_it = pool.imap_unordered(check_file_contents_apply, to_check_content, chunksize=40)
+ pool.close()
+ for errors in errors_it:
+ last = process_errors(errors) or last
+
+ errors = all_paths_result.get()
+ pool.join()
+ last = process_errors(errors) or last
+ else:
+ for item in to_check_content:
+ errors = check_file_contents(*item)
+ last = process_errors(errors) or last
+
+ errors = check_all_paths(repo_root, paths)
+ last = process_errors(errors) or last
+
+ if output_format in ("normal", "markdown"):
+ output_error_count(error_count)
+ if error_count:
+ assert last is not None
+ assert logger is not None
+ for line in (ERROR_MSG % (last[0], last[1], last[0], last[1])).split("\n"):
+ logger.info(line)
+
+ if error_count and github_checks_outputter:
+ github_checks_outputter.output("```")
+
+ return sum(error_count.values())
+
+
+path_lints = [check_file_type, check_path_length, check_worker_collision, check_ahem_copy,
+ check_mojom_js, check_tentative_directories, check_gitignore_file]
+all_paths_lints = [check_unique_testharness_basenames,
+ check_unique_case_insensitive_paths]
+file_lints = [check_regexp_line, check_parsed, check_python_ast, check_script_metadata,
+ check_ahem_system_font]
+
+# Don't break users of the lint that don't have git installed.
+try:
+ subprocess.check_output(["git", "--version"])
+ all_paths_lints += [check_git_ignore]
+except (subprocess.CalledProcessError, FileNotFoundError):
+ print('No git present; skipping .gitignore lint.')
+
+if __name__ == "__main__":
+ args = create_parser().parse_args()
+ error_count = main(**vars(args))
+ if error_count > 0:
+ sys.exit(1)