diff options
Diffstat (limited to 'testing/web-platform/mozilla/tests/webdriver/support/fixtures.py')
-rw-r--r-- | testing/web-platform/mozilla/tests/webdriver/support/fixtures.py | 367 |
1 files changed, 50 insertions, 317 deletions
diff --git a/testing/web-platform/mozilla/tests/webdriver/support/fixtures.py b/testing/web-platform/mozilla/tests/webdriver/support/fixtures.py index e9dbf1cdfe..b788d874e5 100644 --- a/testing/web-platform/mozilla/tests/webdriver/support/fixtures.py +++ b/testing/web-platform/mozilla/tests/webdriver/support/fixtures.py @@ -1,40 +1,18 @@ -import argparse -import json -import os -import re -import socket -import subprocess -import threading -import time -from contextlib import suppress -from urllib.parse import urlparse - import pytest -import webdriver -from mozprofile import Preferences, Profile -from mozrunner import FirefoxRunner - -from .context import using_context - - -def get_arg_value(arg_names, args): - """Get an argument value from a list of arguments - This assumes that argparse argument parsing is close enough to the target - to be compatible, at least with the set of inputs we have. - - :param arg_names: - List of names for the argument e.g. ["--foo", "-f"] - :param args: - List of arguments to parse - :returns: - Optional string argument value - """ - parser = argparse.ArgumentParser() - parser.add_argument(*arg_names, action="store", dest="value", default=None) - parsed, _ = parser.parse_known_args(args) - return parsed.value +from .helpers import ( + Browser, + Geckodriver, + create_custom_profile, + get_pref, + get_profile_folder, + read_user_preferences, + set_pref, +) @pytest.fixture(scope="module") -def browser(full_configuration): +def browser(configuration, firefox_options): """Start a Firefox instance without using geckodriver. geckodriver will automatically use the --remote-allow-hosts and @@ -46,7 +24,13 @@ def browser(full_configuration): """ current_browser = None - def _browser(use_bidi=False, use_cdp=False, extra_args=None, extra_prefs=None): + def _browser( + use_bidi=False, + use_cdp=False, + extra_args=None, + extra_prefs=None, + clone_profile=True, + ): nonlocal current_browser # If the requested preferences and arguments match the ones for the @@ -66,12 +50,18 @@ def browser(full_configuration): # to create a new instance for the provided preferences. current_browser.quit() - binary = full_configuration["browser"]["binary"] - env = full_configuration["browser"]["env"] - firefox_options = full_configuration["capabilities"]["moz:firefoxOptions"] + binary = configuration["browser"]["binary"] + env = configuration["browser"]["env"] + + profile_path = get_profile_folder(firefox_options) + default_prefs = read_user_preferences(profile_path) + profile = create_custom_profile( + profile_path, default_prefs, clone=clone_profile + ) + current_browser = Browser( binary, - firefox_options, + profile, use_bidi=use_bidi, use_cdp=use_cdp, extra_args=extra_args, @@ -89,20 +79,32 @@ def browser(full_configuration): current_browser = None -@pytest.fixture -def profile_folder(configuration): - firefox_options = configuration["capabilities"]["moz:firefoxOptions"] - return get_arg_value(["--profile"], firefox_options["args"]) +@pytest.fixture(name="create_custom_profile") +def fixture_create_custom_profile(default_preferences, profile_folder): + profile = None + + def _create_custom_profile(clone=True): + profile = create_custom_profile( + profile_folder, default_preferences, clone=clone + ) + + return profile + + yield _create_custom_profile + + # if profile is not None: + if profile: + profile.cleanup() @pytest.fixture -def custom_profile(profile_folder): - # Clone the known profile for automation preferences - profile = Profile.clone(profile_folder) +def default_preferences(profile_folder): + return read_user_preferences(profile_folder) - yield profile - profile.cleanup() +@pytest.fixture(scope="session") +def firefox_options(configuration): + return configuration["capabilities"]["moz:firefoxOptions"] @pytest.fixture @@ -128,277 +130,8 @@ def geckodriver(configuration): @pytest.fixture -def user_prefs(profile_folder): - user_js = os.path.join(profile_folder, "user.js") - - prefs = {} - for pref_name, pref_value in Preferences().read_prefs(user_js): - prefs[pref_name] = pref_value - - return prefs - - -class Browser: - def __init__( - self, - binary, - firefox_options, - use_bidi=False, - use_cdp=False, - extra_args=None, - extra_prefs=None, - env=None, - ): - self.use_bidi = use_bidi - self.bidi_port_file = None - self.use_cdp = use_cdp - self.cdp_port_file = None - self.extra_args = extra_args - self.extra_prefs = extra_prefs - - self.debugger_address = None - self.remote_agent_host = None - self.remote_agent_port = None - - # Prepare temporary profile - _profile_arg, profile_folder = firefox_options["args"] - self.profile = Profile.clone(profile_folder) - if self.extra_prefs is not None: - self.profile.set_preferences(self.extra_prefs) - - if use_cdp: - self.cdp_port_file = os.path.join( - self.profile.profile, "DevToolsActivePort" - ) - with suppress(FileNotFoundError): - os.remove(self.cdp_port_file) - if use_bidi: - self.webdriver_bidi_file = os.path.join( - self.profile.profile, "WebDriverBiDiServer.json" - ) - with suppress(FileNotFoundError): - os.remove(self.webdriver_bidi_file) - - cmdargs = ["-no-remote"] - if self.use_bidi or self.use_cdp: - cmdargs.extend(["--remote-debugging-port", "0"]) - if self.extra_args is not None: - cmdargs.extend(self.extra_args) - self.runner = FirefoxRunner( - binary=binary, profile=self.profile, cmdargs=cmdargs, env=env - ) - - @property - def is_running(self): - return self.runner.is_running() - - def start(self): - # Start Firefox. - self.runner.start() - - if self.use_bidi: - # Wait until the WebDriverBiDiServer.json file is ready - while not os.path.exists(self.webdriver_bidi_file): - time.sleep(0.1) - - # Read the connection details from file - data = json.loads(open(self.webdriver_bidi_file).read()) - self.remote_agent_host = data["ws_host"] - self.remote_agent_port = int(data["ws_port"]) - - if self.use_cdp: - # Wait until the DevToolsActivePort file is ready - while not os.path.exists(self.cdp_port_file): - time.sleep(0.1) - - # Read the port if needed and the debugger address from the - # DevToolsActivePort file - lines = open(self.cdp_port_file).readlines() - assert len(lines) == 2 - - if self.remote_agent_port is None: - self.remote_agent_port = int(lines[0].strip()) - self.debugger_address = lines[1].strip() - - def quit(self, clean_profile=True): - if self.is_running: - self.runner.stop() - self.runner.cleanup() - - if clean_profile: - self.profile.cleanup() - - def wait(self): - if self.is_running is True: - self.runner.wait() - - -class Geckodriver: - PORT_RE = re.compile(b".*Listening on [^ :]*:(\d+)") - - def __init__(self, configuration, hostname=None, extra_args=None): - self.config = configuration["webdriver"] - self.requested_capabilities = configuration["capabilities"] - self.hostname = hostname or configuration["host"] - self.extra_args = extra_args or [] - self.env = configuration["browser"]["env"] - - self.command = None - self.proc = None - self.port = None - self.reader_thread = None - - self.capabilities = {"alwaysMatch": self.requested_capabilities} - self.session = None - - @property - def remote_agent_port(self): - webSocketUrl = self.session.capabilities.get("webSocketUrl") - assert webSocketUrl is not None - - return urlparse(webSocketUrl).port - - def start(self): - self.command = ( - [self.config["binary"], "--port", "0"] - + self.config["args"] - + self.extra_args - ) - - print(f"Running command: {' '.join(self.command)}") - self.proc = subprocess.Popen(self.command, env=self.env, stdout=subprocess.PIPE) - - self.reader_thread = threading.Thread( - target=readOutputLine, - args=(self.proc.stdout, self.processOutputLine), - daemon=True, - ) - self.reader_thread.start() - # Wait for the port to become ready - end_time = time.time() + 10 - while time.time() < end_time: - returncode = self.proc.poll() - if returncode is not None: - raise ChildProcessError( - f"geckodriver terminated with code {returncode}" - ) - if self.port is not None: - with socket.socket() as sock: - if sock.connect_ex((self.hostname, self.port)) == 0: - break - else: - time.sleep(0.1) - else: - if self.port is None: - raise OSError( - f"Failed to read geckodriver port started on {self.hostname}" - ) - raise ConnectionRefusedError( - f"Failed to connect to geckodriver on {self.hostname}:{self.port}" - ) - - self.session = webdriver.Session( - self.hostname, self.port, capabilities=self.capabilities - ) - - return self - - def processOutputLine(self, line): - if self.port is None: - m = self.PORT_RE.match(line) - if m is not None: - self.port = int(m.groups()[0]) - - def stop(self): - if self.session is not None: - self.delete_session() - if self.proc: - self.proc.kill() - self.port = None - if self.reader_thread is not None: - self.reader_thread.join() - - def new_session(self): - self.session.start() - - def delete_session(self): - self.session.end() - - -def readOutputLine(stream, callback): - while True: - line = stream.readline() - if not line: - break - - callback(line) - - -def clear_pref(session, pref): - """Clear the user-defined value from the specified preference. - - :param pref: Name of the preference. - """ - with using_context(session, "chrome"): - session.execute_script( - """ - const { Preferences } = ChromeUtils.importESModule( - "resource://gre/modules/Preferences.sys.mjs" - ); - Preferences.reset(arguments[0]); - """, - args=(pref,), - ) - - -def get_pref(session, pref): - """Get the value of the specified preference. - - :param pref: Name of the preference. - """ - with using_context(session, "chrome"): - pref_value = session.execute_script( - """ - const { Preferences } = ChromeUtils.importESModule( - "resource://gre/modules/Preferences.sys.mjs" - ); - - let pref = arguments[0]; - - prefs = new Preferences(); - return prefs.get(pref, null); - """, - args=(pref,), - ) - return pref_value - - -def set_pref(session, pref, value): - """Set the value of the specified preference. - - :param pref: Name of the preference. - :param value: The value to set the preference to. If the value is None, - reset the preference to its default value. If no default - value exists, the preference will cease to exist. - """ - if value is None: - clear_pref(session, pref) - return - - with using_context(session, "chrome"): - session.execute_script( - """ - const { Preferences } = ChromeUtils.importESModule( - "resource://gre/modules/Preferences.sys.mjs" - ); - - const [pref, value] = arguments; - - prefs = new Preferences(); - prefs.set(pref, value); - """, - args=(pref, value), - ) +def profile_folder(firefox_options): + return get_profile_folder(firefox_options) @pytest.fixture |