diff options
Diffstat (limited to 'testing/web-platform/tests/tools/wptrunner/wptrunner/executors/protocol.py')
-rw-r--r-- | testing/web-platform/tests/tools/wptrunner/wptrunner/executors/protocol.py | 804 |
1 files changed, 804 insertions, 0 deletions
diff --git a/testing/web-platform/tests/tools/wptrunner/wptrunner/executors/protocol.py b/testing/web-platform/tests/tools/wptrunner/wptrunner/executors/protocol.py new file mode 100644 index 0000000000..e44d1a7666 --- /dev/null +++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/executors/protocol.py @@ -0,0 +1,804 @@ +# mypy: allow-untyped-defs + +import traceback +from http.client import HTTPConnection + +from abc import ABCMeta, abstractmethod +from typing import ClassVar, List, Type + + +def merge_dicts(target, source): + if not (isinstance(target, dict) and isinstance(source, dict)): + raise TypeError + for (key, source_value) in source.items(): + if key not in target: + target[key] = source_value + else: + if isinstance(source_value, dict) and isinstance(target[key], dict): + merge_dicts(target[key], source_value) + else: + target[key] = source_value + +class Protocol: + """Backend for a specific browser-control protocol. + + Each Protocol is composed of a set of ProtocolParts that implement + the APIs required for specific interactions. This reflects the fact + that not all implementaions will support exactly the same feature set. + Each ProtocolPart is exposed directly on the protocol through an accessor + attribute with a name given by its `name` property. + + :param Executor executor: The Executor instance that's using this Protocol + :param Browser browser: The Browser using this protocol""" + __metaclass__ = ABCMeta + + implements: ClassVar[List[Type["ProtocolPart"]]] = [] + + def __init__(self, executor, browser): + self.executor = executor + self.browser = browser + + for cls in self.implements: + name = cls.name + assert not hasattr(self, name) + setattr(self, name, cls(self)) + + @property + def logger(self): + """:returns: Current logger""" + return self.executor.logger + + def is_alive(self): + """Is the browser connection still active + + :returns: A boolean indicating whether the connection is still active.""" + return True + + def setup(self, runner): + """Handle protocol setup, and send a message to the runner to indicate + success or failure.""" + msg = None + try: + msg = "Failed to start protocol connection" + self.connect() + + msg = None + + for cls in self.implements: + getattr(self, cls.name).setup() + + msg = "Post-connection steps failed" + self.after_connect() + except Exception: + message = "Protocol.setup caught an exception:\n" + message += f"{msg}\n" if msg is not None else "" + message += traceback.format_exc() + self.logger.warning(message) + raise + + @abstractmethod + def connect(self): + """Make a connection to the remote browser""" + pass + + @abstractmethod + def after_connect(self): + """Run any post-connection steps. This happens after the ProtocolParts are + initalized so can depend on a fully-populated object.""" + pass + + def teardown(self): + """Run cleanup steps after the tests are finished.""" + for cls in self.implements: + getattr(self, cls.name).teardown() + + +class ProtocolPart: + """Base class for all ProtocolParts. + + :param Protocol parent: The parent protocol""" + __metaclass__ = ABCMeta + + name: ClassVar[str] + + def __init__(self, parent): + self.parent = parent + + @property + def logger(self): + """:returns: Current logger""" + return self.parent.logger + + def setup(self): + """Run any setup steps required for the ProtocolPart.""" + pass + + def teardown(self): + """Run any teardown steps required for the ProtocolPart.""" + pass + + +class BaseProtocolPart(ProtocolPart): + """Generic bits of protocol that are required for multiple test types""" + __metaclass__ = ABCMeta + + name = "base" + + @abstractmethod + def execute_script(self, script, asynchronous=False): + """Execute javascript in the current Window. + + :param str script: The js source to execute. This is implicitly wrapped in a function. + :param bool asynchronous: Whether the script is asynchronous in the webdriver + sense i.e. whether the return value is the result of + the initial function call or if it waits for some callback. + :returns: The result of the script execution. + """ + pass + + @abstractmethod + def set_timeout(self, timeout): + """Set the timeout for script execution. + + :param timeout: Script timeout in seconds""" + pass + + @abstractmethod + def wait(self): + """Wait indefinitely for the browser to close. + + :returns: True to re-run the test, or False to continue with the next test""" + pass + + @property + def current_window(self): + """Return a handle identifying the current top level browsing context + + :returns: A protocol-specific handle""" + pass + + @abstractmethod + def set_window(self, handle): + """Set the top level browsing context to one specified by a given handle. + + :param handle: A protocol-specific handle identifying a top level browsing + context.""" + pass + + @abstractmethod + def window_handles(self): + """Get a list of handles to top-level browsing contexts""" + pass + + @abstractmethod + def load(self, url): + """Load a url in the current browsing context + + :param url: The url to load""" + pass + + +class TestharnessProtocolPart(ProtocolPart): + """Protocol part required to run testharness tests.""" + __metaclass__ = ABCMeta + + name = "testharness" + + @abstractmethod + def load_runner(self, url_protocol): + """Load the initial page used to control the tests. + + :param str url_protocol: "https" or "http" depending on the test metadata. + """ + pass + + @abstractmethod + def close_old_windows(self, url_protocol): + """Close existing windows except for the initial runner window. + After calling this method there must be exactly one open window that + contains the initial runner page. + + :param str url_protocol: "https" or "http" depending on the test metadata. + """ + pass + + @abstractmethod + def get_test_window(self, window_id, parent): + """Get the window handle dorresponding to the window containing the + currently active test. + + :param window_id: A string containing the DOM name of the Window that + contains the test, or None. + :param parent: The handle of the runner window. + :returns: A protocol-specific window handle. + """ + pass + + @abstractmethod + def test_window_loaded(self): + """Wait until the newly opened test window has been loaded.""" + + +class PrefsProtocolPart(ProtocolPart): + """Protocol part that allows getting and setting browser prefs.""" + __metaclass__ = ABCMeta + + name = "prefs" + + @abstractmethod + def set(self, name, value): + """Set the named pref to value. + + :param name: A pref name of browser-specific type + :param value: A pref value of browser-specific type""" + pass + + @abstractmethod + def get(self, name): + """Get the current value of a named pref + + :param name: A pref name of browser-specific type + :returns: A pref value of browser-specific type""" + pass + + @abstractmethod + def clear(self, name): + """Reset the value of a named pref back to the default. + + :param name: A pref name of browser-specific type""" + pass + + +class StorageProtocolPart(ProtocolPart): + """Protocol part for manipulating browser storage.""" + __metaclass__ = ABCMeta + + name = "storage" + + @abstractmethod + def clear_origin(self, url): + """Clear all the storage for a specified origin. + + :param url: A url belonging to the origin""" + pass + + +class SelectorProtocolPart(ProtocolPart): + """Protocol part for selecting elements on the page.""" + __metaclass__ = ABCMeta + + name = "select" + + def element_by_selector(self, element_selector): + elements = self.elements_by_selector(element_selector) + if len(elements) == 0: + raise ValueError(f"Selector '{element_selector}' matches no elements") + elif len(elements) > 1: + raise ValueError(f"Selector '{element_selector}' matches multiple elements") + return elements[0] + + @abstractmethod + def elements_by_selector(self, selector): + """Select elements matching a CSS selector + + :param str selector: The CSS selector + :returns: A list of protocol-specific handles to elements""" + pass + + +class ClickProtocolPart(ProtocolPart): + """Protocol part for performing trusted clicks""" + __metaclass__ = ABCMeta + + name = "click" + + @abstractmethod + def element(self, element): + """Perform a trusted click somewhere on a specific element. + + :param element: A protocol-specific handle to an element.""" + pass + + + +class AccessibilityProtocolPart(ProtocolPart): + """Protocol part for accessibility introspection""" + __metaclass__ = ABCMeta + + name = "accessibility" + + @abstractmethod + def get_computed_label(self, element): + """Return the computed accessibility label for a specific element. + + :param element: A protocol-specific handle to an element.""" + pass + + def get_computed_role(self, element): + """Return the computed accessibility role for a specific element. + + :param element: A protocol-specific handle to an element.""" + pass + + +class CookiesProtocolPart(ProtocolPart): + """Protocol part for managing cookies""" + __metaclass__ = ABCMeta + + name = "cookies" + + @abstractmethod + def delete_all_cookies(self): + """Delete all cookies.""" + pass + + @abstractmethod + def get_all_cookies(self): + """Get all cookies.""" + pass + + @abstractmethod + def get_named_cookie(self, name): + """Get named cookie. + + :param name: The name of the cookie to get.""" + pass + + +class SendKeysProtocolPart(ProtocolPart): + """Protocol part for performing trusted clicks""" + __metaclass__ = ABCMeta + + name = "send_keys" + + @abstractmethod + def send_keys(self, element, keys): + """Send keys to a specific element. + + :param element: A protocol-specific handle to an element. + :param keys: A protocol-specific handle to a string of input keys.""" + pass + +class WindowProtocolPart(ProtocolPart): + """Protocol part for manipulating the window""" + __metaclass__ = ABCMeta + + name = "window" + + @abstractmethod + def set_rect(self, rect): + """Restores the window to the given rect.""" + pass + + @abstractmethod + def get_rect(self): + """Gets the current window rect.""" + pass + + @abstractmethod + def minimize(self): + """Minimizes the window and returns the previous rect.""" + pass + +class GenerateTestReportProtocolPart(ProtocolPart): + """Protocol part for generating test reports""" + __metaclass__ = ABCMeta + + name = "generate_test_report" + + @abstractmethod + def generate_test_report(self, message): + """Generate a test report. + + :param message: The message to be contained in the report.""" + pass + + +class SetPermissionProtocolPart(ProtocolPart): + """Protocol part for setting permissions""" + __metaclass__ = ABCMeta + + name = "set_permission" + + @abstractmethod + def set_permission(self, descriptor, state): + """Set permission state. + + :param descriptor: A PermissionDescriptor object. + :param state: The state to set the permission to.""" + pass + + +class ActionSequenceProtocolPart(ProtocolPart): + """Protocol part for performing trusted clicks""" + __metaclass__ = ABCMeta + + name = "action_sequence" + + @abstractmethod + def send_actions(self, actions): + """Send a sequence of actions to the window. + + :param actions: A protocol-specific handle to an array of actions.""" + pass + + def release(self): + pass + + +class TestDriverProtocolPart(ProtocolPart): + """Protocol part that implements the basic functionality required for + all testdriver-based tests.""" + __metaclass__ = ABCMeta + + name = "testdriver" + + @abstractmethod + def send_message(self, cmd_id, message_type, status, message=None): + """Send a testdriver message to the browser. + + :param int cmd_id: The id of the command to which we're responding + :param str message_type: The kind of the message. + :param str status: Either "failure" or "success" depending on whether the + previous command succeeded. + :param str message: Additional data to add to the message.""" + pass + + def switch_to_window(self, wptrunner_id, initial_window=None): + """Switch to a window given a wptrunner window id + + :param str wptrunner_id: Testdriver-specific id for the target window + :param str initial_window: WebDriver window id for the test window""" + if wptrunner_id is None: + return + + if initial_window is None: + initial_window = self.parent.base.current_window + + stack = [str(item) for item in self.parent.base.window_handles()] + first = True + while stack: + item = stack.pop() + + if item is None: + assert first is False + self._switch_to_parent_frame() + continue + + if isinstance(item, str): + if not first or item != initial_window: + self.parent.base.set_window(item) + first = False + else: + assert first is False + try: + self._switch_to_frame(item) + except ValueError: + # The frame no longer exists, or doesn't have a nested browsing context, so continue + continue + + try: + # Get the window id and a list of elements containing nested browsing contexts. + # For embed we can't tell fpr sure if there's a nested browsing context, so always return it + # and fail later if there isn't + result = self.parent.base.execute_script(""" + let contextParents = Array.from(document.querySelectorAll("frame, iframe, embed, object")) + .filter(elem => elem.localName !== "embed" ? (elem.contentWindow !== null) : true); + return [window.__wptrunner_id, contextParents]""") + except Exception: + continue + + if result is None: + # With marionette at least this is possible if the content process crashed. Not quite + # sure how we want to handle that case. + continue + + handle_window_id, nested_context_containers = result + + if handle_window_id and str(handle_window_id) == wptrunner_id: + return + + for elem in reversed(nested_context_containers): + # None here makes us switch back to the parent after we've processed the frame + stack.append(None) + stack.append(elem) + + raise Exception("Window with id %s not found" % wptrunner_id) + + @abstractmethod + def _switch_to_frame(self, index_or_elem): + """Switch to a frame in the current window + + :param int index_or_elem: Frame id or container element""" + pass + + @abstractmethod + def _switch_to_parent_frame(self): + """Switch to the parent of the current frame""" + pass + + +class AssertsProtocolPart(ProtocolPart): + """ProtocolPart that implements the functionality required to get a count of non-fatal + assertions triggered""" + __metaclass__ = ABCMeta + + name = "asserts" + + @abstractmethod + def get(self): + """Get a count of assertions since the last browser start""" + pass + + +class CoverageProtocolPart(ProtocolPart): + """Protocol part for collecting per-test coverage data.""" + __metaclass__ = ABCMeta + + name = "coverage" + + @abstractmethod + def reset(self): + """Reset coverage counters""" + pass + + @abstractmethod + def dump(self): + """Dump coverage counters""" + pass + + +class VirtualAuthenticatorProtocolPart(ProtocolPart): + """Protocol part for creating and manipulating virtual authenticators""" + __metaclass__ = ABCMeta + + name = "virtual_authenticator" + + @abstractmethod + def add_virtual_authenticator(self, config): + """Add a virtual authenticator + + :param config: The Authenticator Configuration""" + pass + + @abstractmethod + def remove_virtual_authenticator(self, authenticator_id): + """Remove a virtual authenticator + + :param str authenticator_id: The ID of the authenticator to remove""" + pass + + @abstractmethod + def add_credential(self, authenticator_id, credential): + """Inject a credential onto an authenticator + + :param str authenticator_id: The ID of the authenticator to add the credential to + :param credential: The credential to inject""" + pass + + @abstractmethod + def get_credentials(self, authenticator_id): + """Get the credentials stored in an authenticator + + :param str authenticator_id: The ID of the authenticator + :returns: An array with the credentials stored on the authenticator""" + pass + + @abstractmethod + def remove_credential(self, authenticator_id, credential_id): + """Remove a credential stored in an authenticator + + :param str authenticator_id: The ID of the authenticator + :param str credential_id: The ID of the credential""" + pass + + @abstractmethod + def remove_all_credentials(self, authenticator_id): + """Remove all the credentials stored in an authenticator + + :param str authenticator_id: The ID of the authenticator""" + pass + + @abstractmethod + def set_user_verified(self, authenticator_id, uv): + """Sets the user verified flag on an authenticator + + :param str authenticator_id: The ID of the authenticator + :param bool uv: the user verified flag""" + pass + + +class SPCTransactionsProtocolPart(ProtocolPart): + """Protocol part for Secure Payment Confirmation transactions""" + __metaclass__ = ABCMeta + + name = "spc_transactions" + + @abstractmethod + def set_spc_transaction_mode(self, mode): + """Set the SPC transaction automation mode + + :param str mode: The automation mode to set""" + pass + +class RPHRegistrationsProtocolPart(ProtocolPart): + """Protocol part for Custom Handlers registrations""" + __metaclass__ = ABCMeta + + name = "rph_registrations" + + @abstractmethod + def set_rph_registration_mode(self, mode): + """Set the RPH registration automation mode + + :param str mode: The automation mode to set""" + pass + +class FedCMProtocolPart(ProtocolPart): + """Protocol part for Federated Credential Management""" + __metaclass__ = ABCMeta + + name = "fedcm" + + @abstractmethod + def cancel_fedcm_dialog(self): + """Cancel the FedCM dialog""" + pass + + @abstractmethod + def click_fedcm_dialog_button(self, dialog_button): + """Click a button on the FedCM dialog + + :param str dialog_button: The dialog button to click""" + pass + + @abstractmethod + def select_fedcm_account(self, account_index): + """Select a FedCM account + + :param int account_index: The index of the account to select""" + pass + + @abstractmethod + def get_fedcm_account_list(self): + """Get the FedCM account list""" + pass + + @abstractmethod + def get_fedcm_dialog_title(self): + """Get the FedCM dialog title""" + pass + + @abstractmethod + def get_fedcm_dialog_type(self): + """Get the FedCM dialog type""" + pass + + @abstractmethod + def set_fedcm_delay_enabled(self, enabled): + """Sets the FedCM delay as enabled or disabled + + :param bool enabled: The delay to set""" + pass + + @abstractmethod + def reset_fedcm_cooldown(self): + """Set the FedCM cooldown""" + pass + + +class PrintProtocolPart(ProtocolPart): + """Protocol part for rendering to a PDF.""" + __metaclass__ = ABCMeta + + name = "pdf_print" + + @abstractmethod + def render_as_pdf(self, width, height): + """Output document as PDF""" + pass + + +class DebugProtocolPart(ProtocolPart): + """Protocol part for debugging test failures.""" + __metaclass__ = ABCMeta + + name = "debug" + + @abstractmethod + def load_devtools(self): + """Load devtools in the current window""" + pass + + def load_reftest_analyzer(self, test, result): + import io + import mozlog + from urllib.parse import quote, urljoin + + debug_test_logger = mozlog.structuredlog.StructuredLogger("debug_test") + output = io.StringIO() + debug_test_logger.suite_start([]) + debug_test_logger.add_handler(mozlog.handlers.StreamHandler(output, formatter=mozlog.formatters.TbplFormatter())) + debug_test_logger.test_start(test.id) + # Always use PASS as the expected value so we get output even for expected failures + debug_test_logger.test_end(test.id, result["status"], "PASS", extra=result.get("extra")) + + self.parent.base.load(urljoin(self.parent.executor.server_url("https"), + "/common/third_party/reftest-analyzer.xhtml#log=%s" % + quote(output.getvalue()))) + + +class ConnectionlessBaseProtocolPart(BaseProtocolPart): + def load(self, url): + pass + + def execute_script(self, script, asynchronous=False): + pass + + def set_timeout(self, timeout): + pass + + def wait(self): + return False + + def set_window(self, handle): + pass + + def window_handles(self): + return [] + + +class ConnectionlessProtocol(Protocol): + implements = [ConnectionlessBaseProtocolPart] + + def connect(self): + pass + + def after_connect(self): + pass + + +class WdspecProtocol(ConnectionlessProtocol): + implements = [ConnectionlessBaseProtocolPart] + + def __init__(self, executor, browser): + super().__init__(executor, browser) + + def is_alive(self): + """Test that the connection is still alive. + + Because the remote communication happens over HTTP we need to + make an explicit request to the remote. It is allowed for + WebDriver spec tests to not have a WebDriver session, since this + may be what is tested. + + An HTTP request to an invalid path that results in a 404 is + proof enough to us that the server is alive and kicking. + """ + conn = HTTPConnection(self.browser.host, self.browser.port) + conn.request("HEAD", "/invalid") + res = conn.getresponse() + return res.status == 404 + + +class VirtualSensorProtocolPart(ProtocolPart): + """Protocol part for Sensors""" + __metaclass__ = ABCMeta + + name = "virtual_sensor" + + @abstractmethod + def create_virtual_sensor(self, sensor_type, sensor_params): + pass + + @abstractmethod + def update_virtual_sensor(self, sensor_type, reading): + pass + + @abstractmethod + def remove_virtual_sensor(self, sensor_type): + pass + + @abstractmethod + def get_virtual_sensor_information(self, sensor_type): + pass |