summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/mozilla/tests/webdriver/support/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'testing/web-platform/mozilla/tests/webdriver/support/helpers.py')
-rw-r--r--testing/web-platform/mozilla/tests/webdriver/support/helpers.py324
1 files changed, 324 insertions, 0 deletions
diff --git a/testing/web-platform/mozilla/tests/webdriver/support/helpers.py b/testing/web-platform/mozilla/tests/webdriver/support/helpers.py
new file mode 100644
index 0000000000..6577289983
--- /dev/null
+++ b/testing/web-platform/mozilla/tests/webdriver/support/helpers.py
@@ -0,0 +1,324 @@
+import argparse
+import json
+import os
+import re
+import socket
+import subprocess
+import tempfile
+import threading
+import time
+from contextlib import suppress
+from urllib.parse import urlparse
+
+import webdriver
+from mozprofile import Preferences, Profile
+from mozrunner import FirefoxRunner
+
+from .context import using_context
+
+
+class Browser:
+ def __init__(
+ self,
+ binary,
+ profile,
+ use_bidi=False,
+ use_cdp=False,
+ extra_args=None,
+ extra_prefs=None,
+ env=None,
+ ):
+ self.profile = profile
+ self.use_bidi = use_bidi
+ self.bidi_port_file = None
+ self.use_cdp = use_cdp
+ self.cdp_port_file = None
+ self.extra_args = extra_args
+ self.extra_prefs = extra_prefs
+
+ self.debugger_address = None
+ self.remote_agent_host = None
+ self.remote_agent_port = None
+
+ if self.extra_prefs is not None:
+ self.profile.set_preferences(self.extra_prefs)
+
+ if use_cdp:
+ self.cdp_port_file = os.path.join(
+ self.profile.profile, "DevToolsActivePort"
+ )
+ with suppress(FileNotFoundError):
+ os.remove(self.cdp_port_file)
+
+ if use_bidi:
+ self.webdriver_bidi_file = os.path.join(
+ self.profile.profile, "WebDriverBiDiServer.json"
+ )
+ with suppress(FileNotFoundError):
+ os.remove(self.webdriver_bidi_file)
+
+ cmdargs = ["-no-remote"]
+ if self.use_bidi or self.use_cdp:
+ cmdargs.extend(["--remote-debugging-port", "0"])
+ if self.extra_args is not None:
+ cmdargs.extend(self.extra_args)
+ self.runner = FirefoxRunner(
+ binary=binary, profile=self.profile, cmdargs=cmdargs, env=env
+ )
+
+ @property
+ def is_running(self):
+ return self.runner.is_running()
+
+ def start(self):
+ # Start Firefox.
+ self.runner.start()
+
+ if self.use_bidi:
+ # Wait until the WebDriverBiDiServer.json file is ready
+ while not os.path.exists(self.webdriver_bidi_file):
+ time.sleep(0.1)
+
+ # Read the connection details from file
+ data = json.loads(open(self.webdriver_bidi_file).read())
+ self.remote_agent_host = data["ws_host"]
+ self.remote_agent_port = int(data["ws_port"])
+
+ if self.use_cdp:
+ # Wait until the DevToolsActivePort file is ready
+ while not os.path.exists(self.cdp_port_file):
+ time.sleep(0.1)
+
+ # Read the port if needed and the debugger address from the
+ # DevToolsActivePort file
+ lines = open(self.cdp_port_file).readlines()
+ assert len(lines) == 2
+
+ if self.remote_agent_port is None:
+ self.remote_agent_port = int(lines[0].strip())
+ self.debugger_address = lines[1].strip()
+
+ def quit(self, clean_profile=True):
+ if self.is_running:
+ self.runner.stop()
+ self.runner.cleanup()
+
+ if clean_profile:
+ self.profile.cleanup()
+
+ def wait(self):
+ if self.is_running is True:
+ self.runner.wait()
+
+
+class Geckodriver:
+ PORT_RE = re.compile(rb".*Listening on [^ :]*:(\d+)")
+
+ def __init__(self, configuration, hostname=None, extra_args=None):
+ self.config = configuration["webdriver"]
+ self.requested_capabilities = configuration["capabilities"]
+ self.hostname = hostname or configuration["host"]
+ self.extra_args = extra_args or []
+ self.env = configuration["browser"]["env"]
+
+ self.command = None
+ self.proc = None
+ self.port = None
+ self.reader_thread = None
+
+ self.capabilities = {"alwaysMatch": self.requested_capabilities}
+ self.session = None
+
+ @property
+ def remote_agent_port(self):
+ webSocketUrl = self.session.capabilities.get("webSocketUrl")
+ assert webSocketUrl is not None
+
+ return urlparse(webSocketUrl).port
+
+ def start(self):
+ self.command = (
+ [self.config["binary"], "--port", "0"]
+ + self.config["args"]
+ + self.extra_args
+ )
+
+ print(f"Running command: {' '.join(self.command)}")
+ self.proc = subprocess.Popen(self.command, env=self.env, stdout=subprocess.PIPE)
+
+ self.reader_thread = threading.Thread(
+ target=readOutputLine,
+ args=(self.proc.stdout, self.processOutputLine),
+ daemon=True,
+ )
+ self.reader_thread.start()
+ # Wait for the port to become ready
+ end_time = time.time() + 10
+ while time.time() < end_time:
+ returncode = self.proc.poll()
+ if returncode is not None:
+ raise ChildProcessError(
+ f"geckodriver terminated with code {returncode}"
+ )
+ if self.port is not None:
+ with socket.socket() as sock:
+ if sock.connect_ex((self.hostname, self.port)) == 0:
+ break
+ else:
+ time.sleep(0.1)
+ else:
+ if self.port is None:
+ raise OSError(
+ f"Failed to read geckodriver port started on {self.hostname}"
+ )
+ raise ConnectionRefusedError(
+ f"Failed to connect to geckodriver on {self.hostname}:{self.port}"
+ )
+
+ self.session = webdriver.Session(
+ self.hostname, self.port, capabilities=self.capabilities
+ )
+
+ return self
+
+ def processOutputLine(self, line):
+ if self.port is None:
+ m = self.PORT_RE.match(line)
+ if m is not None:
+ self.port = int(m.groups()[0])
+
+ def stop(self):
+ if self.session is not None:
+ self.delete_session()
+ if self.proc:
+ self.proc.kill()
+ self.port = None
+ if self.reader_thread is not None:
+ self.reader_thread.join()
+
+ def new_session(self):
+ self.session.start()
+
+ def delete_session(self):
+ self.session.end()
+
+
+def clear_pref(session, pref):
+ """Clear the user-defined value from the specified preference.
+
+ :param pref: Name of the preference.
+ """
+ with using_context(session, "chrome"):
+ session.execute_script(
+ """
+ const { Preferences } = ChromeUtils.importESModule(
+ "resource://gre/modules/Preferences.sys.mjs"
+ );
+ Preferences.reset(arguments[0]);
+ """,
+ args=(pref,),
+ )
+
+
+def create_custom_profile(base_profile, default_preferences, clone=True):
+ if clone:
+ # Clone the current profile and remove the prefs.js file to only
+ # keep default preferences as set in user.js.
+ profile = Profile.clone(base_profile)
+ prefs_path = os.path.join(profile.profile, "prefs.js")
+ if os.path.exists(prefs_path):
+ os.remove(prefs_path)
+ else:
+ profile = Profile(tempfile.mkdtemp(prefix="wdspec-"))
+ profile.set_preferences(default_preferences)
+
+ return profile
+
+
+def get_arg_value(arg_names, args):
+ """Get an argument value from a list of arguments
+
+ This assumes that argparse argument parsing is close enough to the target
+ to be compatible, at least with the set of inputs we have.
+
+ :param arg_names: - List of names for the argument e.g. ["--foo", "-f"]
+ :param args: - List of arguments to parse
+ :returns: - Optional string argument value
+ """
+ parser = argparse.ArgumentParser()
+ parser.add_argument(*arg_names, action="store", dest="value", default=None)
+ parsed, _ = parser.parse_known_args(args)
+ return parsed.value
+
+
+def get_pref(session, pref):
+ """Get the value of the specified preference.
+
+ :param pref: Name of the preference.
+ """
+ with using_context(session, "chrome"):
+ pref_value = session.execute_script(
+ """
+ const { Preferences } = ChromeUtils.importESModule(
+ "resource://gre/modules/Preferences.sys.mjs"
+ );
+
+ let pref = arguments[0];
+
+ prefs = new Preferences();
+ return prefs.get(pref, null);
+ """,
+ args=(pref,),
+ )
+ return pref_value
+
+
+def get_profile_folder(firefox_options):
+ return get_arg_value(["--profile"], firefox_options["args"])
+
+
+def readOutputLine(stream, callback):
+ while True:
+ line = stream.readline()
+ if not line:
+ break
+
+ callback(line)
+
+
+def read_user_preferences(profile_path, filename="user.js"):
+ prefs_file = os.path.join(profile_path, filename)
+
+ prefs = {}
+ for pref_name, pref_value in Preferences().read_prefs(prefs_file):
+ prefs[pref_name] = pref_value
+
+ return prefs
+
+
+def set_pref(session, pref, value):
+ """Set the value of the specified preference.
+
+ :param pref: Name of the preference.
+ :param value: The value to set the preference to. If the value is None,
+ reset the preference to its default value. If no default
+ value exists, the preference will cease to exist.
+ """
+ if value is None:
+ clear_pref(session, pref)
+ return
+
+ with using_context(session, "chrome"):
+ session.execute_script(
+ """
+ const { Preferences } = ChromeUtils.importESModule(
+ "resource://gre/modules/Preferences.sys.mjs"
+ );
+
+ const [pref, value] = arguments;
+
+ prefs = new Preferences();
+ prefs.set(pref, value);
+ """,
+ args=(pref, value),
+ )