# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import argparse import io import os import platform import sys from mach.decorators import Command from mozbuild.base import BuildEnvironmentNotFoundException, MozbuildObject here = os.path.abspath(os.path.dirname(__file__)) GVE = "org.mozilla.geckoview_example" def get(url): import requests resp = requests.get(url) resp.raise_for_status() return resp def untar(fileobj, dest): import tarfile with tarfile.open(fileobj=fileobj, mode="r") as tar_data: tar_data.extractall(path=dest) def unzip(fileobj, dest): import zipfile with zipfile.ZipFile(fileobj) as zip_data: zip_data.extractall(path=dest) def writable_dir(path): if not os.path.isdir(path): raise argparse.ArgumentTypeError(f"{path} is not a valid dir") if os.access(path, os.W_OK): return path else: raise argparse.ArgumentTypeError(f"{path} is not a writable dir") def create_parser_interventions(): from mozlog import commandline parser = argparse.ArgumentParser() parser.add_argument("--webdriver-binary", help="Path to webdriver binary") parser.add_argument( "--webdriver-port", action="store", default="4444", help="Port on which to run WebDriver", ) parser.add_argument( "--webdriver-ws-port", action="store", default="9222", help="Port on which to run WebDriver BiDi websocket", ) parser.add_argument("-b", "--bugs", nargs="*", help="Bugs to run tests for") parser.add_argument( "--do2fa", action="store_true", default=False, help="Do two-factor auth live in supporting tests", ) parser.add_argument( "--config", help="Path to JSON file containing logins and other settings" ) parser.add_argument( "--debug", action="store_true", default=False, help="Debug failing tests" ) parser.add_argument( "-H", "--headless", action="store_true", default=False, help="Run firefox in headless mode", ) parser.add_argument( "--interventions", action="store", default="both", choices=["enabled", "disabled", "both", "none"], help="Enable webcompat interventions", ) parser.add_argument( "--shims", action="store", default="none", choices=["enabled", "disabled", "both", "none"], help="Enable SmartBlock shims", ) parser.add_argument( "--platform", action="store", choices=["android", "desktop"], help="Platform to target", ) parser.add_argument( "--failure-screenshots-dir", action="store", type=writable_dir, help="Path to save failure screenshots", ) parser.add_argument( "-s", "--no-failure-screenshots", action="store_true", default=False, help="Do not save a screenshot for each test failure", ) desktop_group = parser.add_argument_group("Desktop-specific arguments") desktop_group.add_argument("--binary", help="Path to browser binary") android_group = parser.add_argument_group("Android-specific arguments") android_group.add_argument( "--device-serial", action="store", help="Running Android instances to connect to, if not emulator-5554", ) android_group.add_argument( "--package-name", action="store", default=GVE, help="Android package name to use", ) commandline.add_logging_group(parser) return parser class InterventionTest(MozbuildObject): def set_default_kwargs(self, logger, command_context, kwargs): platform = kwargs["platform"] binary = kwargs["binary"] device_serial = kwargs["device_serial"] try: is_gve_build = command_context.substs.get("MOZ_APP_NAME") == "fennec" except BuildEnvironmentNotFoundException: # If we don't have a build, just use the logic below to choose between # desktop and Android is_gve_build = False if platform == "android" or ( platform is None and binary is None and (device_serial or is_gve_build) ): kwargs["platform"] = "android" else: kwargs["platform"] = "desktop" if kwargs["platform"] == "desktop" and kwargs["binary"] is None: kwargs["binary"] = self.get_binary_path() if kwargs["webdriver_binary"] is None: webdriver_binary = self.get_binary_path( "geckodriver", validate_exists=False ) if not os.path.exists(webdriver_binary): webdriver_binary = self.install_geckodriver( logger, dest=os.path.dirname(webdriver_binary) ) if not os.path.exists(webdriver_binary): logger.error("Can't find geckodriver") sys.exit(1) kwargs["webdriver_binary"] = webdriver_binary def platform_string_geckodriver(self): uname = platform.uname() platform_name = {"Linux": "linux", "Windows": "win", "Darwin": "macos"}.get( uname[0] ) if platform_name in ("linux", "win"): bits = "64" if uname[4] == "x86_64" else "32" elif platform_name == "macos": bits = "" else: raise ValueError(f"No precompiled geckodriver for platform {uname}") return f"{platform_name}{bits}" def install_geckodriver(self, logger, dest): """Install latest Geckodriver.""" if dest is None: dest = os.path.join(self.distdir, "dist", "bin") is_windows = platform.uname()[0] == "Windows" release = get( "https://api.github.com/repos/mozilla/geckodriver/releases/latest" ).json() ext = "zip" if is_windows else "tar.gz" platform_name = self.platform_string_geckodriver() name_suffix = f"-{platform_name}.{ext}" for item in release["assets"]: if item["name"].endswith(name_suffix): url = item["browser_download_url"] break else: raise ValueError(f"Failed to find geckodriver for platform {platform_name}") logger.info(f"Installing geckodriver from {url}") data = io.BytesIO(get(url).content) data.seek(0) decompress = unzip if ext == "zip" else untar decompress(data, dest=dest) exe_ext = ".exe" if is_windows else "" path = os.path.join(dest, f"geckodriver{exe_ext}") return path def setup_device(self, command_context, kwargs): if kwargs["platform"] != "android": return app = kwargs["package_name"] device_serial = kwargs["device_serial"] if not device_serial: from mozrunner.devices.android_device import ( InstallIntent, verify_android_device, ) # verify_android_device sets up device/emulator and records selected # one to DEVICE_SERIAL environment. verify_android_device( command_context, app=app, network=True, install=InstallIntent.YES ) kwargs["device_serial"] = os.environ.get("DEVICE_SERIAL") # GVE does not have the webcompat addon by default. Add it. if app == GVE: kwargs["addon"] = "/data/local/tmp/webcompat.xpi" push_to_device( command_context.substs["ADB"], device_serial, webcompat_addon(command_context), kwargs["addon"], ) def run(self, command_context, **kwargs): import mozlog import runner mozlog.commandline.setup_logging( "test-interventions", kwargs, {"mach": sys.stdout} ) logger = mozlog.get_default_logger("test-interventions") log_level = "INFO" # It's not trivial to get a single log level out of mozlog, because we might have # different levels going to different outputs. We look for the maximum (i.e. most # verbose) level of any handler with an attached formatter. configured_level_number = None for handler in logger.handlers: if hasattr(handler, "formatter") and hasattr(handler.formatter, "level"): formatter_level = handler.formatter.level configured_level_number = ( formatter_level if configured_level_number is None else max(configured_level_number, formatter_level) ) if configured_level_number is not None: for level, number in mozlog.structuredlog.log_levels.items(): if number == configured_level_number: log_level = level break status_handler = mozlog.handlers.StatusHandler() logger.add_handler(status_handler) self.set_default_kwargs(logger, command_context, kwargs) self.setup_device(command_context, kwargs) if kwargs["interventions"] != "none": interventions = ( ["enabled", "disabled"] if kwargs["interventions"] == "both" else [kwargs["interventions"]] ) for interventions_setting in interventions: runner.run( logger, os.path.join(here, "interventions"), kwargs["webdriver_binary"], kwargs["webdriver_port"], kwargs["webdriver_ws_port"], browser_binary=kwargs.get("binary"), device_serial=kwargs.get("device_serial"), package_name=kwargs.get("package_name"), addon=kwargs.get("addon"), bugs=kwargs["bugs"], debug=kwargs["debug"], interventions=interventions_setting, config=kwargs["config"], headless=kwargs["headless"], do2fa=kwargs["do2fa"], log_level=log_level, failure_screenshots_dir=kwargs.get("failure_screenshots_dir"), no_failure_screenshots=kwargs.get("no_failure_screenshots"), ) if kwargs["shims"] != "none": shims = ( ["enabled", "disabled"] if kwargs["shims"] == "both" else [kwargs["shims"]] ) for shims_setting in shims: runner.run( logger, os.path.join(here, "shims"), kwargs["webdriver_binary"], kwargs["webdriver_port"], kwargs["webdriver_ws_port"], browser_binary=kwargs.get("binary"), device_serial=kwargs.get("device_serial"), package_name=kwargs.get("package_name"), addon=kwargs.get("addon"), bugs=kwargs["bugs"], debug=kwargs["debug"], shims=shims_setting, config=kwargs["config"], headless=kwargs["headless"], do2fa=kwargs["do2fa"], failure_screenshots_dir=kwargs.get("failure_screenshots_dir"), no_failure_screenshots=kwargs.get("no_failure_screenshots"), ) summary = status_handler.summarize() passed = ( summary.unexpected_statuses == 0 and summary.log_level_counts.get("ERROR", 0) == 0 and summary.log_level_counts.get("CRITICAL", 0) == 0 ) return passed def webcompat_addon(command_context): import shutil import tempfile src = os.path.join(command_context.topsrcdir, "browser", "extensions", "webcompat") # We use #include directives in the system addon's moz.build (to inject our JSON config # into interventions.js), so we must do that here to make a working XPI. tmpdir_kwargs = {} if sys.version_info.major >= 3 and sys.version_info.minor >= 10: tmpdir_kwargs["ignore_cleanup_errors"] = True with tempfile.TemporaryDirectory(**tmpdir_kwargs) as src_copy: def process_includes(path): fullpath = os.path.join(src_copy, path) in_lines = None with open(fullpath) as f: in_lines = f.readlines() with open(fullpath, "w") as f: for line in in_lines: if not line.startswith("#include"): f.write(line) continue include_path = line.split()[1] include_fullpath = os.path.join( os.path.dirname(fullpath), include_path ) with open(include_fullpath) as inc: f.write(inc.read()) f.write("\n") shutil.copytree(src, src_copy, dirs_exist_ok=True) process_includes("run.js") dst = os.path.join( command_context.virtualenv_manager.virtualenv_root, "webcompat.xpi" ) shutil.make_archive(dst, "zip", src_copy) shutil.move(f"{dst}.zip", dst) return dst def push_to_device(adb_path, device_serial, local_path, remote_path): from mozdevice import ADBDeviceFactory device = ADBDeviceFactory(adb=adb_path, device=device_serial) device.push(local_path, remote_path) device.chmod(remote_path) @Command( "test-interventions", category="testing", description="Test the webcompat interventions", parser=create_parser_interventions, virtualenv_name="webcompat", ) def test_interventions(command_context, **params): here = os.path.abspath(os.path.dirname(__file__)) command_context.virtualenv_manager.activate() command_context.virtualenv_manager.install_pip_requirements( os.path.join(here, "requirements.txt"), require_hashes=False, ) intervention_test = command_context._spawn(InterventionTest) return 0 if intervention_test.run(command_context, **params) else 1