diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 17:32:43 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 17:32:43 +0000 |
commit | 6bf0a5cb5034a7e684dcc3500e841785237ce2dd (patch) | |
tree | a68f146d7fa01f0134297619fbe7e33db084e0aa /testing/marionette/client/marionette_driver/marionette.py | |
parent | Initial commit. (diff) | |
download | thunderbird-upstream.tar.xz thunderbird-upstream.zip |
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'testing/marionette/client/marionette_driver/marionette.py')
-rw-r--r-- | testing/marionette/client/marionette_driver/marionette.py | 2104 |
1 files changed, 2104 insertions, 0 deletions
diff --git a/testing/marionette/client/marionette_driver/marionette.py b/testing/marionette/client/marionette_driver/marionette.py new file mode 100644 index 0000000000..4a2be8bd5d --- /dev/null +++ b/testing/marionette/client/marionette_driver/marionette.py @@ -0,0 +1,2104 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import base64 +import datetime +import json +import os +import socket +import sys +import time +import traceback +from contextlib import contextmanager + +import six +from six import reraise + +from . import errors, transport +from .decorators import do_process_check +from .geckoinstance import GeckoInstance +from .keys import Keys +from .timeout import Timeouts + +FRAME_KEY = "frame-075b-4da1-b6ba-e579c2d3230a" +WEB_ELEMENT_KEY = "element-6066-11e4-a52e-4f735466cecf" +WEB_SHADOW_ROOT_KEY = "shadow-6066-11e4-a52e-4f735466cecf" +WINDOW_KEY = "window-fcc6-11e5-b4f8-330a88ab9d7f" + + +class MouseButton(object): + """Enum-like class for mouse button constants.""" + + LEFT = 0 + MIDDLE = 1 + RIGHT = 2 + + +class ActionSequence(object): + r"""API for creating and performing action sequences. + + Each action method adds one or more actions to a queue. When perform() + is called, the queued actions fire in order. + + May be chained together as in:: + + ActionSequence(self.marionette, "key", id) \ + .key_down("a") \ + .key_up("a") \ + .perform() + """ + + def __init__(self, marionette, action_type, input_id, pointer_params=None): + self.marionette = marionette + self._actions = [] + self._id = input_id + self._pointer_params = pointer_params + self._type = action_type + + @property + def dict(self): + d = { + "type": self._type, + "id": self._id, + "actions": self._actions, + } + if self._pointer_params is not None: + d["parameters"] = self._pointer_params + return d + + def perform(self): + """Perform all queued actions.""" + self.marionette.actions.perform([self.dict]) + + def _key_action(self, subtype, value): + self._actions.append({"type": subtype, "value": value}) + + def _pointer_action(self, subtype, button): + self._actions.append({"type": subtype, "button": button}) + + def pause(self, duration): + self._actions.append({"type": "pause", "duration": duration}) + return self + + def pointer_move(self, x, y, duration=None, origin=None): + """Queue a pointerMove action. + + :param x: Destination x-axis coordinate of pointer in CSS pixels. + :param y: Destination y-axis coordinate of pointer in CSS pixels. + :param duration: Number of milliseconds over which to distribute the + move. If None, remote end defaults to 0. + :param origin: Origin of coordinates, either "viewport", "pointer" or + an Element. If None, remote end defaults to "viewport". + """ + action = {"type": "pointerMove", "x": x, "y": y} + if duration is not None: + action["duration"] = duration + if origin is not None: + if isinstance(origin, HTMLElement): + action["origin"] = {origin.kind: origin.id} + else: + action["origin"] = origin + self._actions.append(action) + return self + + def pointer_up(self, button=MouseButton.LEFT): + """Queue a pointerUp action for `button`. + + :param button: Pointer button to perform action with. + Default: 0, which represents main device button. + """ + self._pointer_action("pointerUp", button) + return self + + def pointer_down(self, button=MouseButton.LEFT): + """Queue a pointerDown action for `button`. + + :param button: Pointer button to perform action with. + Default: 0, which represents main device button. + """ + self._pointer_action("pointerDown", button) + return self + + def click(self, element=None, button=MouseButton.LEFT): + """Queue a click with the specified button. + + If an element is given, move the pointer to that element first, + otherwise click current pointer coordinates. + + :param element: Optional element to click. + :param button: Integer representing pointer button to perform action + with. Default: 0, which represents main device button. + """ + if element: + self.pointer_move(0, 0, origin=element) + return self.pointer_down(button).pointer_up(button) + + def key_down(self, value): + """Queue a keyDown action for `value`. + + :param value: Single character to perform key action with. + """ + self._key_action("keyDown", value) + return self + + def key_up(self, value): + """Queue a keyUp action for `value`. + + :param value: Single character to perform key action with. + """ + self._key_action("keyUp", value) + return self + + def send_keys(self, keys): + """Queue a keyDown and keyUp action for each character in `keys`. + + :param keys: String of keys to perform key actions with. + """ + for c in keys: + self.key_down(c) + self.key_up(c) + return self + + +class Actions(object): + def __init__(self, marionette): + self.marionette = marionette + + def perform(self, actions=None): + """Perform actions by tick from each action sequence in `actions`. + + :param actions: List of input source action sequences. A single action + sequence may be created with the help of + ``ActionSequence.dict``. + """ + body = {"actions": [] if actions is None else actions} + return self.marionette._send_message("WebDriver:PerformActions", body) + + def release(self): + return self.marionette._send_message("WebDriver:ReleaseActions") + + def sequence(self, *args, **kwargs): + """Return an empty ActionSequence of the designated type. + + See ActionSequence for parameter list. + """ + return ActionSequence(self.marionette, *args, **kwargs) + + +class HTMLElement(object): + """Represents a DOM Element.""" + + identifiers = (FRAME_KEY, WINDOW_KEY, WEB_ELEMENT_KEY) + + def __init__(self, marionette, id, kind=WEB_ELEMENT_KEY): + self.marionette = marionette + assert id is not None + self.id = id + self.kind = kind + + def __str__(self): + return self.id + + def __eq__(self, other_element): + return self.id == other_element.id + + def __hash__(self): + # pylint --py3k: W1641 + return hash(self.id) + + def find_element(self, method, target): + """Returns an ``HTMLElement`` instance that matches the specified + method and target, relative to the current element. + + For more details on this function, see the + :func:`~marionette_driver.marionette.Marionette.find_element` method + in the Marionette class. + """ + return self.marionette.find_element(method, target, self.id) + + def find_elements(self, method, target): + """Returns a list of all ``HTMLElement`` instances that match the + specified method and target in the current context. + + For more details on this function, see the + :func:`~marionette_driver.marionette.Marionette.find_elements` method + in the Marionette class. + """ + return self.marionette.find_elements(method, target, self.id) + + def get_attribute(self, name): + """Returns the requested attribute, or None if no attribute + is set. + """ + body = {"id": self.id, "name": name} + return self.marionette._send_message( + "WebDriver:GetElementAttribute", body, key="value" + ) + + def get_property(self, name): + """Returns the requested property, or None if the property is + not set. + """ + try: + body = {"id": self.id, "name": name} + return self.marionette._send_message( + "WebDriver:GetElementProperty", body, key="value" + ) + except errors.UnknownCommandException: + # Keep backward compatibility for code which uses get_attribute() to + # also retrieve element properties. + # Remove when Firefox 55 is stable. + return self.get_attribute(name) + + def click(self): + """Simulates a click on the element.""" + self.marionette._send_message("WebDriver:ElementClick", {"id": self.id}) + + def tap(self, x=None, y=None): + """Simulates a set of tap events on the element. + + :param x: X coordinate of tap event. If not given, default to + the centre of the element. + :param y: Y coordinate of tap event. If not given, default to + the centre of the element. + """ + body = {"id": self.id, "x": x, "y": y} + self.marionette._send_message("Marionette:SingleTap", body) + + @property + def text(self): + """Returns the visible text of the element, and its child elements.""" + body = {"id": self.id} + return self.marionette._send_message( + "WebDriver:GetElementText", body, key="value" + ) + + def send_keys(self, *strings): + """Sends the string via synthesized keypresses to the element. + If an array is passed in like `marionette.send_keys(Keys.SHIFT, "a")` it + will be joined into a string. + If an integer is passed in like `marionette.send_keys(1234)` it will be + coerced into a string. + """ + keys = Marionette.convert_keys(*strings) + self.marionette._send_message( + "WebDriver:ElementSendKeys", {"id": self.id, "text": keys} + ) + + def clear(self): + """Clears the input of the element.""" + self.marionette._send_message("WebDriver:ElementClear", {"id": self.id}) + + def is_selected(self): + """Returns True if the element is selected.""" + body = {"id": self.id} + return self.marionette._send_message( + "WebDriver:IsElementSelected", body, key="value" + ) + + def is_enabled(self): + """This command will return False if all the following criteria + are met otherwise return True: + + * A form control is disabled. + * A ``HTMLElement`` has a disabled boolean attribute. + """ + body = {"id": self.id} + return self.marionette._send_message( + "WebDriver:IsElementEnabled", body, key="value" + ) + + def is_displayed(self): + """Returns True if the element is displayed, False otherwise.""" + body = {"id": self.id} + return self.marionette._send_message( + "WebDriver:IsElementDisplayed", body, key="value" + ) + + @property + def tag_name(self): + """The tag name of the element.""" + body = {"id": self.id} + return self.marionette._send_message( + "WebDriver:GetElementTagName", body, key="value" + ) + + @property + def rect(self): + """Gets the element's bounding rectangle. + + This will return a dictionary with the following: + + * x and y represent the top left coordinates of the ``HTMLElement`` + relative to top left corner of the document. + * height and the width will contain the height and the width + of the DOMRect of the ``HTMLElement``. + """ + return self.marionette._send_message( + "WebDriver:GetElementRect", {"id": self.id} + ) + + def value_of_css_property(self, property_name): + """Gets the value of the specified CSS property name. + + :param property_name: Property name to get the value of. + """ + body = {"id": self.id, "propertyName": property_name} + return self.marionette._send_message( + "WebDriver:GetElementCSSValue", body, key="value" + ) + + @property + def shadow_root(self): + """Gets the shadow root of the current element""" + return self.marionette._send_message( + "WebDriver:GetShadowRoot", {"id": self.id}, key="value" + ) + + @property + def computed_label(self): + """Gets the computed accessibility label of the current element""" + return self.marionette._send_message( + "WebDriver:GetComputedLabel", {"id": self.id}, key="value" + ) + + @property + def computed_role(self): + """Gets the computed accessibility role of the current element""" + return self.marionette._send_message( + "WebDriver:GetComputedRole", {"id": self.id}, key="value" + ) + + @classmethod + def _from_json(cls, json, marionette): + if isinstance(json, dict): + if WEB_ELEMENT_KEY in json: + return cls(marionette, json[WEB_ELEMENT_KEY], WEB_ELEMENT_KEY) + elif FRAME_KEY in json: + return cls(marionette, json[FRAME_KEY], FRAME_KEY) + elif WINDOW_KEY in json: + return cls(marionette, json[WINDOW_KEY], WINDOW_KEY) + raise ValueError("Unrecognised web element") + + +class ShadowRoot(object): + """A Class to handling Shadow Roots""" + + identifiers = (WEB_SHADOW_ROOT_KEY,) + + def __init__(self, marionette, id, kind=WEB_SHADOW_ROOT_KEY): + self.marionette = marionette + assert id is not None + self.id = id + self.kind = kind + + def __str__(self): + return self.id + + def __eq__(self, other_element): + return self.id == other_element.id + + def __hash__(self): + # pylint --py3k: W1641 + return hash(self.id) + + def find_element(self, method, target): + """Returns an ``HTMLElement`` instance that matches the specified + method and target, relative to the current shadow root. + + For more details on this function, see the + :func:`~marionette_driver.marionette.Marionette.find_element` method + in the Marionette class. + """ + body = {"shadowRoot": self.id, "value": target, "using": method} + return self.marionette._send_message( + "WebDriver:FindElementFromShadowRoot", body, key="value" + ) + + def find_elements(self, method, target): + """Returns a list of all ``HTMLElement`` instances that match the + specified method and target in the current shadow root. + + For more details on this function, see the + :func:`~marionette_driver.marionette.Marionette.find_elements` method + in the Marionette class. + """ + body = {"shadowRoot": self.id, "value": target, "using": method} + return self.marionette._send_message( + "WebDriver:FindElementsFromShadowRoot", body + ) + + @classmethod + def _from_json(cls, json, marionette): + if isinstance(json, dict): + if WEB_SHADOW_ROOT_KEY in json: + return cls(marionette, json[WEB_SHADOW_ROOT_KEY]) + raise ValueError("Unrecognised shadow root") + + +class Alert(object): + """A class for interacting with alerts. + + :: + + Alert(marionette).accept() + Alert(marionette).dismiss() + """ + + def __init__(self, marionette): + self.marionette = marionette + + def accept(self): + """Accept a currently displayed modal dialog.""" + self.marionette._send_message("WebDriver:AcceptAlert") + + def dismiss(self): + """Dismiss a currently displayed modal dialog.""" + self.marionette._send_message("WebDriver:DismissAlert") + + @property + def text(self): + """Return the currently displayed text in a tab modal.""" + return self.marionette._send_message("WebDriver:GetAlertText", key="value") + + def send_keys(self, *string): + """Send keys to the currently displayed text input area in an open + tab modal dialog.""" + self.marionette._send_message( + "WebDriver:SendAlertText", {"text": Marionette.convert_keys(*string)} + ) + + +class Marionette(object): + """Represents a Marionette connection to a browser or device.""" + + CONTEXT_CHROME = "chrome" # non-browser content: windows, dialogs, etc. + CONTEXT_CONTENT = "content" # browser content: iframes, divs, etc. + DEFAULT_STARTUP_TIMEOUT = 120 + DEFAULT_SHUTDOWN_TIMEOUT = ( + 70 # By default Firefox will kill hanging threads after 60s + ) + + # Bug 1336953 - Until we can remove the socket timeout parameter it has to be + # set a default value which is larger than the longest timeout as defined by the + # WebDriver spec. In that case its 300s for page load. Also add another minute + # so that slow builds have enough time to send the timeout error to the client. + DEFAULT_SOCKET_TIMEOUT = 360 + + def __init__( + self, + host="127.0.0.1", + port=2828, + app=None, + bin=None, + baseurl=None, + socket_timeout=None, + startup_timeout=None, + **instance_args + ): + """Construct a holder for the Marionette connection. + + Remember to call ``start_session`` in order to initiate the + connection and start a Marionette session. + + :param host: Host where the Marionette server listens. + Defaults to 127.0.0.1. + :param port: Port where the Marionette server listens. + Defaults to port 2828. + :param baseurl: Where to look for files served from Marionette's + www directory. + :param socket_timeout: Timeout for Marionette socket operations. + :param startup_timeout: Seconds to wait for a connection with + binary. + :param bin: Path to browser binary. If any truthy value is given + this will attempt to start a Gecko instance with the specified + `app`. + :param app: Type of ``instance_class`` to use for managing app + instance. See ``marionette_driver.geckoinstance``. + :param instance_args: Arguments to pass to ``instance_class``. + """ + self.host = "127.0.0.1" # host + if int(port) == 0: + port = Marionette.check_port_available(port) + self.port = self.local_port = int(port) + self.bin = bin + self.client = None + self.instance = None + self.requested_capabilities = None + self.session = None + self.session_id = None + self.process_id = None + self.profile = None + self.window = None + self.chrome_window = None + self.baseurl = baseurl + self._test_name = None + self.crashed = 0 + self.is_shutting_down = False + self.cleanup_ran = False + + if socket_timeout is None: + self.socket_timeout = self.DEFAULT_SOCKET_TIMEOUT + else: + self.socket_timeout = float(socket_timeout) + + if startup_timeout is None: + self.startup_timeout = self.DEFAULT_STARTUP_TIMEOUT + else: + self.startup_timeout = int(startup_timeout) + + self.shutdown_timeout = self.DEFAULT_SHUTDOWN_TIMEOUT + + if self.bin: + self.instance = GeckoInstance.create( + app, host=self.host, port=self.port, bin=self.bin, **instance_args + ) + self.start_binary(self.startup_timeout) + + self.actions = Actions(self) + self.timeout = Timeouts(self) + + @property + def profile_path(self): + if self.instance and self.instance.profile: + return self.instance.profile.profile + + def start_binary(self, timeout): + try: + self.check_port_available(self.port, host=self.host) + except socket.error: + _, value, tb = sys.exc_info() + msg = "Port {}:{} is unavailable ({})".format(self.host, self.port, value) + reraise(IOError, IOError(msg), tb) + + try: + self.instance.start() + self.raise_for_port(timeout=timeout) + except socket.timeout: + # Something went wrong with starting up Marionette server. Given + # that the process will not quit itself, force a shutdown immediately. + self.cleanup() + + msg = ( + "Process killed after {}s because no connection to Marionette " + "server could be established. Check gecko.log for errors" + ) + reraise(IOError, IOError(msg.format(timeout)), sys.exc_info()[2]) + + def cleanup(self): + if self.session is not None: + try: + self.delete_session() + except (errors.MarionetteException, IOError): + # These exceptions get thrown if the Marionette server + # hit an exception/died or the connection died. We can + # do no further server-side cleanup in this case. + pass + + if self.instance: + # stop application and, if applicable, stop emulator + self.instance.close(clean=True) + if self.instance.unresponsive_count >= 3: + raise errors.UnresponsiveInstanceException( + "Application clean-up has failed >2 consecutive times." + ) + + self.cleanup_ran = True + + def __del__(self): + if not self.cleanup_ran: + self.cleanup() + + @staticmethod + def check_port_available(port, host=""): + """Check if "host:port" is available. + + Raise socket.error if port is not available. + """ + port = int(port) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + s.bind((host, port)) + port = s.getsockname()[1] + finally: + s.close() + return port + + def raise_for_port(self, timeout=None, check_process_status=True): + """Raise socket.timeout if no connection can be established. + + :param timeout: Optional timeout in seconds for the server to be ready. + :param check_process_status: Optional, if `True` the process will be + continuously checked if it has exited, and the connection + attempt will be aborted. + """ + if timeout is None: + timeout = self.startup_timeout + + runner = None + if self.instance is not None: + runner = self.instance.runner + + poll_interval = 0.1 + starttime = datetime.datetime.now() + timeout_time = starttime + datetime.timedelta(seconds=timeout) + + client = transport.TcpTransport(self.host, self.port, 0.5) + + connected = False + while datetime.datetime.now() < timeout_time: + # If the instance we want to connect to is not running return immediately + if check_process_status and runner is not None and not runner.is_running(): + break + + try: + client.connect() + return True + except socket.error: + pass + finally: + client.close() + + time.sleep(poll_interval) + + if not connected: + # There might have been a startup crash of the application + if runner is not None and self.check_for_crash() > 0: + raise IOError("Process crashed (Exit code: {})".format(runner.wait(0))) + + raise socket.timeout( + "Timed out waiting for connection on {0}:{1}!".format( + self.host, self.port + ) + ) + + @do_process_check + def _send_message(self, name, params=None, key=None): + """Send a blocking message to the server. + + Marionette provides an asynchronous, non-blocking interface and + this attempts to paper over this by providing a synchronous API + to the user. + + :param name: Requested command key. + :param params: Optional dictionary of key/value arguments. + :param key: Optional key to extract from response. + + :returns: Full response from the server, or if `key` is given, + the value of said key in the response. + """ + if not self.session_id and name != "WebDriver:NewSession": + raise errors.InvalidSessionIdException("Please start a session") + + try: + msg = self.client.request(name, params) + except IOError: + self.delete_session(send_request=False) + raise + + res, err = msg.result, msg.error + if err: + self._handle_error(err) + + if key is not None: + return self._from_json(res.get(key)) + else: + return self._from_json(res) + + def _handle_error(self, obj): + error = obj["error"] + message = obj["message"] + stacktrace = obj["stacktrace"] + + raise errors.lookup(error)(message, stacktrace=stacktrace) + + def check_for_crash(self): + """Check if the process crashed. + + :returns: True, if a crash happened since the method has been called the last time. + """ + crash_count = 0 + + if self.instance: + name = self.test_name or "marionette.py" + crash_count = self.instance.runner.check_for_crashes(test_name=name) + self.crashed = self.crashed + crash_count + + return crash_count > 0 + + def _handle_socket_failure(self): + """Handle socket failures for the currently connected application. + + If the application crashed then clean-up internal states, or in case of a content + crash also kill the process. If there are other reasons for a socket failure, + wait for the process to shutdown itself, or force kill it. + + Please note that the method expects an exception to be handled on the current stack + frame, and is only called via the `@do_process_check` decorator. + + """ + exc_cls, exc, tb = sys.exc_info() + + # If the application hasn't been launched by Marionette no further action can be done. + # In such cases we simply re-throw the exception. + if not self.instance: + reraise(exc_cls, exc, tb) + + else: + # Somehow the socket disconnected. Give the application some time to shutdown + # itself before killing the process. + returncode = self.instance.runner.wait(timeout=self.shutdown_timeout) + + if returncode is None: + message = ( + "Process killed because the connection to Marionette server is " + "lost. Check gecko.log for errors" + ) + # This will force-close the application without sending any other message. + self.cleanup() + else: + # If Firefox quit itself check if there was a crash + crash_count = self.check_for_crash() + + if crash_count > 0: + # SIGUSR1 indicates a forced shutdown due to a content process crash + if returncode == 245: + message = "Content process crashed" + else: + message = "Process crashed (Exit code: {returncode})" + else: + message = ( + "Process has been unexpectedly closed (Exit code: {returncode})" + ) + + self.delete_session(send_request=False) + + message += " (Reason: {reason})" + + reraise( + IOError, IOError(message.format(returncode=returncode, reason=exc)), tb + ) + + @staticmethod + def convert_keys(*string): + typing = [] + for val in string: + if isinstance(val, Keys): + typing.append(val) + elif isinstance(val, int): + val = str(val) + for i in range(len(val)): + typing.append(val[i]) + else: + for i in range(len(val)): + typing.append(val[i]) + return "".join(typing) + + def clear_pref(self, pref): + """Clear the user-defined value from the specified preference. + + :param pref: Name of the preference. + """ + with self.using_context(self.CONTEXT_CHROME): + self.execute_script( + """ + const { Preferences } = ChromeUtils.importESModule( + "resource://gre/modules/Preferences.sys.mjs" + ); + Preferences.reset(arguments[0]); + """, + script_args=(pref,), + ) + + def get_pref(self, pref, default_branch=False, value_type="unspecified"): + """Get the value of the specified preference. + + :param pref: Name of the preference. + :param default_branch: Optional, if `True` the preference value will be read + from the default branch. Otherwise the user-defined + value if set is returned. Defaults to `False`. + :param value_type: Optional, XPCOM interface of the pref's complex value. + Possible values are: `nsIFile` and + `nsIPrefLocalizedString`. + + Usage example:: + + marionette.get_pref("browser.tabs.warnOnClose") + + """ + with self.using_context(self.CONTEXT_CHROME): + pref_value = self.execute_script( + """ + const { Preferences } = ChromeUtils.importESModule( + "resource://gre/modules/Preferences.sys.mjs" + ); + + let pref = arguments[0]; + let defaultBranch = arguments[1]; + let valueType = arguments[2]; + + prefs = new Preferences({defaultBranch: defaultBranch}); + return prefs.get(pref, null, Components.interfaces[valueType]); + """, + script_args=(pref, default_branch, value_type), + ) + return pref_value + + def set_pref(self, pref, value, default_branch=False): + """Set the value of the specified preference. + + :param pref: Name of the preference. + :param value: The value to set the preference to. If the value is None, + reset the preference to its default value. If no default + value exists, the preference will cease to exist. + :param default_branch: Optional, if `True` the preference value will + be written to the default branch, and will remain until + the application gets restarted. Otherwise a user-defined + value is set. Defaults to `False`. + + Usage example:: + + marionette.set_pref("browser.tabs.warnOnClose", True) + + """ + with self.using_context(self.CONTEXT_CHROME): + if value is None: + self.clear_pref(pref) + return + + self.execute_script( + """ + const { Preferences } = ChromeUtils.importESModule( + "resource://gre/modules/Preferences.sys.mjs" + ); + + let pref = arguments[0]; + let value = arguments[1]; + let defaultBranch = arguments[2]; + + prefs = new Preferences({defaultBranch: defaultBranch}); + prefs.set(pref, value); + """, + script_args=(pref, value, default_branch), + ) + + def set_prefs(self, prefs, default_branch=False): + """Set the value of a list of preferences. + + :param prefs: A dict containing one or more preferences and their values + to be set. See :func:`set_pref` for further details. + :param default_branch: Optional, if `True` the preference value will + be written to the default branch, and will remain until + the application gets restarted. Otherwise a user-defined + value is set. Defaults to `False`. + + Usage example:: + + marionette.set_prefs({"browser.tabs.warnOnClose": True}) + + """ + for pref, value in prefs.items(): + self.set_pref(pref, value, default_branch=default_branch) + + @contextmanager + def using_prefs(self, prefs, default_branch=False): + """Set preferences for code executed in a `with` block, and restores them on exit. + + :param prefs: A dict containing one or more preferences and their values + to be set. See :func:`set_prefs` for further details. + :param default_branch: Optional, if `True` the preference value will + be written to the default branch, and will remain until + the application gets restarted. Otherwise a user-defined + value is set. Defaults to `False`. + + Usage example:: + + with marionette.using_prefs({"browser.tabs.warnOnClose": True}): + # ... do stuff ... + + """ + original_prefs = {p: self.get_pref(p) for p in prefs} + self.set_prefs(prefs, default_branch=default_branch) + + try: + yield + finally: + self.set_prefs(original_prefs, default_branch=default_branch) + + @do_process_check + def enforce_gecko_prefs(self, prefs): + """Checks if the running instance has the given prefs. If not, + it will kill the currently running instance, and spawn a new + instance with the requested preferences. + + :param prefs: A dictionary whose keys are preference names. + """ + if not self.instance: + raise errors.MarionetteException( + "enforce_gecko_prefs() can only be called " + "on Gecko instances launched by Marionette" + ) + pref_exists = True + with self.using_context(self.CONTEXT_CHROME): + for pref, value in six.iteritems(prefs): + if type(value) is not str: + value = json.dumps(value) + pref_exists = self.execute_script( + """ + let prefInterface = Components.classes["@mozilla.org/preferences-service;1"] + .getService(Components.interfaces.nsIPrefBranch); + let pref = '{0}'; + let value = '{1}'; + let type = prefInterface.getPrefType(pref); + switch(type) {{ + case prefInterface.PREF_STRING: + return value == prefInterface.getCharPref(pref).toString(); + case prefInterface.PREF_BOOL: + return value == prefInterface.getBoolPref(pref).toString(); + case prefInterface.PREF_INT: + return value == prefInterface.getIntPref(pref).toString(); + case prefInterface.PREF_INVALID: + return false; + }} + """.format( + pref, value + ) + ) + if not pref_exists: + break + + if not pref_exists: + context = self._send_message("Marionette:GetContext", key="value") + self.delete_session() + self.instance.restart(prefs) + self.raise_for_port() + self.start_session(self.requested_capabilities) + + # Restore the context as used before the restart + self.set_context(context) + + def _request_in_app_shutdown(self, flags=None, safe_mode=False): + """Attempt to quit the currently running instance from inside the + application. If shutdown is prevented by some component the quit + will be forced. + + This method effectively calls `Services.startup.quit` in Gecko. + Possible flag values are listed at https://bit.ly/3IYcjYi. + + :param flags: Optional additional quit masks to include. + + :param safe_mode: Optional flag to indicate that the application has to + be restarted in safe mode. + + :returns: A dictionary containing details of the application shutdown. + The `cause` property reflects the reason, and `forced` indicates + that something prevented the shutdown and the application had + to be forced to shutdown. + + :throws InvalidArgumentException: If there are multiple + `shutdown_flags` ending with `"Quit"`. + """ + body = {} + if flags is not None: + body["flags"] = list( + flags, + ) + if safe_mode: + body["safeMode"] = safe_mode + + return self._send_message("Marionette:Quit", body) + + @do_process_check + def quit(self, clean=False, in_app=True, callback=None): + """ + By default this method will trigger a normal shutdown of the currently running instance. + But it can also be used to force terminate the process. + + This command will delete the active marionette session. It also allows + manipulation of eg. the profile data while the application is not running. + To start the application again, :func:`start_session` has to be called. + + :param clean: If True a new profile will be used after the next start of + the application. Note that the in_app initiated quit always + maintains the same profile. + + :param in_app: If True, marionette will cause a quit from within the + application. Otherwise the application will be restarted + immediately by killing the process. + + :param callback: If provided and `in_app` is True, the callback will + be used to trigger the shutdown. + + :returns: A dictionary containing details of the application shutdown. + The `cause` property reflects the reason, and `forced` indicates + that something prevented the shutdown and the application had + to be forced to shutdown. + """ + if not self.instance: + raise errors.MarionetteException( + "quit() can only be called " "on Gecko instances launched by Marionette" + ) + + quit_details = {"cause": "shutdown", "forced": False} + + if in_app: + if clean: + raise ValueError( + "An in_app restart cannot be triggered with the clean flag set" + ) + + if callback is not None and not callable(callback): + raise ValueError( + "Specified callback '{}' is not callable".format(callback) + ) + + # Block Marionette from accepting new connections + self._send_message("Marionette:AcceptConnections", {"value": False}) + + try: + self.is_shutting_down = True + if callback is not None: + callback() + quit_details["in_app"] = True + else: + quit_details = self._request_in_app_shutdown() + + except IOError: + # A possible IOError should be ignored at this point, given that + # quit() could have been called inside of `using_context`, + # which wants to reset the context but fails sending the message. + pass + + except Exception: + # For any other error assume the application is not going to shutdown. + # As such allow Marionette to accept new connections again. + self.is_shutting_down = False + self._send_message("Marionette:AcceptConnections", {"value": True}) + raise + + try: + self.delete_session(send_request=False) + + # Try to wait for the process to end itself before force-closing it. + returncode = self.instance.runner.wait(timeout=self.shutdown_timeout) + if returncode is None: + self.cleanup() + + message = "Process still running {}s after quit request" + raise IOError(message.format(self.shutdown_timeout)) + + finally: + self.is_shutting_down = False + + else: + self.delete_session(send_request=False) + self.instance.close(clean=clean) + + quit_details.update({"in_app": False, "forced": True}) + + if quit_details.get("cause") not in (None, "shutdown"): + raise errors.MarionetteException( + "Unexpected shutdown reason '{}' for " + "quitting the process.".format(quit_details["cause"]) + ) + + return quit_details + + @do_process_check + def restart( + self, callback=None, clean=False, in_app=True, safe_mode=False, silent=False + ): + """ + By default this method will restart the currently running instance by using the same + profile. But it can also be forced to terminate the currently running instance, and + to spawn a new instance with the same or different profile. + + :param callback: If provided and `in_app` is True, the callback will be + used to trigger the restart. + + :param clean: If True a new profile will be used after the restart. Note + that the in_app initiated restart always maintains the same + profile. + + :param in_app: If True, marionette will cause a restart from within the + application. Otherwise the application will be restarted + immediately by killing the process. + + :param safe_mode: Optional flag to indicate that the application has to + be restarted in safe mode. + + :param silent: Optional flag to indicate that the application should + not open any window after a restart. Note that this flag is only + supported on MacOS and requires "in_app" to be True. + + :returns: A dictionary containing details of the application restart. + The `cause` property reflects the reason, and `forced` indicates + that something prevented the shutdown and the application had + to be forced to shutdown. + """ + if not self.instance: + raise errors.MarionetteException( + "restart() can only be called " + "on Gecko instances launched by Marionette" + ) + + context = self._send_message("Marionette:GetContext", key="value") + restart_details = {"cause": "restart", "forced": False} + + # Safe mode and the silent flag require an in_app restart. + if (safe_mode or silent) and not in_app: + raise ValueError("An in_app restart is required for safe or silent mode") + + if in_app: + if clean: + raise ValueError( + "An in_app restart cannot be triggered with the clean flag set" + ) + + if callback is not None and not callable(callback): + raise ValueError( + "Specified callback '{}' is not callable".format(callback) + ) + + # Block Marionette from accepting new connections + self._send_message("Marionette:AcceptConnections", {"value": False}) + + try: + self.is_shutting_down = True + if callback is not None: + callback() + restart_details["in_app"] = True + else: + flags = ["eRestart"] + if silent: + flags.append("eSilently") + + try: + restart_details = self._request_in_app_shutdown( + flags=flags, safe_mode=safe_mode + ) + except Exception as e: + self._send_message( + "Marionette:AcceptConnections", {"value": True} + ) + raise e + + except IOError: + # A possible IOError should be ignored at this point, given that + # restart() could have been called inside of `using_context`, + # which wants to reset the context but fails sending the message. + pass + + timeout_restart = self.shutdown_timeout + self.startup_timeout + try: + # Wait for a new Marionette connection to appear while the + # process restarts itself. + self.raise_for_port(timeout=timeout_restart, check_process_status=False) + except socket.timeout: + exc_cls, _, tb = sys.exc_info() + + if self.instance.runner.returncode is None: + # The process is still running, which means the shutdown + # request was not correct or the application ignored it. + # Allow Marionette to accept connections again. + self._send_message("Marionette:AcceptConnections", {"value": True}) + + message = "Process still running {}s after restart request" + reraise(exc_cls, exc_cls(message.format(timeout_restart)), tb) + + else: + # The process shutdown but didn't start again. + self.cleanup() + msg = "Process unexpectedly quit without restarting (exit code: {})" + reraise( + exc_cls, + exc_cls(msg.format(self.instance.runner.returncode)), + tb, + ) + + finally: + self.is_shutting_down = False + + self.delete_session(send_request=False) + + else: + self.delete_session() + self.instance.restart(clean=clean) + self.raise_for_port(timeout=self.DEFAULT_STARTUP_TIMEOUT) + + restart_details.update({"in_app": False, "forced": True}) + + if restart_details.get("cause") not in (None, "restart"): + raise errors.MarionetteException( + "Unexpected shutdown reason '{}' for " + "restarting the process".format(restart_details["cause"]) + ) + + self.start_session(self.requested_capabilities) + # Restore the context as used before the restart + self.set_context(context) + + if in_app and self.process_id: + # In some cases Firefox restarts itself by spawning into a new process group. + # As long as mozprocess cannot track that behavior (bug 1284864) we assist by + # informing about the new process id. + self.instance.runner.process_handler.check_for_detached(self.process_id) + + return restart_details + + def absolute_url(self, relative_url): + """ + Returns an absolute url for files served from Marionette's www directory. + + :param relative_url: The url of a static file, relative to Marionette's www directory. + """ + return "{0}{1}".format(self.baseurl, relative_url) + + @do_process_check + def start_session(self, capabilities=None, timeout=None): + """Create a new WebDriver session. + This method must be called before performing any other action. + + :param capabilities: An optional dictionary of + Marionette-recognised capabilities. It does not + accept a WebDriver conforming capabilities dictionary + (including alwaysMatch, firstMatch, desiredCapabilities, + or requriedCapabilities), and only recognises extension + capabilities that are specific to Marionette. + :param timeout: Optional timeout in seconds for the server to be ready. + :returns: A dictionary of the capabilities offered. + """ + if capabilities is None: + capabilities = {"strictFileInteractability": True} + self.requested_capabilities = capabilities + + if timeout is None: + timeout = self.startup_timeout + + self.crashed = 0 + + if self.instance: + returncode = self.instance.runner.returncode + # We're managing a binary which has terminated. Start it again + # and implicitely wait for the Marionette server to be ready. + if returncode is not None: + self.start_binary(timeout) + + else: + # In the case when Marionette doesn't manage the binary wait until + # its server component has been started. + self.raise_for_port(timeout=timeout) + + self.client = transport.TcpTransport(self.host, self.port, self.socket_timeout) + self.protocol, _ = self.client.connect() + + try: + resp = self._send_message("WebDriver:NewSession", capabilities) + except errors.UnknownException: + # Force closing the managed process when the session cannot be + # created due to global JavaScript errors. + exc_type, value, tb = sys.exc_info() + if self.instance and self.instance.runner.is_running(): + self.instance.close() + reraise(exc_type, exc_type(value.message), tb) + + self.session_id = resp["sessionId"] + self.session = resp["capabilities"] + self.cleanup_ran = False + # fallback to processId can be removed in Firefox 55 + self.process_id = self.session.get( + "moz:processID", self.session.get("processId") + ) + self.profile = self.session.get("moz:profile") + + timeout = self.session.get("moz:shutdownTimeout") + if timeout is not None: + # pylint --py3k W1619 + self.shutdown_timeout = timeout / 1000 + 10 + + return self.session + + @property + def test_name(self): + return self._test_name + + @test_name.setter + def test_name(self, test_name): + self._test_name = test_name + + def delete_session(self, send_request=True): + """Close the current session and disconnect from the server. + + :param send_request: Optional, if `True` a request to close the session on + the server side will be sent. Use `False` in case of eg. in_app restart() + or quit(), which trigger a deletion themselves. Defaults to `True`. + """ + try: + if send_request: + try: + self._send_message("WebDriver:DeleteSession") + except errors.InvalidSessionIdException: + pass + finally: + self.process_id = None + self.profile = None + self.session = None + self.session_id = None + self.window = None + + if self.client is not None: + self.client.close() + + @property + def session_capabilities(self): + """A JSON dictionary representing the capabilities of the + current session. + + """ + return self.session + + @property + def current_window_handle(self): + """Get the current window's handle. + + Returns an opaque server-assigned identifier to this window + that uniquely identifies it within this Marionette instance. + This can be used to switch to this window at a later point. + + :returns: unique window handle + :rtype: string + """ + with self.using_context("content"): + self.window = self._send_message("WebDriver:GetWindowHandle", key="value") + + return self.window + + @property + def current_chrome_window_handle(self): + """Get the current chrome window's handle. Corresponds to + a chrome window that may itself contain tabs identified by + window_handles. + + Returns an opaque server-assigned identifier to this window + that uniquely identifies it within this Marionette instance. + This can be used to switch to this window at a later point. + + :returns: unique window handle + :rtype: string + """ + with self.using_context("chrome"): + self.chrome_window = self._send_message( + "WebDriver:GetWindowHandle", key="value" + ) + + return self.chrome_window + + def set_window_rect(self, x=None, y=None, height=None, width=None): + """Set the position and size of the current window. + + The supplied width and height values refer to the window outerWidth + and outerHeight values, which include scroll bars, title bars, etc. + + An error will be returned if the requested window size would result + in the window being in the maximised state. + + :param x: x coordinate for the top left of the window + :param y: y coordinate for the top left of the window + :param width: The width to resize the window to. + :param height: The height to resize the window to. + """ + if (x is None and y is None) and (height is None and width is None): + raise errors.InvalidArgumentException( + "x and y or height and width need values" + ) + + body = {"x": x, "y": y, "height": height, "width": width} + return self._send_message("WebDriver:SetWindowRect", body) + + @property + def window_rect(self): + return self._send_message("WebDriver:GetWindowRect") + + @property + def title(self): + """Current title of the active window.""" + return self._send_message("WebDriver:GetTitle", key="value") + + @property + def window_handles(self): + """Get list of windows in the current context. + + If called in the content context it will return a list of + references to all available browser windows. + + Each window handle is assigned by the server, and the list of + strings returned does not have a guaranteed ordering. + + :returns: Unordered list of unique window handles as strings + """ + with self.using_context("content"): + return self._send_message("WebDriver:GetWindowHandles") + + @property + def chrome_window_handles(self): + """Get a list of currently open chrome windows. + + Each window handle is assigned by the server, and the list of + strings returned does not have a guaranteed ordering. + + :returns: Unordered list of unique chrome window handles as strings + """ + with self.using_context("chrome"): + return self._send_message("WebDriver:GetWindowHandles") + + @property + def page_source(self): + """A string representation of the DOM.""" + return self._send_message("WebDriver:GetPageSource", key="value") + + def open(self, type=None, focus=False, private=False): + """Open a new window, or tab based on the specified context type. + + If no context type is given the application will choose the best + option based on tab and window support. + + :param type: Type of window to be opened. Can be one of "tab" or "window" + :param focus: If true, the opened window will be focused + :param private: If true, open a private window + + :returns: Dict with new window handle, and type of opened window + """ + body = {"type": type, "focus": focus, "private": private} + return self._send_message("WebDriver:NewWindow", body) + + def close(self): + """Close the current window, ending the session if it's the last + window currently open. + + :returns: Unordered list of remaining unique window handles as strings + """ + return self._send_message("WebDriver:CloseWindow") + + def close_chrome_window(self): + """Close the currently selected chrome window, ending the session + if it's the last window open. + + :returns: Unordered list of remaining unique chrome window handles as strings + """ + return self._send_message("WebDriver:CloseChromeWindow") + + def set_context(self, context): + """Sets the context that Marionette commands are running in. + + :param context: Context, may be one of the class properties + `CONTEXT_CHROME` or `CONTEXT_CONTENT`. + + Usage example:: + + marionette.set_context(marionette.CONTEXT_CHROME) + """ + if context not in [self.CONTEXT_CHROME, self.CONTEXT_CONTENT]: + raise ValueError("Unknown context: {}".format(context)) + + self._send_message("Marionette:SetContext", {"value": context}) + + @contextmanager + def using_context(self, context): + """Sets the context that Marionette commands are running in using + a `with` statement. The state of the context on the server is + saved before entering the block, and restored upon exiting it. + + :param context: Context, may be one of the class properties + `CONTEXT_CHROME` or `CONTEXT_CONTENT`. + + Usage example:: + + with marionette.using_context(marionette.CONTEXT_CHROME): + # chrome scope + ... do stuff ... + """ + scope = self._send_message("Marionette:GetContext", key="value") + self.set_context(context) + try: + yield + finally: + self.set_context(scope) + + def switch_to_alert(self): + """Returns an :class:`~marionette_driver.marionette.Alert` object for + interacting with a currently displayed alert. + + :: + + alert = self.marionette.switch_to_alert() + text = alert.text + alert.accept() + """ + return Alert(self) + + def switch_to_window(self, handle, focus=True): + """Switch to the specified window; subsequent commands will be + directed at the new window. + + :param handle: The id of the window to switch to. + + :param focus: A boolean value which determins whether to focus + the window that we just switched to. + """ + self._send_message( + "WebDriver:SwitchToWindow", {"handle": handle, "focus": focus} + ) + self.window = handle + + def switch_to_default_content(self): + """Switch the current context to page's default content.""" + return self.switch_to_frame() + + def switch_to_parent_frame(self): + """ + Switch to the Parent Frame + """ + self._send_message("WebDriver:SwitchToParentFrame") + + def switch_to_frame(self, frame=None): + """Switch the current context to the specified frame. Subsequent + commands will operate in the context of the specified frame, + if applicable. + + :param frame: A reference to the frame to switch to. This can + be an :class:`~marionette_driver.marionette.HTMLElement`, + or an integer index. If you call ``switch_to_frame`` without an + argument, it will switch to the top-level frame. + """ + body = {} + if isinstance(frame, HTMLElement): + body["element"] = frame.id + elif frame is not None: + body["id"] = frame + + self._send_message("WebDriver:SwitchToFrame", body) + + def get_url(self): + """Get a string representing the current URL. + + On Desktop this returns a string representation of the URL of + the current top level browsing context. This is equivalent to + document.location.href. + + When in the context of the chrome, this returns the canonical + URL of the current resource. + + :returns: string representation of URL + """ + return self._send_message("WebDriver:GetCurrentURL", key="value") + + def get_window_type(self): + """Gets the windowtype attribute of the window Marionette is + currently acting on. + + This command only makes sense in a chrome context. You might use this + method to distinguish a browser window from an editor window. + """ + try: + return self._send_message("Marionette:GetWindowType", key="value") + except errors.UnknownCommandException: + return self._send_message("getWindowType", key="value") + + def navigate(self, url): + """Navigate to given `url`. + + Navigates the current top-level browsing context's content + frame to the given URL and waits for the document to load or + the session's page timeout duration to elapse before returning. + + The command will return with a failure if there is an error + loading the document or the URL is blocked. This can occur if + it fails to reach the host, the URL is malformed, the page is + restricted (about:* pages), or if there is a certificate issue + to name some examples. + + The document is considered successfully loaded when the + `DOMContentLoaded` event on the frame element associated with the + `window` triggers and `document.readyState` is "complete". + + In chrome context it will change the current `window`'s location + to the supplied URL and wait until `document.readyState` equals + "complete" or the page timeout duration has elapsed. + + :param url: The URL to navigate to. + """ + self._send_message("WebDriver:Navigate", {"url": url}) + + def go_back(self): + """Causes the browser to perform a back navigation.""" + self._send_message("WebDriver:Back") + + def go_forward(self): + """Causes the browser to perform a forward navigation.""" + self._send_message("WebDriver:Forward") + + def refresh(self): + """Causes the browser to perform to refresh the current page.""" + self._send_message("WebDriver:Refresh") + + def _to_json(self, args): + if isinstance(args, list) or isinstance(args, tuple): + wrapped = [] + for arg in args: + wrapped.append(self._to_json(arg)) + elif isinstance(args, dict): + wrapped = {} + for arg in args: + wrapped[arg] = self._to_json(args[arg]) + elif type(args) == HTMLElement: + wrapped = {WEB_ELEMENT_KEY: args.id} + elif type(args) == ShadowRoot: + wrapped = {WEB_SHADOW_ROOT_KEY: args.id} + elif ( + isinstance(args, bool) + or isinstance(args, six.string_types) + or isinstance(args, int) + or isinstance(args, float) + or args is None + ): + wrapped = args + return wrapped + + def _from_json(self, value): + if isinstance(value, dict) and any( + k in value.keys() for k in HTMLElement.identifiers + ): + return HTMLElement._from_json(value, self) + elif isinstance(value, dict) and any( + k in value.keys() for k in ShadowRoot.identifiers + ): + return ShadowRoot._from_json(value, self) + elif isinstance(value, dict): + return {key: self._from_json(val) for key, val in value.items()} + elif isinstance(value, list): + return list(self._from_json(item) for item in value) + else: + return value + + def execute_script( + self, + script, + script_args=(), + new_sandbox=True, + sandbox="default", + script_timeout=None, + ): + """Executes a synchronous JavaScript script, and returns the + result (or None if the script does return a value). + + The script is executed in the context set by the most recent + :func:`set_context` call, or to the CONTEXT_CONTENT context if + :func:`set_context` has not been called. + + :param script: A string containing the JavaScript to execute. + :param script_args: An interable of arguments to pass to the script. + :param new_sandbox: If False, preserve global variables from + the last execute_*script call. This is True by default, in which + case no globals are preserved. + :param sandbox: A tag referring to the sandbox you wish to use; + if you specify a new tag, a new sandbox will be created. + If you use the special tag `system`, the sandbox will + be created using the system principal which has elevated + privileges. + :param script_timeout: Timeout in milliseconds, overriding + the session's default script timeout. + + Simple usage example: + + :: + + result = marionette.execute_script("return 1;") + assert result == 1 + + You can use the `script_args` parameter to pass arguments to the + script: + + :: + + result = marionette.execute_script("return arguments[0] + arguments[1];", + script_args=(2, 3,)) + assert result == 5 + some_element = marionette.find_element(By.ID, "someElement") + sid = marionette.execute_script("return arguments[0].id;", script_args=(some_element,)) + assert some_element.get_attribute("id") == sid + + Scripts wishing to access non-standard properties of the window + object must use window.wrappedJSObject: + + :: + + result = marionette.execute_script(''' + window.wrappedJSObject.test1 = "foo"; + window.wrappedJSObject.test2 = "bar"; + return window.wrappedJSObject.test1 + window.wrappedJSObject.test2; + ''') + assert result == "foobar" + + Global variables set by individual scripts do not persist between + script calls by default. If you wish to persist data between + script calls, you can set `new_sandbox` to False on your next call, + and add any new variables to a new 'global' object like this: + + :: + + marionette.execute_script("global.test1 = 'foo';") + result = self.marionette.execute_script("return global.test1;", new_sandbox=False) + assert result == "foo" + + """ + original_timeout = None + if script_timeout is not None: + original_timeout = self.timeout.script + self.timeout.script = script_timeout / 1000.0 + + try: + args = self._to_json(script_args) + stack = traceback.extract_stack() + frame = stack[-2:-1][0] # grab the second-to-last frame + filename = ( + frame[0] if sys.platform == "win32" else os.path.relpath(frame[0]) + ) + body = { + "script": script.strip(), + "args": args, + "newSandbox": new_sandbox, + "sandbox": sandbox, + "line": int(frame[1]), + "filename": filename, + } + rv = self._send_message("WebDriver:ExecuteScript", body, key="value") + + finally: + if script_timeout is not None: + self.timeout.script = original_timeout + + return rv + + def execute_async_script( + self, + script, + script_args=(), + new_sandbox=True, + sandbox="default", + script_timeout=None, + ): + """Executes an asynchronous JavaScript script, and returns the + result (or None if the script does return a value). + + The script is executed in the context set by the most recent + :func:`set_context` call, or to the CONTEXT_CONTENT context if + :func:`set_context` has not been called. + + :param script: A string containing the JavaScript to execute. + :param script_args: An interable of arguments to pass to the script. + :param new_sandbox: If False, preserve global variables from + the last execute_*script call. This is True by default, + in which case no globals are preserved. + :param sandbox: A tag referring to the sandbox you wish to use; if + you specify a new tag, a new sandbox will be created. If you + use the special tag `system`, the sandbox will be created + using the system principal which has elevated privileges. + :param script_timeout: Timeout in milliseconds, overriding + the session's default script timeout. + + Usage example: + + :: + + marionette.timeout.script = 10 + result = self.marionette.execute_async_script(''' + // this script waits 5 seconds, and then returns the number 1 + let [resolve] = arguments; + setTimeout(function() { + resolve(1); + }, 5000); + ''') + assert result == 1 + """ + original_timeout = None + if script_timeout is not None: + original_timeout = self.timeout.script + self.timeout.script = script_timeout / 1000.0 + + try: + args = self._to_json(script_args) + stack = traceback.extract_stack() + frame = stack[-2:-1][0] # grab the second-to-last frame + filename = ( + frame[0] if sys.platform == "win32" else os.path.relpath(frame[0]) + ) + body = { + "script": script.strip(), + "args": args, + "newSandbox": new_sandbox, + "sandbox": sandbox, + "scriptTimeout": script_timeout, + "line": int(frame[1]), + "filename": filename, + } + rv = self._send_message("WebDriver:ExecuteAsyncScript", body, key="value") + + finally: + if script_timeout is not None: + self.timeout.script = original_timeout + + return rv + + def find_element(self, method, target, id=None): + """Returns an :class:`~marionette_driver.marionette.HTMLElement` + instance that matches the specified method and target in the current + context. + + An :class:`~marionette_driver.marionette.HTMLElement` instance may be + used to call other methods on the element, such as + :func:`~marionette_driver.marionette.HTMLElement.click`. If no element + is immediately found, the attempt to locate an element will be repeated + for up to the amount of time set by + :attr:`marionette_driver.timeout.Timeouts.implicit`. If multiple + elements match the given criteria, only the first is returned. If no + element matches, a ``NoSuchElementException`` will be raised. + + :param method: The method to use to locate the element; one of: + "id", "name", "class name", "tag name", "css selector", + "link text", "partial link text" and "xpath". + Note that the "name", "link text" and "partial link test" + methods are not supported in the chrome DOM. + :param target: The target of the search. For example, if method = + "tag", target might equal "div". If method = "id", target would + be an element id. + :param id: If specified, search for elements only inside the element + with the specified id. + """ + body = {"value": target, "using": method} + if id: + body["element"] = id + + return self._send_message("WebDriver:FindElement", body, key="value") + + def find_elements(self, method, target, id=None): + """Returns a list of all + :class:`~marionette_driver.marionette.HTMLElement` instances that match + the specified method and target in the current context. + + An :class:`~marionette_driver.marionette.HTMLElement` instance may be + used to call other methods on the element, such as + :func:`~marionette_driver.marionette.HTMLElement.click`. If no element + is immediately found, the attempt to locate an element will be repeated + for up to the amount of time set by + :attr:`marionette_driver.timeout.Timeouts.implicit`. + + :param method: The method to use to locate the elements; one + of: "id", "name", "class name", "tag name", "css selector", + "link text", "partial link text" and "xpath". + Note that the "name", "link text" and "partial link test" + methods are not supported in the chrome DOM. + :param target: The target of the search. For example, if method = + "tag", target might equal "div". If method = "id", target would be + an element id. + :param id: If specified, search for elements only inside the element + with the specified id. + """ + body = {"value": target, "using": method} + if id: + body["element"] = id + + return self._send_message("WebDriver:FindElements", body) + + def get_active_element(self): + el_or_ref = self._send_message("WebDriver:GetActiveElement", key="value") + return el_or_ref + + def add_cookie(self, cookie): + """Adds a cookie to your current session. + + :param cookie: A dictionary object, with required keys - "name" + and "value"; optional keys - "path", "domain", "secure", + "expiry". + + Usage example: + + :: + + driver.add_cookie({"name": "foo", "value": "bar"}) + driver.add_cookie({"name": "foo", "value": "bar", "path": "/"}) + driver.add_cookie({"name": "foo", "value": "bar", "path": "/", + "secure": True}) + """ + self._send_message("WebDriver:AddCookie", {"cookie": cookie}) + + def delete_all_cookies(self): + """Delete all cookies in the scope of the current session. + + Usage example: + + :: + + driver.delete_all_cookies() + """ + self._send_message("WebDriver:DeleteAllCookies") + + def delete_cookie(self, name): + """Delete a cookie by its name. + + :param name: Name of cookie to delete. + + Usage example: + + :: + + driver.delete_cookie("foo") + """ + self._send_message("WebDriver:DeleteCookie", {"name": name}) + + def get_cookie(self, name): + """Get a single cookie by name. Returns the cookie if found, + None if not. + + :param name: Name of cookie to get. + """ + cookies = self.get_cookies() + for cookie in cookies: + if cookie["name"] == name: + return cookie + return None + + def get_cookies(self): + """Get all the cookies for the current domain. + + This is the equivalent of calling `document.cookie` and + parsing the result. + + :returns: A list of cookies for the current domain. + """ + return self._send_message("WebDriver:GetCookies") + + def save_screenshot(self, fh, element=None, full=True, scroll=True): + """Takes a screenhot of a web element or the current frame and + saves it in the filehandle. + + It is a wrapper around screenshot() + :param fh: The filehandle to save the screenshot at. + + The rest of the parameters are defined like in screenshot() + """ + data = self.screenshot(element, "binary", full, scroll) + fh.write(data) + + def screenshot(self, element=None, format="base64", full=True, scroll=True): + """Takes a screenshot of a web element or the current frame. + + The screen capture is returned as a lossless PNG image encoded + as a base 64 string by default. If the `element` argument is defined the + capture area will be limited to the bounding box of that + element. Otherwise, the capture area will be the bounding box + of the current frame. + + :param element: The element to take a screenshot of. If None, will + take a screenshot of the current frame. + + :param format: if "base64" (the default), returns the screenshot + as a base64-string. If "binary", the data is decoded and + returned as raw binary. If "hash", the data is hashed using + the SHA-256 algorithm and the result is returned as a hex digest. + + :param full: If True (the default), the capture area will be the + complete frame. Else only the viewport is captured. Only applies + when `element` is None. + + :param scroll: When `element` is provided, scroll to it before + taking the screenshot (default). Otherwise, avoid scrolling + `element` into view. + """ + + if element: + element = element.id + + body = {"id": element, "full": full, "hash": False, "scroll": scroll} + if format == "hash": + body["hash"] = True + + data = self._send_message("WebDriver:TakeScreenshot", body, key="value") + + if format == "base64" or format == "hash": + return data + elif format == "binary": + return base64.b64decode(data.encode("ascii")) + else: + raise ValueError( + "format parameter must be either 'base64'" + " or 'binary', not {0}".format(repr(format)) + ) + + @property + def orientation(self): + """Get the current browser orientation. + + Will return one of the valid primary orientation values + portrait-primary, landscape-primary, portrait-secondary, or + landscape-secondary. + """ + try: + return self._send_message("Marionette:GetScreenOrientation", key="value") + except errors.UnknownCommandException: + return self._send_message("getScreenOrientation", key="value") + + def set_orientation(self, orientation): + """Set the current browser orientation. + + The supplied orientation should be given as one of the valid + orientation values. If the orientation is unknown, an error + will be raised. + + Valid orientations are "portrait" and "landscape", which fall + back to "portrait-primary" and "landscape-primary" + respectively, and "portrait-secondary" as well as + "landscape-secondary". + + :param orientation: The orientation to lock the screen in. + """ + body = {"orientation": orientation} + try: + self._send_message("Marionette:SetScreenOrientation", body) + except errors.UnknownCommandException: + self._send_message("setScreenOrientation", body) + + def minimize_window(self): + """Iconify the browser window currently receiving commands. + The action should be equivalent to the user pressing the minimize + button in the OS window. + + Note that this command is not available on Fennec. It may also + not be available in certain window managers. + + :returns Window rect. + """ + return self._send_message("WebDriver:MinimizeWindow") + + def maximize_window(self): + """Resize the browser window currently receiving commands. + The action should be equivalent to the user pressing the maximize + button in the OS window. + + + Note that this command is not available on Fennec. It may also + not be available in certain window managers. + + :returns: Window rect. + """ + return self._send_message("WebDriver:MaximizeWindow") + + def fullscreen(self): + """Synchronously sets the user agent window to full screen as + if the user had done "View > Enter Full Screen", or restores + it if it is already in full screen. + + :returns: Window rect. + """ + return self._send_message("WebDriver:FullscreenWindow") |