diff options
Diffstat (limited to 'testing/web-platform/mozilla/tests/webdriver')
13 files changed, 584 insertions, 366 deletions
diff --git a/testing/web-platform/mozilla/tests/webdriver/bidi/browsing_context/navigate/error.py b/testing/web-platform/mozilla/tests/webdriver/bidi/browsing_context/navigate/error.py index 374359d1ae..ea76b13727 100644 --- a/testing/web-platform/mozilla/tests/webdriver/bidi/browsing_context/navigate/error.py +++ b/testing/web-platform/mozilla/tests/webdriver/bidi/browsing_context/navigate/error.py @@ -1,4 +1,3 @@ -import os from copy import deepcopy import pytest @@ -7,13 +6,12 @@ from tests.bidi.browsing_context.navigate import navigate_and_assert pytestmark = pytest.mark.asyncio -async def test_insecure_certificate(configuration, url, custom_profile, geckodriver): - try: - # Create a new profile and remove the certificate storage so that - # loading a HTTPS page will cause an insecure certificate error - os.remove(os.path.join(custom_profile.profile, "cert9.db")) - except Exception: - pass +async def test_insecure_certificate( + configuration, url, create_custom_profile, geckodriver +): + # Create a fresh profile without any item in the certificate storage so that + # loading a HTTPS page will cause an insecure certificate error + custom_profile = create_custom_profile(clone=False) config = deepcopy(configuration) config["capabilities"]["moz:firefoxOptions"]["args"] = [ diff --git a/testing/web-platform/mozilla/tests/webdriver/bidi/storage/delete_cookies/__init__.py b/testing/web-platform/mozilla/tests/webdriver/bidi/storage/delete_cookies/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/web-platform/mozilla/tests/webdriver/bidi/storage/delete_cookies/__init__.py diff --git a/testing/web-platform/mozilla/tests/webdriver/bidi/storage/delete_cookies/partition.py b/testing/web-platform/mozilla/tests/webdriver/bidi/storage/delete_cookies/partition.py new file mode 100644 index 0000000000..d8e2729fe2 --- /dev/null +++ b/testing/web-platform/mozilla/tests/webdriver/bidi/storage/delete_cookies/partition.py @@ -0,0 +1,114 @@ +import pytest +from tests.bidi import recursive_compare +from tests.support.helpers import get_origin_from_url +from webdriver.bidi.modules.network import NetworkStringValue +from webdriver.bidi.modules.storage import ( + BrowsingContextPartitionDescriptor, + PartialCookie, +) + +pytestmark = pytest.mark.asyncio + + +@pytest.mark.parametrize( + "with_document_cookie", + [True, False], + ids=["with document cookie", "with set cookie"], +) +async def test_partition_context( + bidi_session, + new_tab, + test_page, + domain_value, + add_cookie, + set_cookie, + with_document_cookie, +): + await bidi_session.browsing_context.navigate( + context=new_tab["context"], url=test_page, wait="complete" + ) + + cookie_name = "foo" + cookie_value = "bar" + source_origin = get_origin_from_url(test_page) + partition = BrowsingContextPartitionDescriptor(new_tab["context"]) + if with_document_cookie: + await add_cookie(new_tab["context"], cookie_name, cookie_value) + else: + await set_cookie( + cookie=PartialCookie( + domain=domain_value(), + name=cookie_name, + value=NetworkStringValue(cookie_value), + ), + partition=partition, + ) + + result = await bidi_session.storage.delete_cookies(partition=partition) + recursive_compare({"partitionKey": {"sourceOrigin": source_origin}}, result) + + result = await bidi_session.storage.get_cookies(partition=partition) + assert result["cookies"] == [] + + +@pytest.mark.parametrize("domain", ["", "alt"], ids=["same_origin", "cross_origin"]) +async def test_partition_context_iframe_with_set_cookie( + bidi_session, new_tab, inline, domain_value, domain, set_cookie +): + iframe_url = inline("<div id='in-iframe'>foo</div>", domain=domain) + page_url = inline(f"<iframe src='{iframe_url}'></iframe>") + await bidi_session.browsing_context.navigate( + context=new_tab["context"], url=page_url, wait="complete" + ) + source_origin = get_origin_from_url(iframe_url) + + contexts = await bidi_session.browsing_context.get_tree(root=new_tab["context"]) + iframe_context = contexts[0]["children"][0] + + cookie_name = "foo" + cookie_value = "bar" + frame_partition = BrowsingContextPartitionDescriptor(iframe_context["context"]) + await set_cookie( + cookie=PartialCookie( + domain=domain_value(domain), + name=cookie_name, + value=NetworkStringValue(cookie_value), + ), + partition=frame_partition, + ) + + result = await bidi_session.storage.delete_cookies(partition=frame_partition) + recursive_compare({"partitionKey": {"sourceOrigin": source_origin}}, result) + + result = await bidi_session.storage.get_cookies(partition=frame_partition) + assert result["cookies"] == [] + + +# Because of Dynamic First-Party Isolation, adding the cookie with `document.cookie` +# works only with same-origin iframes. +async def test_partition_context_same_origin_iframe_with_document_cookie( + bidi_session, + new_tab, + inline, + add_cookie, +): + iframe_url = inline("<div id='in-iframe'>foo</div>") + page_url = inline(f"<iframe src='{iframe_url}'></iframe>") + await bidi_session.browsing_context.navigate( + context=new_tab["context"], url=page_url, wait="complete" + ) + source_origin = get_origin_from_url(iframe_url) + + contexts = await bidi_session.browsing_context.get_tree(root=new_tab["context"]) + iframe_context = contexts[0]["children"][0] + + cookie_name = "foo" + cookie_value = "bar" + frame_partition = BrowsingContextPartitionDescriptor(iframe_context["context"]) + await add_cookie(iframe_context["context"], cookie_name, cookie_value) + + result = await bidi_session.storage.delete_cookies(partition=frame_partition) + recursive_compare({"partitionKey": {"sourceOrigin": source_origin}}, result) + + result = await bidi_session.storage.get_cookies(partition=frame_partition) + assert result["cookies"] == [] diff --git a/testing/web-platform/mozilla/tests/webdriver/bidi/storage/get_cookies/partition.py b/testing/web-platform/mozilla/tests/webdriver/bidi/storage/get_cookies/partition.py index b037c30038..5503f13224 100644 --- a/testing/web-platform/mozilla/tests/webdriver/bidi/storage/get_cookies/partition.py +++ b/testing/web-platform/mozilla/tests/webdriver/bidi/storage/get_cookies/partition.py @@ -59,18 +59,22 @@ async def test_partition_context( ) recursive_compare( - {"cookies": [], "partitionKey": {"sourceOrigin": source_origin_2}}, cookies + { + "cookies": [], + "partitionKey": {"sourceOrigin": source_origin_2, "userContext": "default"}, + }, + cookies, ) -@pytest.mark.parametrize("domain", ["", "alt"], ids=["same_origin", "cross_origin"]) -async def test_partition_context_iframe( - bidi_session, new_tab, inline, domain_value, domain, add_cookie +# Because of Dynamic First-Party Isolation, adding the cookie with `document.cookie` +# works only with same-origin iframes. +async def test_partition_context_same_origin_iframe( + bidi_session, new_tab, inline, domain_value, add_cookie ): - iframe_url = inline("<div id='in-iframe'>foo</div>", domain=domain) - source_origin_for_iframe = get_origin_from_url(iframe_url) + iframe_url = inline("<div id='in-iframe'>foo</div>") + source_origin = get_origin_from_url(iframe_url) page_url = inline(f"<iframe src='{iframe_url}'></iframe>") - source_origin_for_page = get_origin_from_url(page_url) await bidi_session.browsing_context.navigate( context=new_tab["context"], url=page_url, wait="complete" ) @@ -89,7 +93,7 @@ async def test_partition_context_iframe( expected_cookies = [ { - "domain": domain_value(domain=domain), + "domain": domain_value(), "httpOnly": False, "name": cookie_name, "path": "/webdriver/tests/support", @@ -99,10 +103,11 @@ async def test_partition_context_iframe( "value": {"type": "string", "value": cookie_value}, } ] + recursive_compare( { "cookies": expected_cookies, - "partitionKey": {"sourceOrigin": source_origin_for_iframe}, + "partitionKey": {"sourceOrigin": source_origin}, }, cookies, ) @@ -110,22 +115,16 @@ async def test_partition_context_iframe( cookies = await bidi_session.storage.get_cookies( partition=BrowsingContextPartitionDescriptor(new_tab["context"]) ) - # When the iframe is on the different domain we can verify that top context has no iframe cookie. - if domain == "alt": - recursive_compare( - { - "cookies": [], - "partitionKey": {"sourceOrigin": source_origin_for_page}, - }, - cookies, - ) - else: - # When the iframe is on the same domain, since the browsing context partition is defined by user context and origin, - # which will be the same for the page, we get the same cookies as for the iframe - recursive_compare( - { - "cookies": expected_cookies, - "partitionKey": {"sourceOrigin": source_origin_for_page}, + + # When the iframe is on the same domain, since the browsing context partition is defined by user context and origin, + # which will be the same for the page, we get the same cookies as for the iframe. + recursive_compare( + { + "cookies": expected_cookies, + "partitionKey": { + "sourceOrigin": source_origin, + "userContext": "default", }, - cookies, - ) + }, + cookies, + ) diff --git a/testing/web-platform/mozilla/tests/webdriver/bidi/storage/set_cookie/partition.py b/testing/web-platform/mozilla/tests/webdriver/bidi/storage/set_cookie/partition.py index f8e2823dbc..a9b5d3a43a 100644 --- a/testing/web-platform/mozilla/tests/webdriver/bidi/storage/set_cookie/partition.py +++ b/testing/web-platform/mozilla/tests/webdriver/bidi/storage/set_cookie/partition.py @@ -42,7 +42,9 @@ async def test_partition_context( partition=new_tab_partition, ) - assert set_cookie_result == {"partitionKey": {"sourceOrigin": source_origin_1}} + assert set_cookie_result == { + "partitionKey": {"sourceOrigin": source_origin_1, "userContext": "default"} + } # Check that added cookies are present on the right context. cookies = await bidi_session.storage.get_cookies(partition=new_tab_partition) @@ -72,7 +74,11 @@ async def test_partition_context( ) recursive_compare( - {"cookies": [], "partitionKey": {"sourceOrigin": source_origin_2}}, cookies + { + "cookies": [], + "partitionKey": {"sourceOrigin": source_origin_2, "userContext": "default"}, + }, + cookies, ) @@ -121,7 +127,10 @@ async def test_partition_context_iframe( recursive_compare( { "cookies": expected_cookies, - "partitionKey": {"sourceOrigin": source_origin_for_iframe}, + "partitionKey": { + "sourceOrigin": source_origin_for_iframe, + "userContext": "default", + }, }, cookies, ) @@ -134,7 +143,10 @@ async def test_partition_context_iframe( recursive_compare( { "cookies": [], - "partitionKey": {"sourceOrigin": source_origin_for_page}, + "partitionKey": { + "sourceOrigin": source_origin_for_page, + "userContext": "default", + }, }, cookies, ) @@ -144,7 +156,10 @@ async def test_partition_context_iframe( recursive_compare( { "cookies": expected_cookies, - "partitionKey": {"sourceOrigin": source_origin_for_page}, + "partitionKey": { + "sourceOrigin": source_origin_for_page, + "userContext": "default", + }, }, cookies, ) diff --git a/testing/web-platform/mozilla/tests/webdriver/cdp/port_file.py b/testing/web-platform/mozilla/tests/webdriver/cdp/port_file.py index aa294deb24..23c31906fa 100644 --- a/testing/web-platform/mozilla/tests/webdriver/cdp/port_file.py +++ b/testing/web-platform/mozilla/tests/webdriver/cdp/port_file.py @@ -4,7 +4,7 @@ from support.network import websocket_request def test_devtools_active_port_file(browser): - current_browser = browser(use_cdp=True) + current_browser = browser(use_cdp=True, clone_profile=False) assert current_browser.remote_agent_port != 0 assert current_browser.debugger_address.startswith("/devtools/browser/") @@ -12,9 +12,6 @@ def test_devtools_active_port_file(browser): port_file = os.path.join(current_browser.profile.profile, "DevToolsActivePort") assert os.path.exists(port_file) - current_browser.quit(clean_profile=False) - assert not os.path.exists(port_file) - def test_connect(browser): current_browser = browser(use_cdp=True) diff --git a/testing/web-platform/mozilla/tests/webdriver/classic/new_session/conftest.py b/testing/web-platform/mozilla/tests/webdriver/classic/new_session/conftest.py index 1cab6784c2..63abd19f6a 100644 --- a/testing/web-platform/mozilla/tests/webdriver/classic/new_session/conftest.py +++ b/testing/web-platform/mozilla/tests/webdriver/classic/new_session/conftest.py @@ -2,7 +2,7 @@ import pytest from webdriver.transport import HTTPWireProtocol -@pytest.fixture(name="configuration") +@pytest.fixture(name="configuration", scope="session") def fixture_configuration(configuration): """Remove "acceptInsecureCerts" from capabilities if it exists. diff --git a/testing/web-platform/mozilla/tests/webdriver/classic/new_session/profile_root.py b/testing/web-platform/mozilla/tests/webdriver/classic/new_session/profile_root.py index fc3607bed9..97cb835e2c 100644 --- a/testing/web-platform/mozilla/tests/webdriver/classic/new_session/profile_root.py +++ b/testing/web-platform/mozilla/tests/webdriver/classic/new_session/profile_root.py @@ -4,7 +4,7 @@ import os import pytest -def test_profile_root(tmp_path, configuration, geckodriver, user_prefs): +def test_profile_root(tmp_path, configuration, geckodriver, default_preferences): profile_path = os.path.join(tmp_path, "geckodriver-test") os.makedirs(profile_path) @@ -12,7 +12,7 @@ def test_profile_root(tmp_path, configuration, geckodriver, user_prefs): # Pass all the wpt preferences from the default profile's user.js via # capabilities to allow geckodriver to create a new valid profile itself. - config["capabilities"]["moz:firefoxOptions"]["prefs"] = user_prefs + config["capabilities"]["moz:firefoxOptions"]["prefs"] = default_preferences # Ensure we don't set a profile in command line arguments del config["capabilities"]["moz:firefoxOptions"]["args"] diff --git a/testing/web-platform/mozilla/tests/webdriver/classic/protocol/marionette_port.py b/testing/web-platform/mozilla/tests/webdriver/classic/protocol/marionette_port.py index 09951abc43..20b4e03324 100644 --- a/testing/web-platform/mozilla/tests/webdriver/classic/protocol/marionette_port.py +++ b/testing/web-platform/mozilla/tests/webdriver/classic/protocol/marionette_port.py @@ -14,9 +14,10 @@ def test_marionette_port(geckodriver, port): def test_marionette_port_outdated_active_port_file( - configuration, geckodriver, custom_profile + configuration, create_custom_profile, geckodriver ): config = deepcopy(configuration) + custom_profile = create_custom_profile() extra_args = ["--marionette-port", "0"] # Prepare a Marionette active port file that contains a port which will diff --git a/testing/web-platform/mozilla/tests/webdriver/harness/preferences.py b/testing/web-platform/mozilla/tests/webdriver/harness/preferences_marionette.py index b5cf36bd5e..e5d18aeb6b 100644 --- a/testing/web-platform/mozilla/tests/webdriver/harness/preferences.py +++ b/testing/web-platform/mozilla/tests/webdriver/harness/preferences_marionette.py @@ -2,5 +2,5 @@ from support.fixtures import get_pref def test_recommended_preferences(session): - has_recommended_prefs = get_pref(session, "remote.prefs.recommended") + has_recommended_prefs = get_pref(session, "remote.prefs.recommended.applied") assert has_recommended_prefs is True diff --git a/testing/web-platform/mozilla/tests/webdriver/harness/preferences_remote_agent.py b/testing/web-platform/mozilla/tests/webdriver/harness/preferences_remote_agent.py new file mode 100644 index 0000000000..59db5fa0e3 --- /dev/null +++ b/testing/web-platform/mozilla/tests/webdriver/harness/preferences_remote_agent.py @@ -0,0 +1,37 @@ +import pytest +from support.helpers import read_user_preferences +from tests.support.sync import Poll + + +@pytest.mark.parametrize( + "value", + [ + {"pref_value": 1, "use_cdp": False, "use_bidi": True}, + {"pref_value": 2, "use_cdp": True, "use_bidi": False}, + {"pref_value": 3, "use_cdp": True, "use_bidi": True}, + ], + ids=["bidi only", "cdp only", "bidi and cdp"], +) +def test_remote_agent_recommended_preferences_applied(browser, value): + # Marionette cannot be enabled for this test because it will also set the + # recommended preferences. Therefore only enable Remote Agent protocols. + current_browser = browser( + extra_prefs={ + "remote.active-protocols": value["pref_value"], + }, + use_cdp=value["use_cdp"], + use_bidi=value["use_bidi"], + ) + + def pref_is_set(_): + preferences = read_user_preferences(current_browser.profile.profile, "prefs.js") + return preferences.get("remote.prefs.recommended.applied", False) + + # Without Marionette enabled preferences cannot be retrieved via script evaluation yet. + wait = Poll( + None, + timeout=5, + ignored_exceptions=FileNotFoundError, + message="""Preference "remote.prefs.recommended.applied" is not true""", + ) + wait.until(pref_is_set) diff --git a/testing/web-platform/mozilla/tests/webdriver/support/fixtures.py b/testing/web-platform/mozilla/tests/webdriver/support/fixtures.py index e9dbf1cdfe..b788d874e5 100644 --- a/testing/web-platform/mozilla/tests/webdriver/support/fixtures.py +++ b/testing/web-platform/mozilla/tests/webdriver/support/fixtures.py @@ -1,40 +1,18 @@ -import argparse -import json -import os -import re -import socket -import subprocess -import threading -import time -from contextlib import suppress -from urllib.parse import urlparse - import pytest -import webdriver -from mozprofile import Preferences, Profile -from mozrunner import FirefoxRunner - -from .context import using_context - - -def get_arg_value(arg_names, args): - """Get an argument value from a list of arguments - This assumes that argparse argument parsing is close enough to the target - to be compatible, at least with the set of inputs we have. - - :param arg_names: - List of names for the argument e.g. ["--foo", "-f"] - :param args: - List of arguments to parse - :returns: - Optional string argument value - """ - parser = argparse.ArgumentParser() - parser.add_argument(*arg_names, action="store", dest="value", default=None) - parsed, _ = parser.parse_known_args(args) - return parsed.value +from .helpers import ( + Browser, + Geckodriver, + create_custom_profile, + get_pref, + get_profile_folder, + read_user_preferences, + set_pref, +) @pytest.fixture(scope="module") -def browser(full_configuration): +def browser(configuration, firefox_options): """Start a Firefox instance without using geckodriver. geckodriver will automatically use the --remote-allow-hosts and @@ -46,7 +24,13 @@ def browser(full_configuration): """ current_browser = None - def _browser(use_bidi=False, use_cdp=False, extra_args=None, extra_prefs=None): + def _browser( + use_bidi=False, + use_cdp=False, + extra_args=None, + extra_prefs=None, + clone_profile=True, + ): nonlocal current_browser # If the requested preferences and arguments match the ones for the @@ -66,12 +50,18 @@ def browser(full_configuration): # to create a new instance for the provided preferences. current_browser.quit() - binary = full_configuration["browser"]["binary"] - env = full_configuration["browser"]["env"] - firefox_options = full_configuration["capabilities"]["moz:firefoxOptions"] + binary = configuration["browser"]["binary"] + env = configuration["browser"]["env"] + + profile_path = get_profile_folder(firefox_options) + default_prefs = read_user_preferences(profile_path) + profile = create_custom_profile( + profile_path, default_prefs, clone=clone_profile + ) + current_browser = Browser( binary, - firefox_options, + profile, use_bidi=use_bidi, use_cdp=use_cdp, extra_args=extra_args, @@ -89,20 +79,32 @@ def browser(full_configuration): current_browser = None -@pytest.fixture -def profile_folder(configuration): - firefox_options = configuration["capabilities"]["moz:firefoxOptions"] - return get_arg_value(["--profile"], firefox_options["args"]) +@pytest.fixture(name="create_custom_profile") +def fixture_create_custom_profile(default_preferences, profile_folder): + profile = None + + def _create_custom_profile(clone=True): + profile = create_custom_profile( + profile_folder, default_preferences, clone=clone + ) + + return profile + + yield _create_custom_profile + + # if profile is not None: + if profile: + profile.cleanup() @pytest.fixture -def custom_profile(profile_folder): - # Clone the known profile for automation preferences - profile = Profile.clone(profile_folder) +def default_preferences(profile_folder): + return read_user_preferences(profile_folder) - yield profile - profile.cleanup() +@pytest.fixture(scope="session") +def firefox_options(configuration): + return configuration["capabilities"]["moz:firefoxOptions"] @pytest.fixture @@ -128,277 +130,8 @@ def geckodriver(configuration): @pytest.fixture -def user_prefs(profile_folder): - user_js = os.path.join(profile_folder, "user.js") - - prefs = {} - for pref_name, pref_value in Preferences().read_prefs(user_js): - prefs[pref_name] = pref_value - - return prefs - - -class Browser: - def __init__( - self, - binary, - firefox_options, - use_bidi=False, - use_cdp=False, - extra_args=None, - extra_prefs=None, - env=None, - ): - self.use_bidi = use_bidi - self.bidi_port_file = None - self.use_cdp = use_cdp - self.cdp_port_file = None - self.extra_args = extra_args - self.extra_prefs = extra_prefs - - self.debugger_address = None - self.remote_agent_host = None - self.remote_agent_port = None - - # Prepare temporary profile - _profile_arg, profile_folder = firefox_options["args"] - self.profile = Profile.clone(profile_folder) - if self.extra_prefs is not None: - self.profile.set_preferences(self.extra_prefs) - - if use_cdp: - self.cdp_port_file = os.path.join( - self.profile.profile, "DevToolsActivePort" - ) - with suppress(FileNotFoundError): - os.remove(self.cdp_port_file) - if use_bidi: - self.webdriver_bidi_file = os.path.join( - self.profile.profile, "WebDriverBiDiServer.json" - ) - with suppress(FileNotFoundError): - os.remove(self.webdriver_bidi_file) - - cmdargs = ["-no-remote"] - if self.use_bidi or self.use_cdp: - cmdargs.extend(["--remote-debugging-port", "0"]) - if self.extra_args is not None: - cmdargs.extend(self.extra_args) - self.runner = FirefoxRunner( - binary=binary, profile=self.profile, cmdargs=cmdargs, env=env - ) - - @property - def is_running(self): - return self.runner.is_running() - - def start(self): - # Start Firefox. - self.runner.start() - - if self.use_bidi: - # Wait until the WebDriverBiDiServer.json file is ready - while not os.path.exists(self.webdriver_bidi_file): - time.sleep(0.1) - - # Read the connection details from file - data = json.loads(open(self.webdriver_bidi_file).read()) - self.remote_agent_host = data["ws_host"] - self.remote_agent_port = int(data["ws_port"]) - - if self.use_cdp: - # Wait until the DevToolsActivePort file is ready - while not os.path.exists(self.cdp_port_file): - time.sleep(0.1) - - # Read the port if needed and the debugger address from the - # DevToolsActivePort file - lines = open(self.cdp_port_file).readlines() - assert len(lines) == 2 - - if self.remote_agent_port is None: - self.remote_agent_port = int(lines[0].strip()) - self.debugger_address = lines[1].strip() - - def quit(self, clean_profile=True): - if self.is_running: - self.runner.stop() - self.runner.cleanup() - - if clean_profile: - self.profile.cleanup() - - def wait(self): - if self.is_running is True: - self.runner.wait() - - -class Geckodriver: - PORT_RE = re.compile(b".*Listening on [^ :]*:(\d+)") - - def __init__(self, configuration, hostname=None, extra_args=None): - self.config = configuration["webdriver"] - self.requested_capabilities = configuration["capabilities"] - self.hostname = hostname or configuration["host"] - self.extra_args = extra_args or [] - self.env = configuration["browser"]["env"] - - self.command = None - self.proc = None - self.port = None - self.reader_thread = None - - self.capabilities = {"alwaysMatch": self.requested_capabilities} - self.session = None - - @property - def remote_agent_port(self): - webSocketUrl = self.session.capabilities.get("webSocketUrl") - assert webSocketUrl is not None - - return urlparse(webSocketUrl).port - - def start(self): - self.command = ( - [self.config["binary"], "--port", "0"] - + self.config["args"] - + self.extra_args - ) - - print(f"Running command: {' '.join(self.command)}") - self.proc = subprocess.Popen(self.command, env=self.env, stdout=subprocess.PIPE) - - self.reader_thread = threading.Thread( - target=readOutputLine, - args=(self.proc.stdout, self.processOutputLine), - daemon=True, - ) - self.reader_thread.start() - # Wait for the port to become ready - end_time = time.time() + 10 - while time.time() < end_time: - returncode = self.proc.poll() - if returncode is not None: - raise ChildProcessError( - f"geckodriver terminated with code {returncode}" - ) - if self.port is not None: - with socket.socket() as sock: - if sock.connect_ex((self.hostname, self.port)) == 0: - break - else: - time.sleep(0.1) - else: - if self.port is None: - raise OSError( - f"Failed to read geckodriver port started on {self.hostname}" - ) - raise ConnectionRefusedError( - f"Failed to connect to geckodriver on {self.hostname}:{self.port}" - ) - - self.session = webdriver.Session( - self.hostname, self.port, capabilities=self.capabilities - ) - - return self - - def processOutputLine(self, line): - if self.port is None: - m = self.PORT_RE.match(line) - if m is not None: - self.port = int(m.groups()[0]) - - def stop(self): - if self.session is not None: - self.delete_session() - if self.proc: - self.proc.kill() - self.port = None - if self.reader_thread is not None: - self.reader_thread.join() - - def new_session(self): - self.session.start() - - def delete_session(self): - self.session.end() - - -def readOutputLine(stream, callback): - while True: - line = stream.readline() - if not line: - break - - callback(line) - - -def clear_pref(session, pref): - """Clear the user-defined value from the specified preference. - - :param pref: Name of the preference. - """ - with using_context(session, "chrome"): - session.execute_script( - """ - const { Preferences } = ChromeUtils.importESModule( - "resource://gre/modules/Preferences.sys.mjs" - ); - Preferences.reset(arguments[0]); - """, - args=(pref,), - ) - - -def get_pref(session, pref): - """Get the value of the specified preference. - - :param pref: Name of the preference. - """ - with using_context(session, "chrome"): - pref_value = session.execute_script( - """ - const { Preferences } = ChromeUtils.importESModule( - "resource://gre/modules/Preferences.sys.mjs" - ); - - let pref = arguments[0]; - - prefs = new Preferences(); - return prefs.get(pref, null); - """, - args=(pref,), - ) - return pref_value - - -def set_pref(session, pref, value): - """Set the value of the specified preference. - - :param pref: Name of the preference. - :param value: The value to set the preference to. If the value is None, - reset the preference to its default value. If no default - value exists, the preference will cease to exist. - """ - if value is None: - clear_pref(session, pref) - return - - with using_context(session, "chrome"): - session.execute_script( - """ - const { Preferences } = ChromeUtils.importESModule( - "resource://gre/modules/Preferences.sys.mjs" - ); - - const [pref, value] = arguments; - - prefs = new Preferences(); - prefs.set(pref, value); - """, - args=(pref, value), - ) +def profile_folder(firefox_options): + return get_profile_folder(firefox_options) @pytest.fixture diff --git a/testing/web-platform/mozilla/tests/webdriver/support/helpers.py b/testing/web-platform/mozilla/tests/webdriver/support/helpers.py new file mode 100644 index 0000000000..6577289983 --- /dev/null +++ b/testing/web-platform/mozilla/tests/webdriver/support/helpers.py @@ -0,0 +1,324 @@ +import argparse +import json +import os +import re +import socket +import subprocess +import tempfile +import threading +import time +from contextlib import suppress +from urllib.parse import urlparse + +import webdriver +from mozprofile import Preferences, Profile +from mozrunner import FirefoxRunner + +from .context import using_context + + +class Browser: + def __init__( + self, + binary, + profile, + use_bidi=False, + use_cdp=False, + extra_args=None, + extra_prefs=None, + env=None, + ): + self.profile = profile + self.use_bidi = use_bidi + self.bidi_port_file = None + self.use_cdp = use_cdp + self.cdp_port_file = None + self.extra_args = extra_args + self.extra_prefs = extra_prefs + + self.debugger_address = None + self.remote_agent_host = None + self.remote_agent_port = None + + if self.extra_prefs is not None: + self.profile.set_preferences(self.extra_prefs) + + if use_cdp: + self.cdp_port_file = os.path.join( + self.profile.profile, "DevToolsActivePort" + ) + with suppress(FileNotFoundError): + os.remove(self.cdp_port_file) + + if use_bidi: + self.webdriver_bidi_file = os.path.join( + self.profile.profile, "WebDriverBiDiServer.json" + ) + with suppress(FileNotFoundError): + os.remove(self.webdriver_bidi_file) + + cmdargs = ["-no-remote"] + if self.use_bidi or self.use_cdp: + cmdargs.extend(["--remote-debugging-port", "0"]) + if self.extra_args is not None: + cmdargs.extend(self.extra_args) + self.runner = FirefoxRunner( + binary=binary, profile=self.profile, cmdargs=cmdargs, env=env + ) + + @property + def is_running(self): + return self.runner.is_running() + + def start(self): + # Start Firefox. + self.runner.start() + + if self.use_bidi: + # Wait until the WebDriverBiDiServer.json file is ready + while not os.path.exists(self.webdriver_bidi_file): + time.sleep(0.1) + + # Read the connection details from file + data = json.loads(open(self.webdriver_bidi_file).read()) + self.remote_agent_host = data["ws_host"] + self.remote_agent_port = int(data["ws_port"]) + + if self.use_cdp: + # Wait until the DevToolsActivePort file is ready + while not os.path.exists(self.cdp_port_file): + time.sleep(0.1) + + # Read the port if needed and the debugger address from the + # DevToolsActivePort file + lines = open(self.cdp_port_file).readlines() + assert len(lines) == 2 + + if self.remote_agent_port is None: + self.remote_agent_port = int(lines[0].strip()) + self.debugger_address = lines[1].strip() + + def quit(self, clean_profile=True): + if self.is_running: + self.runner.stop() + self.runner.cleanup() + + if clean_profile: + self.profile.cleanup() + + def wait(self): + if self.is_running is True: + self.runner.wait() + + +class Geckodriver: + PORT_RE = re.compile(rb".*Listening on [^ :]*:(\d+)") + + def __init__(self, configuration, hostname=None, extra_args=None): + self.config = configuration["webdriver"] + self.requested_capabilities = configuration["capabilities"] + self.hostname = hostname or configuration["host"] + self.extra_args = extra_args or [] + self.env = configuration["browser"]["env"] + + self.command = None + self.proc = None + self.port = None + self.reader_thread = None + + self.capabilities = {"alwaysMatch": self.requested_capabilities} + self.session = None + + @property + def remote_agent_port(self): + webSocketUrl = self.session.capabilities.get("webSocketUrl") + assert webSocketUrl is not None + + return urlparse(webSocketUrl).port + + def start(self): + self.command = ( + [self.config["binary"], "--port", "0"] + + self.config["args"] + + self.extra_args + ) + + print(f"Running command: {' '.join(self.command)}") + self.proc = subprocess.Popen(self.command, env=self.env, stdout=subprocess.PIPE) + + self.reader_thread = threading.Thread( + target=readOutputLine, + args=(self.proc.stdout, self.processOutputLine), + daemon=True, + ) + self.reader_thread.start() + # Wait for the port to become ready + end_time = time.time() + 10 + while time.time() < end_time: + returncode = self.proc.poll() + if returncode is not None: + raise ChildProcessError( + f"geckodriver terminated with code {returncode}" + ) + if self.port is not None: + with socket.socket() as sock: + if sock.connect_ex((self.hostname, self.port)) == 0: + break + else: + time.sleep(0.1) + else: + if self.port is None: + raise OSError( + f"Failed to read geckodriver port started on {self.hostname}" + ) + raise ConnectionRefusedError( + f"Failed to connect to geckodriver on {self.hostname}:{self.port}" + ) + + self.session = webdriver.Session( + self.hostname, self.port, capabilities=self.capabilities + ) + + return self + + def processOutputLine(self, line): + if self.port is None: + m = self.PORT_RE.match(line) + if m is not None: + self.port = int(m.groups()[0]) + + def stop(self): + if self.session is not None: + self.delete_session() + if self.proc: + self.proc.kill() + self.port = None + if self.reader_thread is not None: + self.reader_thread.join() + + def new_session(self): + self.session.start() + + def delete_session(self): + self.session.end() + + +def clear_pref(session, pref): + """Clear the user-defined value from the specified preference. + + :param pref: Name of the preference. + """ + with using_context(session, "chrome"): + session.execute_script( + """ + const { Preferences } = ChromeUtils.importESModule( + "resource://gre/modules/Preferences.sys.mjs" + ); + Preferences.reset(arguments[0]); + """, + args=(pref,), + ) + + +def create_custom_profile(base_profile, default_preferences, clone=True): + if clone: + # Clone the current profile and remove the prefs.js file to only + # keep default preferences as set in user.js. + profile = Profile.clone(base_profile) + prefs_path = os.path.join(profile.profile, "prefs.js") + if os.path.exists(prefs_path): + os.remove(prefs_path) + else: + profile = Profile(tempfile.mkdtemp(prefix="wdspec-")) + profile.set_preferences(default_preferences) + + return profile + + +def get_arg_value(arg_names, args): + """Get an argument value from a list of arguments + + This assumes that argparse argument parsing is close enough to the target + to be compatible, at least with the set of inputs we have. + + :param arg_names: - List of names for the argument e.g. ["--foo", "-f"] + :param args: - List of arguments to parse + :returns: - Optional string argument value + """ + parser = argparse.ArgumentParser() + parser.add_argument(*arg_names, action="store", dest="value", default=None) + parsed, _ = parser.parse_known_args(args) + return parsed.value + + +def get_pref(session, pref): + """Get the value of the specified preference. + + :param pref: Name of the preference. + """ + with using_context(session, "chrome"): + pref_value = session.execute_script( + """ + const { Preferences } = ChromeUtils.importESModule( + "resource://gre/modules/Preferences.sys.mjs" + ); + + let pref = arguments[0]; + + prefs = new Preferences(); + return prefs.get(pref, null); + """, + args=(pref,), + ) + return pref_value + + +def get_profile_folder(firefox_options): + return get_arg_value(["--profile"], firefox_options["args"]) + + +def readOutputLine(stream, callback): + while True: + line = stream.readline() + if not line: + break + + callback(line) + + +def read_user_preferences(profile_path, filename="user.js"): + prefs_file = os.path.join(profile_path, filename) + + prefs = {} + for pref_name, pref_value in Preferences().read_prefs(prefs_file): + prefs[pref_name] = pref_value + + return prefs + + +def set_pref(session, pref, value): + """Set the value of the specified preference. + + :param pref: Name of the preference. + :param value: The value to set the preference to. If the value is None, + reset the preference to its default value. If no default + value exists, the preference will cease to exist. + """ + if value is None: + clear_pref(session, pref) + return + + with using_context(session, "chrome"): + session.execute_script( + """ + const { Preferences } = ChromeUtils.importESModule( + "resource://gre/modules/Preferences.sys.mjs" + ); + + const [pref, value] = arguments; + + prefs = new Preferences(); + prefs.set(pref, value); + """, + args=(pref, value), + ) |