diff options
Diffstat (limited to 'testing/webcompat/mach_commands.py')
-rw-r--r-- | testing/webcompat/mach_commands.py | 247 |
1 files changed, 247 insertions, 0 deletions
diff --git a/testing/webcompat/mach_commands.py b/testing/webcompat/mach_commands.py new file mode 100644 index 0000000000..89b0f76f2a --- /dev/null +++ b/testing/webcompat/mach_commands.py @@ -0,0 +1,247 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import argparse +import io +import os +import platform +import sys + +from mach.decorators import Command +from mozbuild.base import MozbuildObject + +here = os.path.abspath(os.path.dirname(__file__)) + + +def get(url): + import requests + + resp = requests.get(url) + resp.raise_for_status() + return resp + + +def untar(fileobj, dest): + import tarfile + + with tarfile.open(fileobj=fileobj, mode="r") as tar_data: + tar_data.extractall(path=dest) + + +def unzip(fileobj, dest): + import zipfile + + with zipfile.ZipFile(fileobj) as zip_data: + zip_data.extractall(path=dest) + + +def create_parser_interventions(): + from mozlog import commandline + + parser = argparse.ArgumentParser() + parser.add_argument("--binary", help="Path to browser binary") + parser.add_argument("--webdriver-binary", help="Path to webdriver binary") + parser.add_argument( + "--webdriver-port", + action="store", + default="4444", + help="Port on which to run WebDriver", + ) + parser.add_argument( + "--webdriver-ws-port", + action="store", + default="9222", + help="Port on which to run WebDriver BiDi websocket", + ) + parser.add_argument("--bug", help="Bug to run tests for") + parser.add_argument( + "--do2fa", + action="store_true", + default=False, + help="Do two-factor auth live in supporting tests", + ) + parser.add_argument( + "--config", help="Path to JSON file containing logins and other settings" + ) + parser.add_argument( + "--debug", action="store_true", default=False, help="Debug failing tests" + ) + parser.add_argument( + "--headless", + action="store_true", + default=False, + help="Run firefox in headless mode", + ) + parser.add_argument( + "--interventions", + action="store", + default="both", + choices=["enabled", "disabled", "both", "none"], + help="Enable webcompat interventions", + ) + parser.add_argument( + "--shims", + action="store", + default="none", + choices=["enabled", "disabled", "both", "none"], + help="Enable SmartBlock shims", + ) + commandline.add_logging_group(parser) + return parser + + +class InterventionTest(MozbuildObject): + def set_default_kwargs(self, logger, kwargs): + if kwargs["binary"] is None: + kwargs["binary"] = self.get_binary_path() + + if kwargs["webdriver_binary"] is None: + webdriver_binary = self.get_binary_path( + "geckodriver", validate_exists=False + ) + + if not os.path.exists(webdriver_binary): + webdriver_binary = self.install_geckodriver( + logger, dest=os.path.dirname(webdriver_binary) + ) + + if not os.path.exists(webdriver_binary): + logger.error("Can't find geckodriver") + sys.exit(1) + kwargs["webdriver_binary"] = webdriver_binary + + def get_capabilities(self, kwargs): + return {"moz:firefoxOptions": {"binary": kwargs["binary"]}} + + def platform_string_geckodriver(self): + uname = platform.uname() + platform_name = {"Linux": "linux", "Windows": "win", "Darwin": "macos"}.get( + uname[0] + ) + + if platform_name in ("linux", "win"): + bits = "64" if uname[4] == "x86_64" else "32" + elif platform_name == "macos": + bits = "" + else: + raise ValueError(f"No precompiled geckodriver for platform {uname}") + + return f"{platform_name}{bits}" + + def install_geckodriver(self, logger, dest): + """Install latest Geckodriver.""" + if dest is None: + dest = os.path.join(self.distdir, "dist", "bin") + + is_windows = platform.uname()[0] == "Windows" + + release = get( + "https://api.github.com/repos/mozilla/geckodriver/releases/latest" + ).json() + ext = "zip" if is_windows else "tar.gz" + platform_name = self.platform_string_geckodriver() + name_suffix = f"-{platform_name}.{ext}" + for item in release["assets"]: + if item["name"].endswith(name_suffix): + url = item["browser_download_url"] + break + else: + raise ValueError(f"Failed to find geckodriver for platform {platform_name}") + + logger.info(f"Installing geckodriver from {url}") + + data = io.BytesIO(get(url).content) + data.seek(0) + decompress = unzip if ext == "zip" else untar + decompress(data, dest=dest) + + exe_ext = ".exe" if is_windows else "" + path = os.path.join(dest, f"geckodriver{exe_ext}") + + return path + + def run(self, **kwargs): + import mozlog + import runner + + mozlog.commandline.setup_logging( + "test-interventions", kwargs, {"mach": sys.stdout} + ) + logger = mozlog.get_default_logger("test-interventions") + status_handler = mozlog.handlers.StatusHandler() + logger.add_handler(status_handler) + + self.set_default_kwargs(logger, kwargs) + + if kwargs["interventions"] != "none": + interventions = ( + ["enabled", "disabled"] + if kwargs["interventions"] == "both" + else [kwargs["interventions"]] + ) + + for interventions_setting in interventions: + runner.run( + logger, + os.path.join(here, "interventions"), + kwargs["binary"], + kwargs["webdriver_binary"], + kwargs["webdriver_port"], + kwargs["webdriver_ws_port"], + bug=kwargs["bug"], + debug=kwargs["debug"], + interventions=interventions_setting, + config=kwargs["config"], + headless=kwargs["headless"], + do2fa=kwargs["do2fa"], + ) + + if kwargs["shims"] != "none": + shims = ( + ["enabled", "disabled"] + if kwargs["shims"] == "both" + else [kwargs["shims"]] + ) + + for shims_setting in shims: + runner.run( + logger, + os.path.join(here, "shims"), + kwargs["binary"], + kwargs["webdriver_binary"], + kwargs["webdriver_port"], + kwargs["webdriver_ws_port"], + bug=kwargs["bug"], + debug=kwargs["debug"], + shims=shims_setting, + config=kwargs["config"], + headless=kwargs["headless"], + do2fa=kwargs["do2fa"], + ) + + summary = status_handler.summarize() + passed = ( + summary.unexpected_statuses == 0 + and summary.log_level_counts.get("ERROR", 0) == 0 + and summary.log_level_counts.get("CRITICAL", 0) == 0 + ) + return passed + + +@Command( + "test-interventions", + category="testing", + description="Test the webcompat interventions", + parser=create_parser_interventions, + virtualenv_name="webcompat", +) +def test_interventions(command_context, **params): + here = os.path.abspath(os.path.dirname(__file__)) + command_context.virtualenv_manager.activate() + command_context.virtualenv_manager.install_pip_requirements( + os.path.join(here, "requirements.txt"), + require_hashes=False, + ) + intervention_test = command_context._spawn(InterventionTest) + return 0 if intervention_test.run(**params) else 1 |