# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import base64 import datetime import json import os import socket import sys import time import traceback from contextlib import contextmanager import six from six import reraise from . import errors, transport from .decorators import do_process_check from .geckoinstance import GeckoInstance from .keys import Keys from .timeout import Timeouts FRAME_KEY = "frame-075b-4da1-b6ba-e579c2d3230a" WEB_ELEMENT_KEY = "element-6066-11e4-a52e-4f735466cecf" WEB_SHADOW_ROOT_KEY = "shadow-6066-11e4-a52e-4f735466cecf" WINDOW_KEY = "window-fcc6-11e5-b4f8-330a88ab9d7f" class MouseButton(object): """Enum-like class for mouse button constants.""" LEFT = 0 MIDDLE = 1 RIGHT = 2 class ActionSequence(object): r"""API for creating and performing action sequences. Each action method adds one or more actions to a queue. When perform() is called, the queued actions fire in order. May be chained together as in:: ActionSequence(self.marionette, "key", id) \ .key_down("a") \ .key_up("a") \ .perform() """ def __init__(self, marionette, action_type, input_id, pointer_params=None): self.marionette = marionette self._actions = [] self._id = input_id self._pointer_params = pointer_params self._type = action_type @property def dict(self): d = { "type": self._type, "id": self._id, "actions": self._actions, } if self._pointer_params is not None: d["parameters"] = self._pointer_params return d def perform(self): """Perform all queued actions.""" self.marionette.actions.perform([self.dict]) def _key_action(self, subtype, value): self._actions.append({"type": subtype, "value": value}) def _pointer_action(self, subtype, button): self._actions.append({"type": subtype, "button": button}) def pause(self, duration): self._actions.append({"type": "pause", "duration": duration}) return self def pointer_move(self, x, y, duration=None, origin=None): """Queue a pointerMove action. :param x: Destination x-axis coordinate of pointer in CSS pixels. :param y: Destination y-axis coordinate of pointer in CSS pixels. :param duration: Number of milliseconds over which to distribute the move. If None, remote end defaults to 0. :param origin: Origin of coordinates, either "viewport", "pointer" or an Element. If None, remote end defaults to "viewport". """ action = {"type": "pointerMove", "x": x, "y": y} if duration is not None: action["duration"] = duration if origin is not None: if isinstance(origin, HTMLElement): action["origin"] = {origin.kind: origin.id} else: action["origin"] = origin self._actions.append(action) return self def pointer_up(self, button=MouseButton.LEFT): """Queue a pointerUp action for `button`. :param button: Pointer button to perform action with. Default: 0, which represents main device button. """ self._pointer_action("pointerUp", button) return self def pointer_down(self, button=MouseButton.LEFT): """Queue a pointerDown action for `button`. :param button: Pointer button to perform action with. Default: 0, which represents main device button. """ self._pointer_action("pointerDown", button) return self def click(self, element=None, button=MouseButton.LEFT): """Queue a click with the specified button. If an element is given, move the pointer to that element first, otherwise click current pointer coordinates. :param element: Optional element to click. :param button: Integer representing pointer button to perform action with. Default: 0, which represents main device button. """ if element: self.pointer_move(0, 0, origin=element) return self.pointer_down(button).pointer_up(button) def key_down(self, value): """Queue a keyDown action for `value`. :param value: Single character to perform key action with. """ self._key_action("keyDown", value) return self def key_up(self, value): """Queue a keyUp action for `value`. :param value: Single character to perform key action with. """ self._key_action("keyUp", value) return self def send_keys(self, keys): """Queue a keyDown and keyUp action for each character in `keys`. :param keys: String of keys to perform key actions with. """ for c in keys: self.key_down(c) self.key_up(c) return self class Actions(object): def __init__(self, marionette): self.marionette = marionette def perform(self, actions=None): """Perform actions by tick from each action sequence in `actions`. :param actions: List of input source action sequences. A single action sequence may be created with the help of ``ActionSequence.dict``. """ body = {"actions": [] if actions is None else actions} return self.marionette._send_message("WebDriver:PerformActions", body) def release(self): return self.marionette._send_message("WebDriver:ReleaseActions") def sequence(self, *args, **kwargs): """Return an empty ActionSequence of the designated type. See ActionSequence for parameter list. """ return ActionSequence(self.marionette, *args, **kwargs) class HTMLElement(object): """Represents a DOM Element.""" identifiers = (FRAME_KEY, WINDOW_KEY, WEB_ELEMENT_KEY) def __init__(self, marionette, id, kind=WEB_ELEMENT_KEY): self.marionette = marionette assert id is not None self.id = id self.kind = kind def __str__(self): return self.id def __eq__(self, other_element): return self.id == other_element.id def __hash__(self): # pylint --py3k: W1641 return hash(self.id) def find_element(self, method, target): """Returns an ``HTMLElement`` instance that matches the specified method and target, relative to the current element. For more details on this function, see the :func:`~marionette_driver.marionette.Marionette.find_element` method in the Marionette class. """ return self.marionette.find_element(method, target, self.id) def find_elements(self, method, target): """Returns a list of all ``HTMLElement`` instances that match the specified method and target in the current context. For more details on this function, see the :func:`~marionette_driver.marionette.Marionette.find_elements` method in the Marionette class. """ return self.marionette.find_elements(method, target, self.id) def get_attribute(self, name): """Returns the requested attribute, or None if no attribute is set. """ body = {"id": self.id, "name": name} return self.marionette._send_message( "WebDriver:GetElementAttribute", body, key="value" ) def get_property(self, name): """Returns the requested property, or None if the property is not set. """ try: body = {"id": self.id, "name": name} return self.marionette._send_message( "WebDriver:GetElementProperty", body, key="value" ) except errors.UnknownCommandException: # Keep backward compatibility for code which uses get_attribute() to # also retrieve element properties. # Remove when Firefox 55 is stable. return self.get_attribute(name) def click(self): """Simulates a click on the element.""" self.marionette._send_message("WebDriver:ElementClick", {"id": self.id}) def tap(self, x=None, y=None): """Simulates a set of tap events on the element. :param x: X coordinate of tap event. If not given, default to the centre of the element. :param y: Y coordinate of tap event. If not given, default to the centre of the element. """ body = {"id": self.id, "x": x, "y": y} self.marionette._send_message("Marionette:SingleTap", body) @property def text(self): """Returns the visible text of the element, and its child elements.""" body = {"id": self.id} return self.marionette._send_message( "WebDriver:GetElementText", body, key="value" ) def send_keys(self, *strings): """Sends the string via synthesized keypresses to the element. If an array is passed in like `marionette.send_keys(Keys.SHIFT, "a")` it will be joined into a string. If an integer is passed in like `marionette.send_keys(1234)` it will be coerced into a string. """ keys = Marionette.convert_keys(*strings) self.marionette._send_message( "WebDriver:ElementSendKeys", {"id": self.id, "text": keys} ) def clear(self): """Clears the input of the element.""" self.marionette._send_message("WebDriver:ElementClear", {"id": self.id}) def is_selected(self): """Returns True if the element is selected.""" body = {"id": self.id} return self.marionette._send_message( "WebDriver:IsElementSelected", body, key="value" ) def is_enabled(self): """This command will return False if all the following criteria are met otherwise return True: * A form control is disabled. * A ``HTMLElement`` has a disabled boolean attribute. """ body = {"id": self.id} return self.marionette._send_message( "WebDriver:IsElementEnabled", body, key="value" ) def is_displayed(self): """Returns True if the element is displayed, False otherwise.""" body = {"id": self.id} return self.marionette._send_message( "WebDriver:IsElementDisplayed", body, key="value" ) @property def tag_name(self): """The tag name of the element.""" body = {"id": self.id} return self.marionette._send_message( "WebDriver:GetElementTagName", body, key="value" ) @property def rect(self): """Gets the element's bounding rectangle. This will return a dictionary with the following: * x and y represent the top left coordinates of the ``HTMLElement`` relative to top left corner of the document. * height and the width will contain the height and the width of the DOMRect of the ``HTMLElement``. """ return self.marionette._send_message( "WebDriver:GetElementRect", {"id": self.id} ) def value_of_css_property(self, property_name): """Gets the value of the specified CSS property name. :param property_name: Property name to get the value of. """ body = {"id": self.id, "propertyName": property_name} return self.marionette._send_message( "WebDriver:GetElementCSSValue", body, key="value" ) @property def shadow_root(self): """Gets the shadow root of the current element""" return self.marionette._send_message( "WebDriver:GetShadowRoot", {"id": self.id}, key="value" ) @classmethod def _from_json(cls, json, marionette): if isinstance(json, dict): if WEB_ELEMENT_KEY in json: return cls(marionette, json[WEB_ELEMENT_KEY], WEB_ELEMENT_KEY) elif FRAME_KEY in json: return cls(marionette, json[FRAME_KEY], FRAME_KEY) elif WINDOW_KEY in json: return cls(marionette, json[WINDOW_KEY], WINDOW_KEY) raise ValueError("Unrecognised web element") class ShadowRoot(object): """A Class to handling Shadow Roots""" identifiers = (WEB_SHADOW_ROOT_KEY,) def __init__(self, marionette, id, kind=WEB_SHADOW_ROOT_KEY): self.marionette = marionette assert id is not None self.id = id self.kind = kind def __str__(self): return self.id def __eq__(self, other_element): return self.id == other_element.id def __hash__(self): # pylint --py3k: W1641 return hash(self.id) @classmethod def _from_json(cls, json, marionette): if isinstance(json, dict): if WEB_SHADOW_ROOT_KEY in json: return cls(marionette, json[WEB_SHADOW_ROOT_KEY]) raise ValueError("Unrecognised shadow root") class Alert(object): """A class for interacting with alerts. :: Alert(marionette).accept() Alert(marionette).dismiss() """ def __init__(self, marionette): self.marionette = marionette def accept(self): """Accept a currently displayed modal dialog.""" self.marionette._send_message("WebDriver:AcceptAlert") def dismiss(self): """Dismiss a currently displayed modal dialog.""" self.marionette._send_message("WebDriver:DismissAlert") @property def text(self): """Return the currently displayed text in a tab modal.""" return self.marionette._send_message("WebDriver:GetAlertText", key="value") def send_keys(self, *string): """Send keys to the currently displayed text input area in an open tab modal dialog.""" self.marionette._send_message( "WebDriver:SendAlertText", {"text": Marionette.convert_keys(*string)} ) class Marionette(object): """Represents a Marionette connection to a browser or device.""" CONTEXT_CHROME = "chrome" # non-browser content: windows, dialogs, etc. CONTEXT_CONTENT = "content" # browser content: iframes, divs, etc. DEFAULT_STARTUP_TIMEOUT = 120 DEFAULT_SHUTDOWN_TIMEOUT = ( 70 # By default Firefox will kill hanging threads after 60s ) # Bug 1336953 - Until we can remove the socket timeout parameter it has to be # set a default value which is larger than the longest timeout as defined by the # WebDriver spec. In that case its 300s for page load. Also add another minute # so that slow builds have enough time to send the timeout error to the client. DEFAULT_SOCKET_TIMEOUT = 360 def __init__( self, host="127.0.0.1", port=2828, app=None, bin=None, baseurl=None, socket_timeout=None, startup_timeout=None, **instance_args ): """Construct a holder for the Marionette connection. Remember to call ``start_session`` in order to initiate the connection and start a Marionette session. :param host: Host where the Marionette server listens. Defaults to 127.0.0.1. :param port: Port where the Marionette server listens. Defaults to port 2828. :param baseurl: Where to look for files served from Marionette's www directory. :param socket_timeout: Timeout for Marionette socket operations. :param startup_timeout: Seconds to wait for a connection with binary. :param bin: Path to browser binary. If any truthy value is given this will attempt to start a Gecko instance with the specified `app`. :param app: Type of ``instance_class`` to use for managing app instance. See ``marionette_driver.geckoinstance``. :param instance_args: Arguments to pass to ``instance_class``. """ self.host = "127.0.0.1" # host if int(port) == 0: port = Marionette.check_port_available(port) self.port = self.local_port = int(port) self.bin = bin self.client = None self.instance = None self.requested_capabilities = None self.session = None self.session_id = None self.process_id = None self.profile = None self.window = None self.chrome_window = None self.baseurl = baseurl self._test_name = None self.crashed = 0 self.is_shutting_down = False self.cleanup_ran = False if socket_timeout is None: self.socket_timeout = self.DEFAULT_SOCKET_TIMEOUT else: self.socket_timeout = float(socket_timeout) if startup_timeout is None: self.startup_timeout = self.DEFAULT_STARTUP_TIMEOUT else: self.startup_timeout = int(startup_timeout) self.shutdown_timeout = self.DEFAULT_SHUTDOWN_TIMEOUT if self.bin: self.instance = GeckoInstance.create( app, host=self.host, port=self.port, bin=self.bin, **instance_args ) self.start_binary(self.startup_timeout) self.actions = Actions(self) self.timeout = Timeouts(self) @property def profile_path(self): if self.instance and self.instance.profile: return self.instance.profile.profile def start_binary(self, timeout): try: self.check_port_available(self.port, host=self.host) except socket.error: _, value, tb = sys.exc_info() msg = "Port {}:{} is unavailable ({})".format(self.host, self.port, value) reraise(IOError, IOError(msg), tb) try: self.instance.start() self.raise_for_port(timeout=timeout) except socket.timeout: # Something went wrong with starting up Marionette server. Given # that the process will not quit itself, force a shutdown immediately. self.cleanup() msg = ( "Process killed after {}s because no connection to Marionette " "server could be established. Check gecko.log for errors" ) reraise(IOError, IOError(msg.format(timeout)), sys.exc_info()[2]) def cleanup(self): if self.session is not None: try: self.delete_session() except (errors.MarionetteException, IOError): # These exceptions get thrown if the Marionette server # hit an exception/died or the connection died. We can # do no further server-side cleanup in this case. pass if self.instance: # stop application and, if applicable, stop emulator self.instance.close(clean=True) if self.instance.unresponsive_count >= 3: raise errors.UnresponsiveInstanceException( "Application clean-up has failed >2 consecutive times." ) self.cleanup_ran = True def __del__(self): if not self.cleanup_ran: self.cleanup() @staticmethod def check_port_available(port, host=""): """Check if "host:port" is available. Raise socket.error if port is not available. """ port = int(port) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: s.bind((host, port)) port = s.getsockname()[1] finally: s.close() return port def raise_for_port(self, timeout=None, check_process_status=True): """Raise socket.timeout if no connection can be established. :param timeout: Optional timeout in seconds for the server to be ready. :param check_process_status: Optional, if `True` the process will be continuously checked if it has exited, and the connection attempt will be aborted. """ if timeout is None: timeout = self.startup_timeout runner = None if self.instance is not None: runner = self.instance.runner poll_interval = 0.1 starttime = datetime.datetime.now() timeout_time = starttime + datetime.timedelta(seconds=timeout) client = transport.TcpTransport(self.host, self.port, 0.5) connected = False while datetime.datetime.now() < timeout_time: # If the instance we want to connect to is not running return immediately if check_process_status and runner is not None and not runner.is_running(): break try: client.connect() return True except socket.error: pass finally: client.close() time.sleep(poll_interval) if not connected: # There might have been a startup crash of the application if runner is not None and self.check_for_crash() > 0: raise IOError("Process crashed (Exit code: {})".format(runner.wait(0))) raise socket.timeout( "Timed out waiting for connection on {0}:{1}!".format( self.host, self.port ) ) @do_process_check def _send_message(self, name, params=None, key=None): """Send a blocking message to the server. Marionette provides an asynchronous, non-blocking interface and this attempts to paper over this by providing a synchronous API to the user. :param name: Requested command key. :param params: Optional dictionary of key/value arguments. :param key: Optional key to extract from response. :returns: Full response from the server, or if `key` is given, the value of said key in the response. """ if not self.session_id and name != "WebDriver:NewSession": raise errors.InvalidSessionIdException("Please start a session") try: msg = self.client.request(name, params) except IOError: self.delete_session(send_request=False) raise res, err = msg.result, msg.error if err: self._handle_error(err) if key is not None: return self._from_json(res.get(key)) else: return self._from_json(res) def _handle_error(self, obj): error = obj["error"] message = obj["message"] stacktrace = obj["stacktrace"] raise errors.lookup(error)(message, stacktrace=stacktrace) def check_for_crash(self): """Check if the process crashed. :returns: True, if a crash happened since the method has been called the last time. """ crash_count = 0 if self.instance: name = self.test_name or "marionette.py" crash_count = self.instance.runner.check_for_crashes(test_name=name) self.crashed = self.crashed + crash_count return crash_count > 0 def _handle_socket_failure(self): """Handle socket failures for the currently connected application. If the application crashed then clean-up internal states, or in case of a content crash also kill the process. If there are other reasons for a socket failure, wait for the process to shutdown itself, or force kill it. Please note that the method expects an exception to be handled on the current stack frame, and is only called via the `@do_process_check` decorator. """ exc_cls, exc, tb = sys.exc_info() # If the application hasn't been launched by Marionette no further action can be done. # In such cases we simply re-throw the exception. if not self.instance: reraise(exc_cls, exc, tb) else: # Somehow the socket disconnected. Give the application some time to shutdown # itself before killing the process. returncode = self.instance.runner.wait(timeout=self.shutdown_timeout) if returncode is None: message = ( "Process killed because the connection to Marionette server is " "lost. Check gecko.log for errors" ) # This will force-close the application without sending any other message. self.cleanup() else: # If Firefox quit itself check if there was a crash crash_count = self.check_for_crash() if crash_count > 0: # SIGUSR1 indicates a forced shutdown due to a content process crash if returncode == 245: message = "Content process crashed" else: message = "Process crashed (Exit code: {returncode})" else: message = ( "Process has been unexpectedly closed (Exit code: {returncode})" ) self.delete_session(send_request=False) message += " (Reason: {reason})" reraise( IOError, IOError(message.format(returncode=returncode, reason=exc)), tb ) @staticmethod def convert_keys(*string): typing = [] for val in string: if isinstance(val, Keys): typing.append(val) elif isinstance(val, int): val = str(val) for i in range(len(val)): typing.append(val[i]) else: for i in range(len(val)): typing.append(val[i]) return "".join(typing) def clear_pref(self, pref): """Clear the user-defined value from the specified preference. :param pref: Name of the preference. """ with self.using_context(self.CONTEXT_CHROME): self.execute_script( """ const { Preferences } = ChromeUtils.importESModule( "resource://gre/modules/Preferences.sys.mjs" ); Preferences.reset(arguments[0]); """, script_args=(pref,), ) def get_pref(self, pref, default_branch=False, value_type="unspecified"): """Get the value of the specified preference. :param pref: Name of the preference. :param default_branch: Optional, if `True` the preference value will be read from the default branch. Otherwise the user-defined value if set is returned. Defaults to `False`. :param value_type: Optional, XPCOM interface of the pref's complex value. Possible values are: `nsIFile` and `nsIPrefLocalizedString`. Usage example:: marionette.get_pref("browser.tabs.warnOnClose") """ with self.using_context(self.CONTEXT_CHROME): pref_value = self.execute_script( """ const { Preferences } = ChromeUtils.importESModule( "resource://gre/modules/Preferences.sys.mjs" ); let pref = arguments[0]; let defaultBranch = arguments[1]; let valueType = arguments[2]; prefs = new Preferences({defaultBranch: defaultBranch}); return prefs.get(pref, null, Components.interfaces[valueType]); """, script_args=(pref, default_branch, value_type), ) return pref_value def set_pref(self, pref, value, default_branch=False): """Set the value of the specified preference. :param pref: Name of the preference. :param value: The value to set the preference to. If the value is None, reset the preference to its default value. If no default value exists, the preference will cease to exist. :param default_branch: Optional, if `True` the preference value will be written to the default branch, and will remain until the application gets restarted. Otherwise a user-defined value is set. Defaults to `False`. Usage example:: marionette.set_pref("browser.tabs.warnOnClose", True) """ with self.using_context(self.CONTEXT_CHROME): if value is None: self.clear_pref(pref) return self.execute_script( """ const { Preferences } = ChromeUtils.importESModule( "resource://gre/modules/Preferences.sys.mjs" ); let pref = arguments[0]; let value = arguments[1]; let defaultBranch = arguments[2]; prefs = new Preferences({defaultBranch: defaultBranch}); prefs.set(pref, value); """, script_args=(pref, value, default_branch), ) def set_prefs(self, prefs, default_branch=False): """Set the value of a list of preferences. :param prefs: A dict containing one or more preferences and their values to be set. See :func:`set_pref` for further details. :param default_branch: Optional, if `True` the preference value will be written to the default branch, and will remain until the application gets restarted. Otherwise a user-defined value is set. Defaults to `False`. Usage example:: marionette.set_prefs({"browser.tabs.warnOnClose": True}) """ for pref, value in prefs.items(): self.set_pref(pref, value, default_branch=default_branch) @contextmanager def using_prefs(self, prefs, default_branch=False): """Set preferences for code executed in a `with` block, and restores them on exit. :param prefs: A dict containing one or more preferences and their values to be set. See :func:`set_prefs` for further details. :param default_branch: Optional, if `True` the preference value will be written to the default branch, and will remain until the application gets restarted. Otherwise a user-defined value is set. Defaults to `False`. Usage example:: with marionette.using_prefs({"browser.tabs.warnOnClose": True}): # ... do stuff ... """ original_prefs = {p: self.get_pref(p) for p in prefs} self.set_prefs(prefs, default_branch=default_branch) try: yield finally: self.set_prefs(original_prefs, default_branch=default_branch) @do_process_check def enforce_gecko_prefs(self, prefs): """Checks if the running instance has the given prefs. If not, it will kill the currently running instance, and spawn a new instance with the requested preferences. :param prefs: A dictionary whose keys are preference names. """ if not self.instance: raise errors.MarionetteException( "enforce_gecko_prefs() can only be called " "on Gecko instances launched by Marionette" ) pref_exists = True with self.using_context(self.CONTEXT_CHROME): for pref, value in six.iteritems(prefs): if type(value) is not str: value = json.dumps(value) pref_exists = self.execute_script( """ let prefInterface = Components.classes["@mozilla.org/preferences-service;1"] .getService(Components.interfaces.nsIPrefBranch); let pref = '{0}'; let value = '{1}'; let type = prefInterface.getPrefType(pref); switch(type) {{ case prefInterface.PREF_STRING: return value == prefInterface.getCharPref(pref).toString(); case prefInterface.PREF_BOOL: return value == prefInterface.getBoolPref(pref).toString(); case prefInterface.PREF_INT: return value == prefInterface.getIntPref(pref).toString(); case prefInterface.PREF_INVALID: return false; }} """.format( pref, value ) ) if not pref_exists: break if not pref_exists: context = self._send_message("Marionette:GetContext", key="value") self.delete_session() self.instance.restart(prefs) self.raise_for_port() self.start_session(self.requested_capabilities) # Restore the context as used before the restart self.set_context(context) def _request_in_app_shutdown(self, flags=None, safe_mode=False): """Attempt to quit the currently running instance from inside the application. If shutdown is prevented by some component the quit will be forced. This method effectively calls `Services.startup.quit` in Gecko. Possible flag values are listed at https://bit.ly/3IYcjYi. :param flags: Optional additional quit masks to include. :param safe_mode: Optional flag to indicate that the application has to be restarted in safe mode. :returns: A dictionary containing details of the application shutdown. The `cause` property reflects the reason, and `forced` indicates that something prevented the shutdown and the application had to be forced to shutdown. :throws InvalidArgumentException: If there are multiple `shutdown_flags` ending with `"Quit"`. """ body = {} if flags is not None: body["flags"] = list( flags, ) if safe_mode: body["safeMode"] = safe_mode return self._send_message("Marionette:Quit", body) @do_process_check def quit(self, clean=False, in_app=True, callback=None): """ By default this method will trigger a normal shutdown of the currently running instance. But it can also be used to force terminate the process. This command will delete the active marionette session. It also allows manipulation of eg. the profile data while the application is not running. To start the application again, :func:`start_session` has to be called. :param clean: If True a new profile will be used after the next start of the application. Note that the in_app initiated quit always maintains the same profile. :param in_app: If True, marionette will cause a quit from within the application. Otherwise the application will be restarted immediately by killing the process. :param callback: If provided and `in_app` is True, the callback will be used to trigger the shutdown. :returns: A dictionary containing details of the application shutdown. The `cause` property reflects the reason, and `forced` indicates that something prevented the shutdown and the application had to be forced to shutdown. """ if not self.instance: raise errors.MarionetteException( "quit() can only be called " "on Gecko instances launched by Marionette" ) quit_details = {"cause": "shutdown", "forced": False} if in_app: if clean: raise ValueError( "An in_app restart cannot be triggered with the clean flag set" ) if callback is not None and not callable(callback): raise ValueError( "Specified callback '{}' is not callable".format(callback) ) # Block Marionette from accepting new connections self._send_message("Marionette:AcceptConnections", {"value": False}) try: self.is_shutting_down = True if callback is not None: callback() quit_details["in_app"] = True else: quit_details = self._request_in_app_shutdown() except IOError: # A possible IOError should be ignored at this point, given that # quit() could have been called inside of `using_context`, # which wants to reset the context but fails sending the message. pass except Exception: # For any other error assume the application is not going to shutdown. # As such allow Marionette to accept new connections again. self.is_shutting_down = False self._send_message("Marionette:AcceptConnections", {"value": True}) raise try: self.delete_session(send_request=False) # Try to wait for the process to end itself before force-closing it. returncode = self.instance.runner.wait(timeout=self.shutdown_timeout) if returncode is None: self.cleanup() message = "Process still running {}s after quit request" raise IOError(message.format(self.shutdown_timeout)) finally: self.is_shutting_down = False else: self.delete_session(send_request=False) self.instance.close(clean=clean) quit_details.update({"in_app": False, "forced": True}) if quit_details.get("cause") not in (None, "shutdown"): raise errors.MarionetteException( "Unexpected shutdown reason '{}' for " "quitting the process.".format(quit_details["cause"]) ) return quit_details @do_process_check def restart( self, callback=None, clean=False, in_app=True, safe_mode=False, silent=False ): """ By default this method will restart the currently running instance by using the same profile. But it can also be forced to terminate the currently running instance, and to spawn a new instance with the same or different profile. :param callback: If provided and `in_app` is True, the callback will be used to trigger the restart. :param clean: If True a new profile will be used after the restart. Note that the in_app initiated restart always maintains the same profile. :param in_app: If True, marionette will cause a restart from within the application. Otherwise the application will be restarted immediately by killing the process. :param safe_mode: Optional flag to indicate that the application has to be restarted in safe mode. :param silent: Optional flag to indicate that the application should not open any window after a restart. Note that this flag is only supported on MacOS and requires "in_app" to be True. :returns: A dictionary containing details of the application restart. The `cause` property reflects the reason, and `forced` indicates that something prevented the shutdown and the application had to be forced to shutdown. """ if not self.instance: raise errors.MarionetteException( "restart() can only be called " "on Gecko instances launched by Marionette" ) context = self._send_message("Marionette:GetContext", key="value") restart_details = {"cause": "restart", "forced": False} # Safe mode and the silent flag require an in_app restart. if (safe_mode or silent) and not in_app: raise ValueError("An in_app restart is required for safe or silent mode") if in_app: if clean: raise ValueError( "An in_app restart cannot be triggered with the clean flag set" ) if callback is not None and not callable(callback): raise ValueError( "Specified callback '{}' is not callable".format(callback) ) # Block Marionette from accepting new connections self._send_message("Marionette:AcceptConnections", {"value": False}) try: self.is_shutting_down = True if callback is not None: callback() restart_details["in_app"] = True else: flags = ["eRestart"] if silent: flags.append("eSilently") try: restart_details = self._request_in_app_shutdown( flags=flags, safe_mode=safe_mode ) except Exception as e: self._send_message( "Marionette:AcceptConnections", {"value": True} ) raise e except IOError: # A possible IOError should be ignored at this point, given that # restart() could have been called inside of `using_context`, # which wants to reset the context but fails sending the message. pass timeout_restart = self.shutdown_timeout + self.startup_timeout try: # Wait for a new Marionette connection to appear while the # process restarts itself. self.raise_for_port(timeout=timeout_restart, check_process_status=False) except socket.timeout: exc_cls, _, tb = sys.exc_info() if self.instance.runner.returncode is None: # The process is still running, which means the shutdown # request was not correct or the application ignored it. # Allow Marionette to accept connections again. self._send_message("Marionette:AcceptConnections", {"value": True}) message = "Process still running {}s after restart request" reraise(exc_cls, exc_cls(message.format(timeout_restart)), tb) else: # The process shutdown but didn't start again. self.cleanup() msg = "Process unexpectedly quit without restarting (exit code: {})" reraise( exc_cls, exc_cls(msg.format(self.instance.runner.returncode)), tb, ) finally: self.is_shutting_down = False self.delete_session(send_request=False) else: self.delete_session() self.instance.restart(clean=clean) self.raise_for_port(timeout=self.DEFAULT_STARTUP_TIMEOUT) restart_details.update({"in_app": False, "forced": True}) if restart_details.get("cause") not in (None, "restart"): raise errors.MarionetteException( "Unexpected shutdown reason '{}' for " "restarting the process".format(restart_details["cause"]) ) self.start_session(self.requested_capabilities) # Restore the context as used before the restart self.set_context(context) if in_app and self.process_id: # In some cases Firefox restarts itself by spawning into a new process group. # As long as mozprocess cannot track that behavior (bug 1284864) we assist by # informing about the new process id. self.instance.runner.process_handler.check_for_detached(self.process_id) return restart_details def absolute_url(self, relative_url): """ Returns an absolute url for files served from Marionette's www directory. :param relative_url: The url of a static file, relative to Marionette's www directory. """ return "{0}{1}".format(self.baseurl, relative_url) @do_process_check def start_session(self, capabilities=None, timeout=None): """Create a new WebDriver session. This method must be called before performing any other action. :param capabilities: An optional dictionary of Marionette-recognised capabilities. It does not accept a WebDriver conforming capabilities dictionary (including alwaysMatch, firstMatch, desiredCapabilities, or requriedCapabilities), and only recognises extension capabilities that are specific to Marionette. :param timeout: Optional timeout in seconds for the server to be ready. :returns: A dictionary of the capabilities offered. """ if capabilities is None: capabilities = {"strictFileInteractability": True} self.requested_capabilities = capabilities if timeout is None: timeout = self.startup_timeout self.crashed = 0 if self.instance: returncode = self.instance.runner.returncode # We're managing a binary which has terminated. Start it again # and implicitely wait for the Marionette server to be ready. if returncode is not None: self.start_binary(timeout) else: # In the case when Marionette doesn't manage the binary wait until # its server component has been started. self.raise_for_port(timeout=timeout) self.client = transport.TcpTransport(self.host, self.port, self.socket_timeout) self.protocol, _ = self.client.connect() try: resp = self._send_message("WebDriver:NewSession", capabilities) except errors.UnknownException: # Force closing the managed process when the session cannot be # created due to global JavaScript errors. exc_type, value, tb = sys.exc_info() if self.instance and self.instance.runner.is_running(): self.instance.close() reraise(exc_type, exc_type(value.message), tb) self.session_id = resp["sessionId"] self.session = resp["capabilities"] self.cleanup_ran = False # fallback to processId can be removed in Firefox 55 self.process_id = self.session.get( "moz:processID", self.session.get("processId") ) self.profile = self.session.get("moz:profile") timeout = self.session.get("moz:shutdownTimeout") if timeout is not None: # pylint --py3k W1619 self.shutdown_timeout = timeout / 1000 + 10 return self.session @property def test_name(self): return self._test_name @test_name.setter def test_name(self, test_name): self._test_name = test_name def delete_session(self, send_request=True): """Close the current session and disconnect from the server. :param send_request: Optional, if `True` a request to close the session on the server side will be sent. Use `False` in case of eg. in_app restart() or quit(), which trigger a deletion themselves. Defaults to `True`. """ try: if send_request: try: self._send_message("WebDriver:DeleteSession") except errors.InvalidSessionIdException: pass finally: self.process_id = None self.profile = None self.session = None self.session_id = None self.window = None if self.client is not None: self.client.close() @property def session_capabilities(self): """A JSON dictionary representing the capabilities of the current session. """ return self.session @property def current_window_handle(self): """Get the current window's handle. Returns an opaque server-assigned identifier to this window that uniquely identifies it within this Marionette instance. This can be used to switch to this window at a later point. :returns: unique window handle :rtype: string """ with self.using_context("content"): self.window = self._send_message("WebDriver:GetWindowHandle", key="value") return self.window @property def current_chrome_window_handle(self): """Get the current chrome window's handle. Corresponds to a chrome window that may itself contain tabs identified by window_handles. Returns an opaque server-assigned identifier to this window that uniquely identifies it within this Marionette instance. This can be used to switch to this window at a later point. :returns: unique window handle :rtype: string """ with self.using_context("chrome"): self.chrome_window = self._send_message( "WebDriver:GetWindowHandle", key="value" ) return self.chrome_window def set_window_rect(self, x=None, y=None, height=None, width=None): """Set the position and size of the current window. The supplied width and height values refer to the window outerWidth and outerHeight values, which include scroll bars, title bars, etc. An error will be returned if the requested window size would result in the window being in the maximised state. :param x: x coordinate for the top left of the window :param y: y coordinate for the top left of the window :param width: The width to resize the window to. :param height: The height to resize the window to. """ if (x is None and y is None) and (height is None and width is None): raise errors.InvalidArgumentException( "x and y or height and width need values" ) body = {"x": x, "y": y, "height": height, "width": width} return self._send_message("WebDriver:SetWindowRect", body) @property def window_rect(self): return self._send_message("WebDriver:GetWindowRect") @property def title(self): """Current title of the active window.""" return self._send_message("WebDriver:GetTitle", key="value") @property def window_handles(self): """Get list of windows in the current context. If called in the content context it will return a list of references to all available browser windows. Each window handle is assigned by the server, and the list of strings returned does not have a guaranteed ordering. :returns: Unordered list of unique window handles as strings """ with self.using_context("content"): return self._send_message("WebDriver:GetWindowHandles") @property def chrome_window_handles(self): """Get a list of currently open chrome windows. Each window handle is assigned by the server, and the list of strings returned does not have a guaranteed ordering. :returns: Unordered list of unique chrome window handles as strings """ with self.using_context("chrome"): return self._send_message("WebDriver:GetWindowHandles") @property def page_source(self): """A string representation of the DOM.""" return self._send_message("WebDriver:GetPageSource", key="value") def open(self, type=None, focus=False, private=False): """Open a new window, or tab based on the specified context type. If no context type is given the application will choose the best option based on tab and window support. :param type: Type of window to be opened. Can be one of "tab" or "window" :param focus: If true, the opened window will be focused :param private: If true, open a private window :returns: Dict with new window handle, and type of opened window """ body = {"type": type, "focus": focus, "private": private} return self._send_message("WebDriver:NewWindow", body) def close(self): """Close the current window, ending the session if it's the last window currently open. :returns: Unordered list of remaining unique window handles as strings """ return self._send_message("WebDriver:CloseWindow") def close_chrome_window(self): """Close the currently selected chrome window, ending the session if it's the last window open. :returns: Unordered list of remaining unique chrome window handles as strings """ return self._send_message("WebDriver:CloseChromeWindow") def set_context(self, context): """Sets the context that Marionette commands are running in. :param context: Context, may be one of the class properties `CONTEXT_CHROME` or `CONTEXT_CONTENT`. Usage example:: marionette.set_context(marionette.CONTEXT_CHROME) """ if context not in [self.CONTEXT_CHROME, self.CONTEXT_CONTENT]: raise ValueError("Unknown context: {}".format(context)) self._send_message("Marionette:SetContext", {"value": context}) @contextmanager def using_context(self, context): """Sets the context that Marionette commands are running in using a `with` statement. The state of the context on the server is saved before entering the block, and restored upon exiting it. :param context: Context, may be one of the class properties `CONTEXT_CHROME` or `CONTEXT_CONTENT`. Usage example:: with marionette.using_context(marionette.CONTEXT_CHROME): # chrome scope ... do stuff ... """ scope = self._send_message("Marionette:GetContext", key="value") self.set_context(context) try: yield finally: self.set_context(scope) def switch_to_alert(self): """Returns an :class:`~marionette_driver.marionette.Alert` object for interacting with a currently displayed alert. :: alert = self.marionette.switch_to_alert() text = alert.text alert.accept() """ return Alert(self) def switch_to_window(self, handle, focus=True): """Switch to the specified window; subsequent commands will be directed at the new window. :param handle: The id of the window to switch to. :param focus: A boolean value which determins whether to focus the window that we just switched to. """ self._send_message( "WebDriver:SwitchToWindow", {"handle": handle, "focus": focus} ) self.window = handle def switch_to_default_content(self): """Switch the current context to page's default content.""" return self.switch_to_frame() def switch_to_parent_frame(self): """ Switch to the Parent Frame """ self._send_message("WebDriver:SwitchToParentFrame") def switch_to_frame(self, frame=None): """Switch the current context to the specified frame. Subsequent commands will operate in the context of the specified frame, if applicable. :param frame: A reference to the frame to switch to. This can be an :class:`~marionette_driver.marionette.HTMLElement`, or an integer index. If you call ``switch_to_frame`` without an argument, it will switch to the top-level frame. """ body = {} if isinstance(frame, HTMLElement): body["element"] = frame.id elif frame is not None: body["id"] = frame self._send_message("WebDriver:SwitchToFrame", body) def get_url(self): """Get a string representing the current URL. On Desktop this returns a string representation of the URL of the current top level browsing context. This is equivalent to document.location.href. When in the context of the chrome, this returns the canonical URL of the current resource. :returns: string representation of URL """ return self._send_message("WebDriver:GetCurrentURL", key="value") def get_window_type(self): """Gets the windowtype attribute of the window Marionette is currently acting on. This command only makes sense in a chrome context. You might use this method to distinguish a browser window from an editor window. """ try: return self._send_message("Marionette:GetWindowType", key="value") except errors.UnknownCommandException: return self._send_message("getWindowType", key="value") def navigate(self, url): """Navigate to given `url`. Navigates the current top-level browsing context's content frame to the given URL and waits for the document to load or the session's page timeout duration to elapse before returning. The command will return with a failure if there is an error loading the document or the URL is blocked. This can occur if it fails to reach the host, the URL is malformed, the page is restricted (about:* pages), or if there is a certificate issue to name some examples. The document is considered successfully loaded when the `DOMContentLoaded` event on the frame element associated with the `window` triggers and `document.readyState` is "complete". In chrome context it will change the current `window`'s location to the supplied URL and wait until `document.readyState` equals "complete" or the page timeout duration has elapsed. :param url: The URL to navigate to. """ self._send_message("WebDriver:Navigate", {"url": url}) def go_back(self): """Causes the browser to perform a back navigation.""" self._send_message("WebDriver:Back") def go_forward(self): """Causes the browser to perform a forward navigation.""" self._send_message("WebDriver:Forward") def refresh(self): """Causes the browser to perform to refresh the current page.""" self._send_message("WebDriver:Refresh") def _to_json(self, args): if isinstance(args, list) or isinstance(args, tuple): wrapped = [] for arg in args: wrapped.append(self._to_json(arg)) elif isinstance(args, dict): wrapped = {} for arg in args: wrapped[arg] = self._to_json(args[arg]) elif type(args) == HTMLElement: wrapped = {WEB_ELEMENT_KEY: args.id} elif type(args) == ShadowRoot: wrapped = {WEB_SHADOW_ROOT_KEY: args.id} elif ( isinstance(args, bool) or isinstance(args, six.string_types) or isinstance(args, int) or isinstance(args, float) or args is None ): wrapped = args return wrapped def _from_json(self, value): if isinstance(value, dict) and any( k in value.keys() for k in HTMLElement.identifiers ): return HTMLElement._from_json(value, self) elif isinstance(value, dict) and any( k in value.keys() for k in ShadowRoot.identifiers ): return ShadowRoot._from_json(value, self) elif isinstance(value, dict): return {key: self._from_json(val) for key, val in value.items()} elif isinstance(value, list): return list(self._from_json(item) for item in value) else: return value def execute_script( self, script, script_args=(), new_sandbox=True, sandbox="default", script_timeout=None, ): """Executes a synchronous JavaScript script, and returns the result (or None if the script does return a value). The script is executed in the context set by the most recent :func:`set_context` call, or to the CONTEXT_CONTENT context if :func:`set_context` has not been called. :param script: A string containing the JavaScript to execute. :param script_args: An interable of arguments to pass to the script. :param new_sandbox: If False, preserve global variables from the last execute_*script call. This is True by default, in which case no globals are preserved. :param sandbox: A tag referring to the sandbox you wish to use; if you specify a new tag, a new sandbox will be created. If you use the special tag `system`, the sandbox will be created using the system principal which has elevated privileges. :param script_timeout: Timeout in milliseconds, overriding the session's default script timeout. Simple usage example: :: result = marionette.execute_script("return 1;") assert result == 1 You can use the `script_args` parameter to pass arguments to the script: :: result = marionette.execute_script("return arguments[0] + arguments[1];", script_args=(2, 3,)) assert result == 5 some_element = marionette.find_element(By.ID, "someElement") sid = marionette.execute_script("return arguments[0].id;", script_args=(some_element,)) assert some_element.get_attribute("id") == sid Scripts wishing to access non-standard properties of the window object must use window.wrappedJSObject: :: result = marionette.execute_script(''' window.wrappedJSObject.test1 = "foo"; window.wrappedJSObject.test2 = "bar"; return window.wrappedJSObject.test1 + window.wrappedJSObject.test2; ''') assert result == "foobar" Global variables set by individual scripts do not persist between script calls by default. If you wish to persist data between script calls, you can set `new_sandbox` to False on your next call, and add any new variables to a new 'global' object like this: :: marionette.execute_script("global.test1 = 'foo';") result = self.marionette.execute_script("return global.test1;", new_sandbox=False) assert result == "foo" """ original_timeout = None if script_timeout is not None: original_timeout = self.timeout.script self.timeout.script = script_timeout / 1000.0 try: args = self._to_json(script_args) stack = traceback.extract_stack() frame = stack[-2:-1][0] # grab the second-to-last frame filename = ( frame[0] if sys.platform == "win32" else os.path.relpath(frame[0]) ) body = { "script": script.strip(), "args": args, "newSandbox": new_sandbox, "sandbox": sandbox, "line": int(frame[1]), "filename": filename, } rv = self._send_message("WebDriver:ExecuteScript", body, key="value") finally: if script_timeout is not None: self.timeout.script = original_timeout return rv def execute_async_script( self, script, script_args=(), new_sandbox=True, sandbox="default", script_timeout=None, ): """Executes an asynchronous JavaScript script, and returns the result (or None if the script does return a value). The script is executed in the context set by the most recent :func:`set_context` call, or to the CONTEXT_CONTENT context if :func:`set_context` has not been called. :param script: A string containing the JavaScript to execute. :param script_args: An interable of arguments to pass to the script. :param new_sandbox: If False, preserve global variables from the last execute_*script call. This is True by default, in which case no globals are preserved. :param sandbox: A tag referring to the sandbox you wish to use; if you specify a new tag, a new sandbox will be created. If you use the special tag `system`, the sandbox will be created using the system principal which has elevated privileges. :param script_timeout: Timeout in milliseconds, overriding the session's default script timeout. Usage example: :: marionette.timeout.script = 10 result = self.marionette.execute_async_script(''' // this script waits 5 seconds, and then returns the number 1 let [resolve] = arguments; setTimeout(function() { resolve(1); }, 5000); ''') assert result == 1 """ original_timeout = None if script_timeout is not None: original_timeout = self.timeout.script self.timeout.script = script_timeout / 1000.0 try: args = self._to_json(script_args) stack = traceback.extract_stack() frame = stack[-2:-1][0] # grab the second-to-last frame filename = ( frame[0] if sys.platform == "win32" else os.path.relpath(frame[0]) ) body = { "script": script.strip(), "args": args, "newSandbox": new_sandbox, "sandbox": sandbox, "scriptTimeout": script_timeout, "line": int(frame[1]), "filename": filename, } rv = self._send_message("WebDriver:ExecuteAsyncScript", body, key="value") finally: if script_timeout is not None: self.timeout.script = original_timeout return rv def find_element(self, method, target, id=None): """Returns an :class:`~marionette_driver.marionette.HTMLElement` instance that matches the specified method and target in the current context. An :class:`~marionette_driver.marionette.HTMLElement` instance may be used to call other methods on the element, such as :func:`~marionette_driver.marionette.HTMLElement.click`. If no element is immediately found, the attempt to locate an element will be repeated for up to the amount of time set by :attr:`marionette_driver.timeout.Timeouts.implicit`. If multiple elements match the given criteria, only the first is returned. If no element matches, a ``NoSuchElementException`` will be raised. :param method: The method to use to locate the element; one of: "id", "name", "class name", "tag name", "css selector", "link text", "partial link text" and "xpath". Note that the "name", "link text" and "partial link test" methods are not supported in the chrome DOM. :param target: The target of the search. For example, if method = "tag", target might equal "div". If method = "id", target would be an element id. :param id: If specified, search for elements only inside the element with the specified id. """ body = {"value": target, "using": method} if id: body["element"] = id return self._send_message("WebDriver:FindElement", body, key="value") def find_elements(self, method, target, id=None): """Returns a list of all :class:`~marionette_driver.marionette.HTMLElement` instances that match the specified method and target in the current context. An :class:`~marionette_driver.marionette.HTMLElement` instance may be used to call other methods on the element, such as :func:`~marionette_driver.marionette.HTMLElement.click`. If no element is immediately found, the attempt to locate an element will be repeated for up to the amount of time set by :attr:`marionette_driver.timeout.Timeouts.implicit`. :param method: The method to use to locate the elements; one of: "id", "name", "class name", "tag name", "css selector", "link text", "partial link text" and "xpath". Note that the "name", "link text" and "partial link test" methods are not supported in the chrome DOM. :param target: The target of the search. For example, if method = "tag", target might equal "div". If method = "id", target would be an element id. :param id: If specified, search for elements only inside the element with the specified id. """ body = {"value": target, "using": method} if id: body["element"] = id return self._send_message("WebDriver:FindElements", body) def get_active_element(self): el_or_ref = self._send_message("WebDriver:GetActiveElement", key="value") return el_or_ref def add_cookie(self, cookie): """Adds a cookie to your current session. :param cookie: A dictionary object, with required keys - "name" and "value"; optional keys - "path", "domain", "secure", "expiry". Usage example: :: driver.add_cookie({"name": "foo", "value": "bar"}) driver.add_cookie({"name": "foo", "value": "bar", "path": "/"}) driver.add_cookie({"name": "foo", "value": "bar", "path": "/", "secure": True}) """ self._send_message("WebDriver:AddCookie", {"cookie": cookie}) def delete_all_cookies(self): """Delete all cookies in the scope of the current session. Usage example: :: driver.delete_all_cookies() """ self._send_message("WebDriver:DeleteAllCookies") def delete_cookie(self, name): """Delete a cookie by its name. :param name: Name of cookie to delete. Usage example: :: driver.delete_cookie("foo") """ self._send_message("WebDriver:DeleteCookie", {"name": name}) def get_cookie(self, name): """Get a single cookie by name. Returns the cookie if found, None if not. :param name: Name of cookie to get. """ cookies = self.get_cookies() for cookie in cookies: if cookie["name"] == name: return cookie return None def get_cookies(self): """Get all the cookies for the current domain. This is the equivalent of calling `document.cookie` and parsing the result. :returns: A list of cookies for the current domain. """ return self._send_message("WebDriver:GetCookies") def save_screenshot(self, fh, element=None, full=True, scroll=True): """Takes a screenhot of a web element or the current frame and saves it in the filehandle. It is a wrapper around screenshot() :param fh: The filehandle to save the screenshot at. The rest of the parameters are defined like in screenshot() """ data = self.screenshot(element, "binary", full, scroll) fh.write(data) def screenshot(self, element=None, format="base64", full=True, scroll=True): """Takes a screenshot of a web element or the current frame. The screen capture is returned as a lossless PNG image encoded as a base 64 string by default. If the `element` argument is defined the capture area will be limited to the bounding box of that element. Otherwise, the capture area will be the bounding box of the current frame. :param element: The element to take a screenshot of. If None, will take a screenshot of the current frame. :param format: if "base64" (the default), returns the screenshot as a base64-string. If "binary", the data is decoded and returned as raw binary. If "hash", the data is hashed using the SHA-256 algorithm and the result is returned as a hex digest. :param full: If True (the default), the capture area will be the complete frame. Else only the viewport is captured. Only applies when `element` is None. :param scroll: When `element` is provided, scroll to it before taking the screenshot (default). Otherwise, avoid scrolling `element` into view. """ if element: element = element.id body = {"id": element, "full": full, "hash": False, "scroll": scroll} if format == "hash": body["hash"] = True data = self._send_message("WebDriver:TakeScreenshot", body, key="value") if format == "base64" or format == "hash": return data elif format == "binary": return base64.b64decode(data.encode("ascii")) else: raise ValueError( "format parameter must be either 'base64'" " or 'binary', not {0}".format(repr(format)) ) @property def orientation(self): """Get the current browser orientation. Will return one of the valid primary orientation values portrait-primary, landscape-primary, portrait-secondary, or landscape-secondary. """ try: return self._send_message("Marionette:GetScreenOrientation", key="value") except errors.UnknownCommandException: return self._send_message("getScreenOrientation", key="value") def set_orientation(self, orientation): """Set the current browser orientation. The supplied orientation should be given as one of the valid orientation values. If the orientation is unknown, an error will be raised. Valid orientations are "portrait" and "landscape", which fall back to "portrait-primary" and "landscape-primary" respectively, and "portrait-secondary" as well as "landscape-secondary". :param orientation: The orientation to lock the screen in. """ body = {"orientation": orientation} try: self._send_message("Marionette:SetScreenOrientation", body) except errors.UnknownCommandException: self._send_message("setScreenOrientation", body) def minimize_window(self): """Iconify the browser window currently receiving commands. The action should be equivalent to the user pressing the minimize button in the OS window. Note that this command is not available on Fennec. It may also not be available in certain window managers. :returns Window rect. """ return self._send_message("WebDriver:MinimizeWindow") def maximize_window(self): """Resize the browser window currently receiving commands. The action should be equivalent to the user pressing the maximize button in the OS window. Note that this command is not available on Fennec. It may also not be available in certain window managers. :returns: Window rect. """ return self._send_message("WebDriver:MaximizeWindow") def fullscreen(self): """Synchronously sets the user agent window to full screen as if the user had done "View > Enter Full Screen", or restores it if it is already in full screen. :returns: Window rect. """ return self._send_message("WebDriver:FullscreenWindow")