summaryrefslogtreecommitdiffstats
path: root/testing/webcompat/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'testing/webcompat/client.py')
-rw-r--r--testing/webcompat/client.py759
1 files changed, 759 insertions, 0 deletions
diff --git a/testing/webcompat/client.py b/testing/webcompat/client.py
new file mode 100644
index 0000000000..88208e5507
--- /dev/null
+++ b/testing/webcompat/client.py
@@ -0,0 +1,759 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import asyncio
+import contextlib
+import time
+from urllib.parse import quote
+
+import webdriver
+from webdriver.bidi.modules.script import ContextTarget
+
+
+class Client:
+ def __init__(self, session, event_loop):
+ self.session = session
+ self.event_loop = event_loop
+ self.content_blocker_loaded = False
+
+ @property
+ def current_url(self):
+ return self.session.url
+
+ @property
+ def alert(self):
+ return self.session.alert
+
+ @property
+ def context(self):
+ return self.session.send_session_command("GET", "moz/context")
+
+ @context.setter
+ def context(self, context):
+ self.session.send_session_command("POST", "moz/context", {"context": context})
+
+ @contextlib.contextmanager
+ def using_context(self, context):
+ orig_context = self.context
+ needs_change = context != orig_context
+
+ if needs_change:
+ self.context = context
+
+ try:
+ yield
+ finally:
+ if needs_change:
+ self.context = orig_context
+
+ def wait_for_content_blocker(self):
+ if not self.content_blocker_loaded:
+ with self.using_context("chrome"):
+ self.session.execute_async_script(
+ """
+ const done = arguments[0],
+ signal = "safebrowsing-update-finished";
+ function finish() {
+ Services.obs.removeObserver(finish, signal);
+ done();
+ }
+ Services.obs.addObserver(finish, signal);
+ """
+ )
+ self.content_blocker_loaded = True
+
+ @property
+ def keyboard(self):
+ return self.session.actions.sequence("key", "keyboard_id")
+
+ @property
+ def mouse(self):
+ return self.session.actions.sequence(
+ "pointer", "pointer_id", {"pointerType": "mouse"}
+ )
+
+ @property
+ def pen(self):
+ return self.session.actions.sequence(
+ "pointer", "pointer_id", {"pointerType": "pen"}
+ )
+
+ @property
+ def touch(self):
+ return self.session.actions.sequence(
+ "pointer", "pointer_id", {"pointerType": "touch"}
+ )
+
+ @property
+ def wheel(self):
+ return self.session.actions.sequence("wheel", "wheel_id")
+
+ @property
+ def modifier_key(self):
+ if self.session.capabilities["platformName"] == "mac":
+ return "\ue03d" # meta (command)
+ else:
+ return "\ue009" # control
+
+ def inline(self, doc):
+ return "data:text/html;charset=utf-8,{}".format(quote(doc))
+
+ async def top_context(self):
+ contexts = await self.session.bidi_session.browsing_context.get_tree()
+ return contexts[0]
+
+ async def navigate(self, url, timeout=None, **kwargs):
+ return await asyncio.wait_for(
+ asyncio.ensure_future(self._navigate(url, **kwargs)), timeout=timeout
+ )
+
+ async def _navigate(self, url, wait="complete", await_console_message=None):
+ if self.session.test_config.get("use_pbm") or self.session.test_config.get(
+ "use_strict_etp"
+ ):
+ print("waiting for content blocker...")
+ self.wait_for_content_blocker()
+ if await_console_message is not None:
+ console_message = await self.promise_console_message_listener(
+ await_console_message
+ )
+ if wait == "load":
+ page_load = await self.promise_readystate_listener("load", url=url)
+ try:
+ await self.session.bidi_session.browsing_context.navigate(
+ context=(await self.top_context())["context"],
+ url=url,
+ wait=wait if wait != "load" else None,
+ )
+ except webdriver.bidi.error.UnknownErrorException as u:
+ m = str(u)
+ if (
+ "NS_BINDING_ABORTED" not in m
+ and "NS_ERROR_ABORT" not in m
+ and "NS_ERROR_WONT_HANDLE_CONTENT" not in m
+ ):
+ raise u
+ if wait == "load":
+ await page_load
+ if await_console_message is not None:
+ await console_message
+
+ async def promise_event_listener(self, events, check_fn=None, timeout=20):
+ if type(events) is not list:
+ events = [events]
+
+ await self.session.bidi_session.session.subscribe(events=events)
+
+ future = self.event_loop.create_future()
+
+ listener_removers = []
+
+ def remove_listeners():
+ for listener_remover in listener_removers:
+ try:
+ listener_remover()
+ except Exception:
+ pass
+
+ async def on_event(method, data):
+ print("on_event", method, data)
+ val = None
+ if check_fn is not None:
+ val = check_fn(method, data)
+ if val is None:
+ return
+ future.set_result(val)
+
+ for event in events:
+ r = self.session.bidi_session.add_event_listener(event, on_event)
+ listener_removers.append(r)
+
+ async def task():
+ try:
+ return await asyncio.wait_for(future, timeout=timeout)
+ finally:
+ remove_listeners()
+ try:
+ await asyncio.wait_for(
+ self.session.bidi_session.session.unsubscribe(events=events),
+ timeout=4,
+ )
+ except asyncio.exceptions.TimeoutError:
+ print("Unexpectedly timed out unsubscribing", events)
+ pass
+
+ return asyncio.create_task(task())
+
+ async def promise_console_message_listener(self, msg, **kwargs):
+ def check(method, data):
+ if "text" in data:
+ if msg in data["text"]:
+ return data
+ if "args" in data and len(data["args"]):
+ for arg in data["args"]:
+ if "value" in arg and msg in arg["value"]:
+ return data
+
+ return await self.promise_event_listener("log.entryAdded", check, **kwargs)
+
+ async def is_console_message(self, message):
+ try:
+ await (await self.promise_console_message_listener(message, timeout=2))
+ return True
+ except asyncio.exceptions.TimeoutError:
+ return False
+
+ async def promise_readystate_listener(self, state, url=None, **kwargs):
+ event = f"browsingContext.{state}"
+
+ def check(method, data):
+ if url is None or url in data["url"]:
+ return data
+
+ return await self.promise_event_listener(event, check, **kwargs)
+
+ async def promise_frame_listener(self, url, state="domContentLoaded", **kwargs):
+ event = f"browsingContext.{state}"
+
+ def check(method, data):
+ if url is None or url in data["url"]:
+ return Client.Context(self, data["context"])
+
+ return await self.promise_event_listener(event, check, **kwargs)
+
+ async def find_frame_context_by_url(self, url):
+ def find_in(arr, url):
+ for context in arr:
+ if url in context["url"]:
+ return context
+ for context in arr:
+ found = find_in(context["children"], url)
+ if found:
+ return found
+
+ return find_in([await self.top_context()], url)
+
+ class Context:
+ def __init__(self, client, id):
+ self.client = client
+ self.target = ContextTarget(id)
+
+ async def find_css(self, selector, all=False):
+ all = "All" if all else ""
+ return await self.client.session.bidi_session.script.evaluate(
+ expression=f"document.querySelector{all}('{selector}')",
+ target=self.target,
+ await_promise=False,
+ )
+
+ def timed_js(self, timeout, poll, fn, is_displayed=False):
+ return f"""() => new Promise((_good, _bad) => {{
+ {self.is_displayed_js()}
+ var _poll = {poll} * 1000;
+ var _time = {timeout} * 1000;
+ var _done = false;
+ var resolve = val => {{
+ if ({is_displayed}) {{
+ if (val.length) {{
+ val = val.filter(v = is_displayed(v));
+ }} else {{
+ val = is_displayed(val) && val;
+ }}
+ if (!val.length && !val.matches) {{
+ return;
+ }}
+ }}
+ _done = true;
+ clearInterval(_int);
+ _good(val);
+ }};
+ var reject = str => {{
+ _done = true;
+ clearInterval(_int);
+ _bad(val);
+ }};
+ var _int = setInterval(() => {{
+ {fn};
+ if (!_done) {{
+ _time -= _poll;
+ if (_time <= 0) {{
+ reject();
+ }}
+ }}
+ }}, poll);
+ }})"""
+
+ def is_displayed_js(self):
+ return """
+ function is_displayed(e) {
+ const s = window.getComputedStyle(e),
+ v = s.visibility === "visible",
+ o = Math.abs(parseFloat(s.opacity));
+ return e.getClientRects().length > 0 && v && (isNaN(o) || o === 1.0);
+ }
+ """
+
+ async def await_css(
+ self,
+ selector,
+ all=False,
+ timeout=10,
+ poll=0.25,
+ condition=False,
+ is_displayed=False,
+ ):
+ all = "All" if all else ""
+ condition = (
+ f"var elem=arguments[0]; if ({condition})" if condition else False
+ )
+ return await self.client.session.bidi_session.script.evaluate(
+ expression=self.timed_js(
+ timeout,
+ poll,
+ f"""
+ var ele = document.querySelector{all}('{selector}')";
+ if (ele && (!"length" in ele || ele.length > 0)) {{
+ '{condition}'
+ resolve(ele);
+ }}
+ """,
+ ),
+ target=self.target,
+ await_promise=True,
+ )
+
+ async def await_text(self, text, **kwargs):
+ xpath = f"//*[contains(text(),'{text}')]"
+ return await self.await_xpath(self, xpath, **kwargs)
+
+ async def await_xpath(
+ self, xpath, all=False, timeout=10, poll=0.25, is_displayed=False
+ ):
+ all = "true" if all else "false"
+ return await self.client.session.bidi_session.script.evaluate(
+ expression=self.timed_js(
+ timeout,
+ poll,
+ """
+ var ret = [];
+ var r, res = document.evaluate(`{xpath}`, document, null, 4);
+ while (r = res.iterateNext()) {
+ ret.push(r);
+ }
+ resolve({all} ? ret : ret[0]);
+ """,
+ ),
+ target=self.target,
+ await_promise=True,
+ )
+
+ def wrap_script_args(self, args):
+ if args is None:
+ return args
+ out = []
+ for arg in args:
+ if arg is None:
+ out.append({"type": "undefined"})
+ continue
+ t = type(arg)
+ if t == int or t == float:
+ out.append({"type": "number", "value": arg})
+ elif t == bool:
+ out.append({"type": "boolean", "value": arg})
+ elif t == str:
+ out.append({"type": "string", "value": arg})
+ else:
+ if "type" in arg:
+ out.push(arg)
+ continue
+ raise ValueError(f"Unhandled argument type: {t}")
+ return out
+
+ class PreloadScript:
+ def __init__(self, client, script, target):
+ self.client = client
+ self.script = script
+ if type(target) == list:
+ self.target = target[0]
+ else:
+ self.target = target
+
+ def stop(self):
+ return self.client.session.bidi_session.script.remove_preload_script(
+ script=self.script
+ )
+
+ async def run(self, fn, *args, await_promise=False):
+ val = await self.client.session.bidi_session.script.call_function(
+ arguments=self.client.wrap_script_args(args),
+ await_promise=await_promise,
+ function_declaration=fn,
+ target=self.target,
+ )
+ if val and "value" in val:
+ return val["value"]
+ return val
+
+ async def make_preload_script(self, text, sandbox, args=None, context=None):
+ if not context:
+ context = (await self.top_context())["context"]
+ target = ContextTarget(context, sandbox)
+ if args is None:
+ text = f"() => {{ {text} }}"
+ script = await self.session.bidi_session.script.add_preload_script(
+ function_declaration=text,
+ arguments=self.wrap_script_args(args),
+ sandbox=sandbox,
+ )
+ return Client.PreloadScript(self, script, target)
+
+ async def await_alert(self, text):
+ if not hasattr(self, "alert_preload_script"):
+ self.alert_preload_script = await self.make_preload_script(
+ """
+ window.__alerts = [];
+ window.wrappedJSObject.alert = function(text) {
+ window.__alerts.push(text);
+ }
+ """,
+ "alert_detector",
+ )
+ return self.alert_preload_script.run(
+ """(msg) => new Promise(done => {
+ const to = setInterval(() => {
+ if (window.__alerts.includes(msg)) {
+ clearInterval(to);
+ done();
+ }
+ }, 200);
+ })
+ """,
+ text,
+ await_promise=True,
+ )
+
+ async def await_popup(self, url=None):
+ if not hasattr(self, "popup_preload_script"):
+ self.popup_preload_script = await self.make_preload_script(
+ """
+ window.__popups = [];
+ window.wrappedJSObject.open = function(url) {
+ window.__popups.push(url);
+ }
+ """,
+ "popup_detector",
+ )
+ return self.popup_preload_script.run(
+ """(url) => new Promise(done => {
+ const to = setInterval(() => {
+ if (url === undefined && window.__popups.length) {
+ clearInterval(to);
+ return done(window.__popups[0]);
+ }
+ const found = window.__popups.find(u => u.includes(url));
+ if (found !== undefined) {
+ clearInterval(to);
+ done(found);
+ }
+ }, 1000);
+ })
+ """,
+ url,
+ await_promise=True,
+ )
+
+ async def track_listener(self, type, selector):
+ if not hasattr(self, "listener_preload_script"):
+ self.listener_preload_script = await self.make_preload_script(
+ """
+ window.__listeners = {};
+ var proto = EventTarget.wrappedJSObject.prototype;
+ var def = Object.getOwnPropertyDescriptor(proto, "addEventListener");
+ var old = def.value;
+ def.value = function(type, fn, opts) {
+ if ("matches" in this) {
+ if (!window.__listeners[type]) {
+ window.__listeners[type] = new Set();
+ }
+ window.__listeners[type].add(this);
+ }
+ return old.call(this, type, fn, opts)
+ };
+ Object.defineProperty(proto, "addEventListener", def);
+ """,
+ "listener_detector",
+ )
+ return Client.ListenerTracker(self.listener_preload_script, type, selector)
+
+ @contextlib.asynccontextmanager
+ async def preload_script(self, text, *args):
+ script = await self.make_preload_script(text, "preload", args=args)
+ yield script
+ await script.stop()
+
+ def back(self):
+ self.session.back()
+
+ def switch_to_frame(self, frame):
+ return self.session.transport.send(
+ "POST",
+ "session/{session_id}/frame".format(**vars(self.session)),
+ {"id": frame},
+ encoder=webdriver.protocol.Encoder,
+ decoder=webdriver.protocol.Decoder,
+ session=self.session,
+ )
+
+ def switch_frame(self, frame):
+ self.session.switch_frame(frame)
+
+ async def load_page_and_wait_for_iframe(
+ self, url, finder, loads=1, timeout=None, **kwargs
+ ):
+ while loads > 0:
+ await self.navigate(url, **kwargs)
+ frame = self.await_element(finder, timeout=timeout)
+ loads -= 1
+ self.switch_frame(frame)
+ return frame
+
+ def execute_script(self, script, *args):
+ return self.session.execute_script(script, args=args)
+
+ def execute_async_script(self, script, *args, **kwargs):
+ return self.session.execute_async_script(script, args, **kwargs)
+
+ def clear_all_cookies(self):
+ self.session.transport.send(
+ "DELETE", "session/%s/cookie" % self.session.session_id
+ )
+
+ def send_element_command(self, element, method, uri, body=None):
+ url = "element/%s/%s" % (element.id, uri)
+ return self.session.send_session_command(method, url, body)
+
+ def get_element_attribute(self, element, name):
+ return self.send_element_command(element, "GET", "attribute/%s" % name)
+
+ def _do_is_displayed_check(self, ele, is_displayed):
+ if ele is None:
+ return None
+
+ if type(ele) in [list, tuple]:
+ return [x for x in ele if self._do_is_displayed_check(x, is_displayed)]
+
+ if is_displayed is False and ele and self.is_displayed(ele):
+ return None
+ if is_displayed is True and ele and not self.is_displayed(ele):
+ return None
+ return ele
+
+ def find_css(self, *args, all=False, is_displayed=None, **kwargs):
+ try:
+ ele = self.session.find.css(*args, all=all, **kwargs)
+ return self._do_is_displayed_check(ele, is_displayed)
+ except webdriver.error.NoSuchElementException:
+ return None
+
+ def find_xpath(self, xpath, all=False, is_displayed=None):
+ route = "elements" if all else "element"
+ body = {"using": "xpath", "value": xpath}
+ try:
+ ele = self.session.send_session_command("POST", route, body)
+ return self._do_is_displayed_check(ele, is_displayed)
+ except webdriver.error.NoSuchElementException:
+ return None
+
+ def find_text(self, text, is_displayed=None, **kwargs):
+ try:
+ ele = self.find_xpath(f"//*[contains(text(),'{text}')]", **kwargs)
+ return self._do_is_displayed_check(ele, is_displayed)
+ except webdriver.error.NoSuchElementException:
+ return None
+
+ def find_element(self, finder, is_displayed=None, **kwargs):
+ ele = finder.find(self, **kwargs)
+ return self._do_is_displayed_check(ele, is_displayed)
+
+ def await_css(self, selector, **kwargs):
+ return self.await_element(self.css(selector), **kwargs)
+
+ def await_xpath(self, selector, **kwargs):
+ return self.await_element(self.xpath(selector), **kwargs)
+
+ def await_text(self, selector, *args, **kwargs):
+ return self.await_element(self.text(selector), **kwargs)
+
+ def await_element(self, finder, **kwargs):
+ return self.await_first_element_of([finder], **kwargs)[0]
+
+ class css:
+ def __init__(self, selector):
+ self.selector = selector
+
+ def find(self, client, **kwargs):
+ return client.find_css(self.selector, **kwargs)
+
+ class xpath:
+ def __init__(self, selector):
+ self.selector = selector
+
+ def find(self, client, **kwargs):
+ return client.find_xpath(self.selector, **kwargs)
+
+ class text:
+ def __init__(self, selector):
+ self.selector = selector
+
+ def find(self, client, **kwargs):
+ return client.find_text(self.selector, **kwargs)
+
+ def await_first_element_of(
+ self, finders, timeout=None, delay=0.25, condition=False, **kwargs
+ ):
+ t0 = time.time()
+ condition = f"var elem=arguments[0]; return {condition}" if condition else False
+
+ if timeout is None:
+ timeout = 10
+
+ found = [None for finder in finders]
+
+ exc = None
+ while time.time() < t0 + timeout:
+ for i, finder in enumerate(finders):
+ try:
+ result = finder.find(self, **kwargs)
+ if result and (
+ not condition
+ or self.session.execute_script(condition, [result])
+ ):
+ found[i] = result
+ return found
+ except webdriver.error.NoSuchElementException as e:
+ exc = e
+ time.sleep(delay)
+ raise exc if exc is not None else webdriver.error.NoSuchElementException
+ return found
+
+ async def dom_ready(self, timeout=None):
+ if timeout is None:
+ timeout = 20
+
+ async def wait():
+ return self.session.execute_async_script(
+ """
+ const cb = arguments[0];
+ setInterval(() => {
+ if (document.readyState === "complete") {
+ cb();
+ }
+ }, 500);
+ """
+ )
+
+ task = asyncio.create_task(wait())
+ return await asyncio.wait_for(task, timeout)
+
+ def is_float_cleared(self, elem1, elem2):
+ return self.session.execute_script(
+ """return (function(a, b) {
+ // Ensure that a is placed under b (and not to its right)
+ return a?.offsetTop >= b?.offsetTop + b?.offsetHeight &&
+ a?.offsetLeft < b?.offsetLeft + b?.offsetWidth;
+ }(arguments[0], arguments[1]));""",
+ elem1,
+ elem2,
+ )
+
+ @contextlib.contextmanager
+ def assert_getUserMedia_called(self):
+ self.execute_script(
+ """
+ navigator.mediaDevices.getUserMedia =
+ navigator.mozGetUserMedia =
+ navigator.getUserMedia =
+ () => { window.__gumCalled = true; };
+ """
+ )
+ yield
+ assert self.execute_script("return window.__gumCalled === true;")
+
+ def await_element_hidden(self, finder, timeout=None, delay=0.25):
+ t0 = time.time()
+
+ if timeout is None:
+ timeout = 20
+
+ elem = finder.find(self)
+ while time.time() < t0 + timeout:
+ try:
+ if not self.is_displayed(elem):
+ return
+ time.sleep(delay)
+ except webdriver.error.StaleElementReferenceException:
+ return
+
+ def soft_click(self, element):
+ self.execute_script("arguments[0].click()", element)
+
+ def remove_element(self, element):
+ self.execute_script("arguments[0].remove()", element)
+
+ def scroll_into_view(self, element):
+ self.execute_script(
+ "arguments[0].scrollIntoView({block:'center', inline:'center', behavior: 'instant'})",
+ element,
+ )
+
+ @contextlib.asynccontextmanager
+ async def ensure_fastclick_activates(self):
+ fastclick_preload_script = await self.make_preload_script(
+ """
+ var _ = document.createElement("webcompat_test");
+ _.style = "position:absolute;right:-1px;width:1px;height:1px";
+ document.documentElement.appendChild(_);
+ """,
+ "fastclick_forcer",
+ )
+ yield
+ fastclick_preload_script.stop()
+
+ def test_for_fastclick(self, element):
+ # FastClick cancels touchend, breaking default actions on Fenix.
+ # It instead fires a mousedown or click, which we can detect.
+ self.execute_script(
+ """
+ const sel = arguments[0];
+ window.fastclicked = false;
+ const evt = sel.nodeName === "SELECT" ? "mousedown" : "click";
+ document.addEventListener(evt, e => {
+ if (e.target === sel && !e.isTrusted) {
+ window.fastclicked = true;
+ }
+ }, true);
+ """,
+ element,
+ )
+ self.scroll_into_view(element)
+ # tap a few times in case the site's other code interferes
+ self.touch.click(element=element).perform()
+ self.touch.click(element=element).perform()
+ self.touch.click(element=element).perform()
+ return self.execute_script("return window.fastclicked")
+
+ def is_displayed(self, element):
+ if element is None:
+ return False
+
+ return self.session.execute_script(
+ """
+ const e = arguments[0],
+ s = window.getComputedStyle(e),
+ v = s.visibility === "visible",
+ o = Math.abs(parseFloat(s.opacity));
+ return e.getClientRects().length > 0 && v && (isNaN(o) || o === 1.0);
+ """,
+ args=[element],
+ )