diff options
Diffstat (limited to 'testing/webcompat/client.py')
-rw-r--r-- | testing/webcompat/client.py | 759 |
1 files changed, 759 insertions, 0 deletions
diff --git a/testing/webcompat/client.py b/testing/webcompat/client.py new file mode 100644 index 0000000000..88208e5507 --- /dev/null +++ b/testing/webcompat/client.py @@ -0,0 +1,759 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import asyncio +import contextlib +import time +from urllib.parse import quote + +import webdriver +from webdriver.bidi.modules.script import ContextTarget + + +class Client: + def __init__(self, session, event_loop): + self.session = session + self.event_loop = event_loop + self.content_blocker_loaded = False + + @property + def current_url(self): + return self.session.url + + @property + def alert(self): + return self.session.alert + + @property + def context(self): + return self.session.send_session_command("GET", "moz/context") + + @context.setter + def context(self, context): + self.session.send_session_command("POST", "moz/context", {"context": context}) + + @contextlib.contextmanager + def using_context(self, context): + orig_context = self.context + needs_change = context != orig_context + + if needs_change: + self.context = context + + try: + yield + finally: + if needs_change: + self.context = orig_context + + def wait_for_content_blocker(self): + if not self.content_blocker_loaded: + with self.using_context("chrome"): + self.session.execute_async_script( + """ + const done = arguments[0], + signal = "safebrowsing-update-finished"; + function finish() { + Services.obs.removeObserver(finish, signal); + done(); + } + Services.obs.addObserver(finish, signal); + """ + ) + self.content_blocker_loaded = True + + @property + def keyboard(self): + return self.session.actions.sequence("key", "keyboard_id") + + @property + def mouse(self): + return self.session.actions.sequence( + "pointer", "pointer_id", {"pointerType": "mouse"} + ) + + @property + def pen(self): + return self.session.actions.sequence( + "pointer", "pointer_id", {"pointerType": "pen"} + ) + + @property + def touch(self): + return self.session.actions.sequence( + "pointer", "pointer_id", {"pointerType": "touch"} + ) + + @property + def wheel(self): + return self.session.actions.sequence("wheel", "wheel_id") + + @property + def modifier_key(self): + if self.session.capabilities["platformName"] == "mac": + return "\ue03d" # meta (command) + else: + return "\ue009" # control + + def inline(self, doc): + return "data:text/html;charset=utf-8,{}".format(quote(doc)) + + async def top_context(self): + contexts = await self.session.bidi_session.browsing_context.get_tree() + return contexts[0] + + async def navigate(self, url, timeout=None, **kwargs): + return await asyncio.wait_for( + asyncio.ensure_future(self._navigate(url, **kwargs)), timeout=timeout + ) + + async def _navigate(self, url, wait="complete", await_console_message=None): + if self.session.test_config.get("use_pbm") or self.session.test_config.get( + "use_strict_etp" + ): + print("waiting for content blocker...") + self.wait_for_content_blocker() + if await_console_message is not None: + console_message = await self.promise_console_message_listener( + await_console_message + ) + if wait == "load": + page_load = await self.promise_readystate_listener("load", url=url) + try: + await self.session.bidi_session.browsing_context.navigate( + context=(await self.top_context())["context"], + url=url, + wait=wait if wait != "load" else None, + ) + except webdriver.bidi.error.UnknownErrorException as u: + m = str(u) + if ( + "NS_BINDING_ABORTED" not in m + and "NS_ERROR_ABORT" not in m + and "NS_ERROR_WONT_HANDLE_CONTENT" not in m + ): + raise u + if wait == "load": + await page_load + if await_console_message is not None: + await console_message + + async def promise_event_listener(self, events, check_fn=None, timeout=20): + if type(events) is not list: + events = [events] + + await self.session.bidi_session.session.subscribe(events=events) + + future = self.event_loop.create_future() + + listener_removers = [] + + def remove_listeners(): + for listener_remover in listener_removers: + try: + listener_remover() + except Exception: + pass + + async def on_event(method, data): + print("on_event", method, data) + val = None + if check_fn is not None: + val = check_fn(method, data) + if val is None: + return + future.set_result(val) + + for event in events: + r = self.session.bidi_session.add_event_listener(event, on_event) + listener_removers.append(r) + + async def task(): + try: + return await asyncio.wait_for(future, timeout=timeout) + finally: + remove_listeners() + try: + await asyncio.wait_for( + self.session.bidi_session.session.unsubscribe(events=events), + timeout=4, + ) + except asyncio.exceptions.TimeoutError: + print("Unexpectedly timed out unsubscribing", events) + pass + + return asyncio.create_task(task()) + + async def promise_console_message_listener(self, msg, **kwargs): + def check(method, data): + if "text" in data: + if msg in data["text"]: + return data + if "args" in data and len(data["args"]): + for arg in data["args"]: + if "value" in arg and msg in arg["value"]: + return data + + return await self.promise_event_listener("log.entryAdded", check, **kwargs) + + async def is_console_message(self, message): + try: + await (await self.promise_console_message_listener(message, timeout=2)) + return True + except asyncio.exceptions.TimeoutError: + return False + + async def promise_readystate_listener(self, state, url=None, **kwargs): + event = f"browsingContext.{state}" + + def check(method, data): + if url is None or url in data["url"]: + return data + + return await self.promise_event_listener(event, check, **kwargs) + + async def promise_frame_listener(self, url, state="domContentLoaded", **kwargs): + event = f"browsingContext.{state}" + + def check(method, data): + if url is None or url in data["url"]: + return Client.Context(self, data["context"]) + + return await self.promise_event_listener(event, check, **kwargs) + + async def find_frame_context_by_url(self, url): + def find_in(arr, url): + for context in arr: + if url in context["url"]: + return context + for context in arr: + found = find_in(context["children"], url) + if found: + return found + + return find_in([await self.top_context()], url) + + class Context: + def __init__(self, client, id): + self.client = client + self.target = ContextTarget(id) + + async def find_css(self, selector, all=False): + all = "All" if all else "" + return await self.client.session.bidi_session.script.evaluate( + expression=f"document.querySelector{all}('{selector}')", + target=self.target, + await_promise=False, + ) + + def timed_js(self, timeout, poll, fn, is_displayed=False): + return f"""() => new Promise((_good, _bad) => {{ + {self.is_displayed_js()} + var _poll = {poll} * 1000; + var _time = {timeout} * 1000; + var _done = false; + var resolve = val => {{ + if ({is_displayed}) {{ + if (val.length) {{ + val = val.filter(v = is_displayed(v)); + }} else {{ + val = is_displayed(val) && val; + }} + if (!val.length && !val.matches) {{ + return; + }} + }} + _done = true; + clearInterval(_int); + _good(val); + }}; + var reject = str => {{ + _done = true; + clearInterval(_int); + _bad(val); + }}; + var _int = setInterval(() => {{ + {fn}; + if (!_done) {{ + _time -= _poll; + if (_time <= 0) {{ + reject(); + }} + }} + }}, poll); + }})""" + + def is_displayed_js(self): + return """ + function is_displayed(e) { + const s = window.getComputedStyle(e), + v = s.visibility === "visible", + o = Math.abs(parseFloat(s.opacity)); + return e.getClientRects().length > 0 && v && (isNaN(o) || o === 1.0); + } + """ + + async def await_css( + self, + selector, + all=False, + timeout=10, + poll=0.25, + condition=False, + is_displayed=False, + ): + all = "All" if all else "" + condition = ( + f"var elem=arguments[0]; if ({condition})" if condition else False + ) + return await self.client.session.bidi_session.script.evaluate( + expression=self.timed_js( + timeout, + poll, + f""" + var ele = document.querySelector{all}('{selector}')"; + if (ele && (!"length" in ele || ele.length > 0)) {{ + '{condition}' + resolve(ele); + }} + """, + ), + target=self.target, + await_promise=True, + ) + + async def await_text(self, text, **kwargs): + xpath = f"//*[contains(text(),'{text}')]" + return await self.await_xpath(self, xpath, **kwargs) + + async def await_xpath( + self, xpath, all=False, timeout=10, poll=0.25, is_displayed=False + ): + all = "true" if all else "false" + return await self.client.session.bidi_session.script.evaluate( + expression=self.timed_js( + timeout, + poll, + """ + var ret = []; + var r, res = document.evaluate(`{xpath}`, document, null, 4); + while (r = res.iterateNext()) { + ret.push(r); + } + resolve({all} ? ret : ret[0]); + """, + ), + target=self.target, + await_promise=True, + ) + + def wrap_script_args(self, args): + if args is None: + return args + out = [] + for arg in args: + if arg is None: + out.append({"type": "undefined"}) + continue + t = type(arg) + if t == int or t == float: + out.append({"type": "number", "value": arg}) + elif t == bool: + out.append({"type": "boolean", "value": arg}) + elif t == str: + out.append({"type": "string", "value": arg}) + else: + if "type" in arg: + out.push(arg) + continue + raise ValueError(f"Unhandled argument type: {t}") + return out + + class PreloadScript: + def __init__(self, client, script, target): + self.client = client + self.script = script + if type(target) == list: + self.target = target[0] + else: + self.target = target + + def stop(self): + return self.client.session.bidi_session.script.remove_preload_script( + script=self.script + ) + + async def run(self, fn, *args, await_promise=False): + val = await self.client.session.bidi_session.script.call_function( + arguments=self.client.wrap_script_args(args), + await_promise=await_promise, + function_declaration=fn, + target=self.target, + ) + if val and "value" in val: + return val["value"] + return val + + async def make_preload_script(self, text, sandbox, args=None, context=None): + if not context: + context = (await self.top_context())["context"] + target = ContextTarget(context, sandbox) + if args is None: + text = f"() => {{ {text} }}" + script = await self.session.bidi_session.script.add_preload_script( + function_declaration=text, + arguments=self.wrap_script_args(args), + sandbox=sandbox, + ) + return Client.PreloadScript(self, script, target) + + async def await_alert(self, text): + if not hasattr(self, "alert_preload_script"): + self.alert_preload_script = await self.make_preload_script( + """ + window.__alerts = []; + window.wrappedJSObject.alert = function(text) { + window.__alerts.push(text); + } + """, + "alert_detector", + ) + return self.alert_preload_script.run( + """(msg) => new Promise(done => { + const to = setInterval(() => { + if (window.__alerts.includes(msg)) { + clearInterval(to); + done(); + } + }, 200); + }) + """, + text, + await_promise=True, + ) + + async def await_popup(self, url=None): + if not hasattr(self, "popup_preload_script"): + self.popup_preload_script = await self.make_preload_script( + """ + window.__popups = []; + window.wrappedJSObject.open = function(url) { + window.__popups.push(url); + } + """, + "popup_detector", + ) + return self.popup_preload_script.run( + """(url) => new Promise(done => { + const to = setInterval(() => { + if (url === undefined && window.__popups.length) { + clearInterval(to); + return done(window.__popups[0]); + } + const found = window.__popups.find(u => u.includes(url)); + if (found !== undefined) { + clearInterval(to); + done(found); + } + }, 1000); + }) + """, + url, + await_promise=True, + ) + + async def track_listener(self, type, selector): + if not hasattr(self, "listener_preload_script"): + self.listener_preload_script = await self.make_preload_script( + """ + window.__listeners = {}; + var proto = EventTarget.wrappedJSObject.prototype; + var def = Object.getOwnPropertyDescriptor(proto, "addEventListener"); + var old = def.value; + def.value = function(type, fn, opts) { + if ("matches" in this) { + if (!window.__listeners[type]) { + window.__listeners[type] = new Set(); + } + window.__listeners[type].add(this); + } + return old.call(this, type, fn, opts) + }; + Object.defineProperty(proto, "addEventListener", def); + """, + "listener_detector", + ) + return Client.ListenerTracker(self.listener_preload_script, type, selector) + + @contextlib.asynccontextmanager + async def preload_script(self, text, *args): + script = await self.make_preload_script(text, "preload", args=args) + yield script + await script.stop() + + def back(self): + self.session.back() + + def switch_to_frame(self, frame): + return self.session.transport.send( + "POST", + "session/{session_id}/frame".format(**vars(self.session)), + {"id": frame}, + encoder=webdriver.protocol.Encoder, + decoder=webdriver.protocol.Decoder, + session=self.session, + ) + + def switch_frame(self, frame): + self.session.switch_frame(frame) + + async def load_page_and_wait_for_iframe( + self, url, finder, loads=1, timeout=None, **kwargs + ): + while loads > 0: + await self.navigate(url, **kwargs) + frame = self.await_element(finder, timeout=timeout) + loads -= 1 + self.switch_frame(frame) + return frame + + def execute_script(self, script, *args): + return self.session.execute_script(script, args=args) + + def execute_async_script(self, script, *args, **kwargs): + return self.session.execute_async_script(script, args, **kwargs) + + def clear_all_cookies(self): + self.session.transport.send( + "DELETE", "session/%s/cookie" % self.session.session_id + ) + + def send_element_command(self, element, method, uri, body=None): + url = "element/%s/%s" % (element.id, uri) + return self.session.send_session_command(method, url, body) + + def get_element_attribute(self, element, name): + return self.send_element_command(element, "GET", "attribute/%s" % name) + + def _do_is_displayed_check(self, ele, is_displayed): + if ele is None: + return None + + if type(ele) in [list, tuple]: + return [x for x in ele if self._do_is_displayed_check(x, is_displayed)] + + if is_displayed is False and ele and self.is_displayed(ele): + return None + if is_displayed is True and ele and not self.is_displayed(ele): + return None + return ele + + def find_css(self, *args, all=False, is_displayed=None, **kwargs): + try: + ele = self.session.find.css(*args, all=all, **kwargs) + return self._do_is_displayed_check(ele, is_displayed) + except webdriver.error.NoSuchElementException: + return None + + def find_xpath(self, xpath, all=False, is_displayed=None): + route = "elements" if all else "element" + body = {"using": "xpath", "value": xpath} + try: + ele = self.session.send_session_command("POST", route, body) + return self._do_is_displayed_check(ele, is_displayed) + except webdriver.error.NoSuchElementException: + return None + + def find_text(self, text, is_displayed=None, **kwargs): + try: + ele = self.find_xpath(f"//*[contains(text(),'{text}')]", **kwargs) + return self._do_is_displayed_check(ele, is_displayed) + except webdriver.error.NoSuchElementException: + return None + + def find_element(self, finder, is_displayed=None, **kwargs): + ele = finder.find(self, **kwargs) + return self._do_is_displayed_check(ele, is_displayed) + + def await_css(self, selector, **kwargs): + return self.await_element(self.css(selector), **kwargs) + + def await_xpath(self, selector, **kwargs): + return self.await_element(self.xpath(selector), **kwargs) + + def await_text(self, selector, *args, **kwargs): + return self.await_element(self.text(selector), **kwargs) + + def await_element(self, finder, **kwargs): + return self.await_first_element_of([finder], **kwargs)[0] + + class css: + def __init__(self, selector): + self.selector = selector + + def find(self, client, **kwargs): + return client.find_css(self.selector, **kwargs) + + class xpath: + def __init__(self, selector): + self.selector = selector + + def find(self, client, **kwargs): + return client.find_xpath(self.selector, **kwargs) + + class text: + def __init__(self, selector): + self.selector = selector + + def find(self, client, **kwargs): + return client.find_text(self.selector, **kwargs) + + def await_first_element_of( + self, finders, timeout=None, delay=0.25, condition=False, **kwargs + ): + t0 = time.time() + condition = f"var elem=arguments[0]; return {condition}" if condition else False + + if timeout is None: + timeout = 10 + + found = [None for finder in finders] + + exc = None + while time.time() < t0 + timeout: + for i, finder in enumerate(finders): + try: + result = finder.find(self, **kwargs) + if result and ( + not condition + or self.session.execute_script(condition, [result]) + ): + found[i] = result + return found + except webdriver.error.NoSuchElementException as e: + exc = e + time.sleep(delay) + raise exc if exc is not None else webdriver.error.NoSuchElementException + return found + + async def dom_ready(self, timeout=None): + if timeout is None: + timeout = 20 + + async def wait(): + return self.session.execute_async_script( + """ + const cb = arguments[0]; + setInterval(() => { + if (document.readyState === "complete") { + cb(); + } + }, 500); + """ + ) + + task = asyncio.create_task(wait()) + return await asyncio.wait_for(task, timeout) + + def is_float_cleared(self, elem1, elem2): + return self.session.execute_script( + """return (function(a, b) { + // Ensure that a is placed under b (and not to its right) + return a?.offsetTop >= b?.offsetTop + b?.offsetHeight && + a?.offsetLeft < b?.offsetLeft + b?.offsetWidth; + }(arguments[0], arguments[1]));""", + elem1, + elem2, + ) + + @contextlib.contextmanager + def assert_getUserMedia_called(self): + self.execute_script( + """ + navigator.mediaDevices.getUserMedia = + navigator.mozGetUserMedia = + navigator.getUserMedia = + () => { window.__gumCalled = true; }; + """ + ) + yield + assert self.execute_script("return window.__gumCalled === true;") + + def await_element_hidden(self, finder, timeout=None, delay=0.25): + t0 = time.time() + + if timeout is None: + timeout = 20 + + elem = finder.find(self) + while time.time() < t0 + timeout: + try: + if not self.is_displayed(elem): + return + time.sleep(delay) + except webdriver.error.StaleElementReferenceException: + return + + def soft_click(self, element): + self.execute_script("arguments[0].click()", element) + + def remove_element(self, element): + self.execute_script("arguments[0].remove()", element) + + def scroll_into_view(self, element): + self.execute_script( + "arguments[0].scrollIntoView({block:'center', inline:'center', behavior: 'instant'})", + element, + ) + + @contextlib.asynccontextmanager + async def ensure_fastclick_activates(self): + fastclick_preload_script = await self.make_preload_script( + """ + var _ = document.createElement("webcompat_test"); + _.style = "position:absolute;right:-1px;width:1px;height:1px"; + document.documentElement.appendChild(_); + """, + "fastclick_forcer", + ) + yield + fastclick_preload_script.stop() + + def test_for_fastclick(self, element): + # FastClick cancels touchend, breaking default actions on Fenix. + # It instead fires a mousedown or click, which we can detect. + self.execute_script( + """ + const sel = arguments[0]; + window.fastclicked = false; + const evt = sel.nodeName === "SELECT" ? "mousedown" : "click"; + document.addEventListener(evt, e => { + if (e.target === sel && !e.isTrusted) { + window.fastclicked = true; + } + }, true); + """, + element, + ) + self.scroll_into_view(element) + # tap a few times in case the site's other code interferes + self.touch.click(element=element).perform() + self.touch.click(element=element).perform() + self.touch.click(element=element).perform() + return self.execute_script("return window.fastclicked") + + def is_displayed(self, element): + if element is None: + return False + + return self.session.execute_script( + """ + const e = arguments[0], + s = window.getComputedStyle(e), + v = s.visibility === "visible", + o = Math.abs(parseFloat(s.opacity)); + return e.getClientRects().length > 0 && v && (isNaN(o) || o === 1.0); + """, + args=[element], + ) |