# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import asyncio import contextlib import time from urllib.parse import quote import webdriver from webdriver.bidi.modules.script import ContextTarget class Client: def __init__(self, session, event_loop): self.session = session self.event_loop = event_loop self.content_blocker_loaded = False @property def current_url(self): return self.session.url @property def alert(self): return self.session.alert @property def context(self): return self.session.send_session_command("GET", "moz/context") @context.setter def context(self, context): self.session.send_session_command("POST", "moz/context", {"context": context}) @contextlib.contextmanager def using_context(self, context): orig_context = self.context needs_change = context != orig_context if needs_change: self.context = context try: yield finally: if needs_change: self.context = orig_context def wait_for_content_blocker(self): if not self.content_blocker_loaded: with self.using_context("chrome"): self.session.execute_async_script( """ const done = arguments[0], signal = "safebrowsing-update-finished"; function finish() { Services.obs.removeObserver(finish, signal); done(); } Services.obs.addObserver(finish, signal); """ ) self.content_blocker_loaded = True @property def keyboard(self): return self.session.actions.sequence("key", "keyboard_id") @property def mouse(self): return self.session.actions.sequence( "pointer", "pointer_id", {"pointerType": "mouse"} ) @property def pen(self): return self.session.actions.sequence( "pointer", "pointer_id", {"pointerType": "pen"} ) @property def touch(self): return self.session.actions.sequence( "pointer", "pointer_id", {"pointerType": "touch"} ) @property def wheel(self): return self.session.actions.sequence("wheel", "wheel_id") @property def modifier_key(self): if self.session.capabilities["platformName"] == "mac": return "\ue03d" # meta (command) else: return "\ue009" # control def inline(self, doc): return "data:text/html;charset=utf-8,{}".format(quote(doc)) async def top_context(self): contexts = await self.session.bidi_session.browsing_context.get_tree() return contexts[0] async def navigate(self, url, timeout=None, **kwargs): return await asyncio.wait_for( asyncio.ensure_future(self._navigate(url, **kwargs)), timeout=timeout ) async def _navigate(self, url, wait="complete", await_console_message=None): if self.session.test_config.get("use_pbm") or self.session.test_config.get( "use_strict_etp" ): print("waiting for content blocker...") self.wait_for_content_blocker() if await_console_message is not None: console_message = await self.promise_console_message_listener( await_console_message ) if wait == "load": page_load = await self.promise_readystate_listener("load", url=url) try: await self.session.bidi_session.browsing_context.navigate( context=(await self.top_context())["context"], url=url, wait=wait if wait != "load" else None, ) except webdriver.bidi.error.UnknownErrorException as u: m = str(u) if ( "NS_BINDING_ABORTED" not in m and "NS_ERROR_ABORT" not in m and "NS_ERROR_WONT_HANDLE_CONTENT" not in m ): raise u if wait == "load": await page_load if await_console_message is not None: await console_message async def promise_event_listener(self, events, check_fn=None, timeout=20): if type(events) is not list: events = [events] await self.session.bidi_session.session.subscribe(events=events) future = self.event_loop.create_future() listener_removers = [] def remove_listeners(): for listener_remover in listener_removers: try: listener_remover() except Exception: pass async def on_event(method, data): print("on_event", method, data) val = None if check_fn is not None: val = check_fn(method, data) if val is None: return future.set_result(val) for event in events: r = self.session.bidi_session.add_event_listener(event, on_event) listener_removers.append(r) async def task(): try: return await asyncio.wait_for(future, timeout=timeout) finally: remove_listeners() try: await asyncio.wait_for( self.session.bidi_session.session.unsubscribe(events=events), timeout=4, ) except asyncio.exceptions.TimeoutError: print("Unexpectedly timed out unsubscribing", events) pass return asyncio.create_task(task()) async def promise_console_message_listener(self, msg, **kwargs): def check(method, data): if "text" in data: if msg in data["text"]: return data if "args" in data and len(data["args"]): for arg in data["args"]: if "value" in arg and msg in arg["value"]: return data return await self.promise_event_listener("log.entryAdded", check, **kwargs) async def is_console_message(self, message): try: await (await self.promise_console_message_listener(message, timeout=2)) return True except asyncio.exceptions.TimeoutError: return False async def promise_readystate_listener(self, state, url=None, **kwargs): event = f"browsingContext.{state}" def check(method, data): if url is None or url in data["url"]: return data return await self.promise_event_listener(event, check, **kwargs) async def promise_frame_listener(self, url, state="domContentLoaded", **kwargs): event = f"browsingContext.{state}" def check(method, data): if url is None or url in data["url"]: return Client.Context(self, data["context"]) return await self.promise_event_listener(event, check, **kwargs) async def find_frame_context_by_url(self, url): def find_in(arr, url): for context in arr: if url in context["url"]: return context for context in arr: found = find_in(context["children"], url) if found: return found return find_in([await self.top_context()], url) class Context: def __init__(self, client, id): self.client = client self.target = ContextTarget(id) async def find_css(self, selector, all=False): all = "All" if all else "" return await self.client.session.bidi_session.script.evaluate( expression=f"document.querySelector{all}('{selector}')", target=self.target, await_promise=False, ) def timed_js(self, timeout, poll, fn, is_displayed=False): return f"""() => new Promise((_good, _bad) => {{ {self.is_displayed_js()} var _poll = {poll} * 1000; var _time = {timeout} * 1000; var _done = false; var resolve = val => {{ if ({is_displayed}) {{ if (val.length) {{ val = val.filter(v = is_displayed(v)); }} else {{ val = is_displayed(val) && val; }} if (!val.length && !val.matches) {{ return; }} }} _done = true; clearInterval(_int); _good(val); }}; var reject = str => {{ _done = true; clearInterval(_int); _bad(val); }}; var _int = setInterval(() => {{ {fn}; if (!_done) {{ _time -= _poll; if (_time <= 0) {{ reject(); }} }} }}, poll); }})""" def is_displayed_js(self): return """ function is_displayed(e) { const s = window.getComputedStyle(e), v = s.visibility === "visible", o = Math.abs(parseFloat(s.opacity)); return e.getClientRects().length > 0 && v && (isNaN(o) || o === 1.0); } """ async def await_css( self, selector, all=False, timeout=10, poll=0.25, condition=False, is_displayed=False, ): all = "All" if all else "" condition = ( f"var elem=arguments[0]; if ({condition})" if condition else False ) return await self.client.session.bidi_session.script.evaluate( expression=self.timed_js( timeout, poll, f""" var ele = document.querySelector{all}('{selector}')"; if (ele && (!"length" in ele || ele.length > 0)) {{ '{condition}' resolve(ele); }} """, ), target=self.target, await_promise=True, ) async def await_text(self, text, **kwargs): xpath = f"//*[contains(text(),'{text}')]" return await self.await_xpath(self, xpath, **kwargs) async def await_xpath( self, xpath, all=False, timeout=10, poll=0.25, is_displayed=False ): all = "true" if all else "false" return await self.client.session.bidi_session.script.evaluate( expression=self.timed_js( timeout, poll, """ var ret = []; var r, res = document.evaluate(`{xpath}`, document, null, 4); while (r = res.iterateNext()) { ret.push(r); } resolve({all} ? ret : ret[0]); """, ), target=self.target, await_promise=True, ) def wrap_script_args(self, args): if args is None: return args out = [] for arg in args: if arg is None: out.append({"type": "undefined"}) continue t = type(arg) if t == int or t == float: out.append({"type": "number", "value": arg}) elif t == bool: out.append({"type": "boolean", "value": arg}) elif t == str: out.append({"type": "string", "value": arg}) else: if "type" in arg: out.push(arg) continue raise ValueError(f"Unhandled argument type: {t}") return out class PreloadScript: def __init__(self, client, script, target): self.client = client self.script = script if type(target) == list: self.target = target[0] else: self.target = target def stop(self): return self.client.session.bidi_session.script.remove_preload_script( script=self.script ) async def run(self, fn, *args, await_promise=False): val = await self.client.session.bidi_session.script.call_function( arguments=self.client.wrap_script_args(args), await_promise=await_promise, function_declaration=fn, target=self.target, ) if val and "value" in val: return val["value"] return val async def make_preload_script(self, text, sandbox, args=None, context=None): if not context: context = (await self.top_context())["context"] target = ContextTarget(context, sandbox) if args is None: text = f"() => {{ {text} }}" script = await self.session.bidi_session.script.add_preload_script( function_declaration=text, arguments=self.wrap_script_args(args), sandbox=sandbox, ) return Client.PreloadScript(self, script, target) async def await_alert(self, text): if not hasattr(self, "alert_preload_script"): self.alert_preload_script = await self.make_preload_script( """ window.__alerts = []; window.wrappedJSObject.alert = function(text) { window.__alerts.push(text); } """, "alert_detector", ) return self.alert_preload_script.run( """(msg) => new Promise(done => { const to = setInterval(() => { if (window.__alerts.includes(msg)) { clearInterval(to); done(); } }, 200); }) """, text, await_promise=True, ) async def await_popup(self, url=None): if not hasattr(self, "popup_preload_script"): self.popup_preload_script = await self.make_preload_script( """ window.__popups = []; window.wrappedJSObject.open = function(url) { window.__popups.push(url); } """, "popup_detector", ) return self.popup_preload_script.run( """(url) => new Promise(done => { const to = setInterval(() => { if (url === undefined && window.__popups.length) { clearInterval(to); return done(window.__popups[0]); } const found = window.__popups.find(u => u.includes(url)); if (found !== undefined) { clearInterval(to); done(found); } }, 1000); }) """, url, await_promise=True, ) async def track_listener(self, type, selector): if not hasattr(self, "listener_preload_script"): self.listener_preload_script = await self.make_preload_script( """ window.__listeners = {}; var proto = EventTarget.wrappedJSObject.prototype; var def = Object.getOwnPropertyDescriptor(proto, "addEventListener"); var old = def.value; def.value = function(type, fn, opts) { if ("matches" in this) { if (!window.__listeners[type]) { window.__listeners[type] = new Set(); } window.__listeners[type].add(this); } return old.call(this, type, fn, opts) }; Object.defineProperty(proto, "addEventListener", def); """, "listener_detector", ) return Client.ListenerTracker(self.listener_preload_script, type, selector) @contextlib.asynccontextmanager async def preload_script(self, text, *args): script = await self.make_preload_script(text, "preload", args=args) yield script await script.stop() def back(self): self.session.back() def switch_to_frame(self, frame): return self.session.transport.send( "POST", "session/{session_id}/frame".format(**vars(self.session)), {"id": frame}, encoder=webdriver.protocol.Encoder, decoder=webdriver.protocol.Decoder, session=self.session, ) def switch_frame(self, frame): self.session.switch_frame(frame) async def load_page_and_wait_for_iframe( self, url, finder, loads=1, timeout=None, **kwargs ): while loads > 0: await self.navigate(url, **kwargs) frame = self.await_element(finder, timeout=timeout) loads -= 1 self.switch_frame(frame) return frame def execute_script(self, script, *args): return self.session.execute_script(script, args=args) def execute_async_script(self, script, *args, **kwargs): return self.session.execute_async_script(script, args, **kwargs) def clear_all_cookies(self): self.session.transport.send( "DELETE", "session/%s/cookie" % self.session.session_id ) def send_element_command(self, element, method, uri, body=None): url = "element/%s/%s" % (element.id, uri) return self.session.send_session_command(method, url, body) def get_element_attribute(self, element, name): return self.send_element_command(element, "GET", "attribute/%s" % name) def _do_is_displayed_check(self, ele, is_displayed): if ele is None: return None if type(ele) in [list, tuple]: return [x for x in ele if self._do_is_displayed_check(x, is_displayed)] if is_displayed is False and ele and self.is_displayed(ele): return None if is_displayed is True and ele and not self.is_displayed(ele): return None return ele def find_css(self, *args, all=False, is_displayed=None, **kwargs): try: ele = self.session.find.css(*args, all=all, **kwargs) return self._do_is_displayed_check(ele, is_displayed) except webdriver.error.NoSuchElementException: return None def find_xpath(self, xpath, all=False, is_displayed=None): route = "elements" if all else "element" body = {"using": "xpath", "value": xpath} try: ele = self.session.send_session_command("POST", route, body) return self._do_is_displayed_check(ele, is_displayed) except webdriver.error.NoSuchElementException: return None def find_text(self, text, is_displayed=None, **kwargs): try: ele = self.find_xpath(f"//*[contains(text(),'{text}')]", **kwargs) return self._do_is_displayed_check(ele, is_displayed) except webdriver.error.NoSuchElementException: return None def find_element(self, finder, is_displayed=None, **kwargs): ele = finder.find(self, **kwargs) return self._do_is_displayed_check(ele, is_displayed) def await_css(self, selector, **kwargs): return self.await_element(self.css(selector), **kwargs) def await_xpath(self, selector, **kwargs): return self.await_element(self.xpath(selector), **kwargs) def await_text(self, selector, *args, **kwargs): return self.await_element(self.text(selector), **kwargs) def await_element(self, finder, **kwargs): return self.await_first_element_of([finder], **kwargs)[0] class css: def __init__(self, selector): self.selector = selector def find(self, client, **kwargs): return client.find_css(self.selector, **kwargs) class xpath: def __init__(self, selector): self.selector = selector def find(self, client, **kwargs): return client.find_xpath(self.selector, **kwargs) class text: def __init__(self, selector): self.selector = selector def find(self, client, **kwargs): return client.find_text(self.selector, **kwargs) def await_first_element_of( self, finders, timeout=None, delay=0.25, condition=False, **kwargs ): t0 = time.time() condition = f"var elem=arguments[0]; return {condition}" if condition else False if timeout is None: timeout = 10 found = [None for finder in finders] exc = None while time.time() < t0 + timeout: for i, finder in enumerate(finders): try: result = finder.find(self, **kwargs) if result and ( not condition or self.session.execute_script(condition, [result]) ): found[i] = result return found except webdriver.error.NoSuchElementException as e: exc = e time.sleep(delay) raise exc if exc is not None else webdriver.error.NoSuchElementException return found async def dom_ready(self, timeout=None): if timeout is None: timeout = 20 async def wait(): return self.session.execute_async_script( """ const cb = arguments[0]; setInterval(() => { if (document.readyState === "complete") { cb(); } }, 500); """ ) task = asyncio.create_task(wait()) return await asyncio.wait_for(task, timeout) def is_float_cleared(self, elem1, elem2): return self.session.execute_script( """return (function(a, b) { // Ensure that a is placed under b (and not to its right) return a?.offsetTop >= b?.offsetTop + b?.offsetHeight && a?.offsetLeft < b?.offsetLeft + b?.offsetWidth; }(arguments[0], arguments[1]));""", elem1, elem2, ) @contextlib.contextmanager def assert_getUserMedia_called(self): self.execute_script( """ navigator.mediaDevices.getUserMedia = navigator.mozGetUserMedia = navigator.getUserMedia = () => { window.__gumCalled = true; }; """ ) yield assert self.execute_script("return window.__gumCalled === true;") def await_element_hidden(self, finder, timeout=None, delay=0.25): t0 = time.time() if timeout is None: timeout = 20 elem = finder.find(self) while time.time() < t0 + timeout: try: if not self.is_displayed(elem): return time.sleep(delay) except webdriver.error.StaleElementReferenceException: return def soft_click(self, element): self.execute_script("arguments[0].click()", element) def remove_element(self, element): self.execute_script("arguments[0].remove()", element) def scroll_into_view(self, element): self.execute_script( "arguments[0].scrollIntoView({block:'center', inline:'center', behavior: 'instant'})", element, ) @contextlib.asynccontextmanager async def ensure_fastclick_activates(self): fastclick_preload_script = await self.make_preload_script( """ var _ = document.createElement("webcompat_test"); _.style = "position:absolute;right:-1px;width:1px;height:1px"; document.documentElement.appendChild(_); """, "fastclick_forcer", ) yield fastclick_preload_script.stop() def test_for_fastclick(self, element): # FastClick cancels touchend, breaking default actions on Fenix. # It instead fires a mousedown or click, which we can detect. self.execute_script( """ const sel = arguments[0]; window.fastclicked = false; const evt = sel.nodeName === "SELECT" ? "mousedown" : "click"; document.addEventListener(evt, e => { if (e.target === sel && !e.isTrusted) { window.fastclicked = true; } }, true); """, element, ) self.scroll_into_view(element) # tap a few times in case the site's other code interferes self.touch.click(element=element).perform() self.touch.click(element=element).perform() self.touch.click(element=element).perform() return self.execute_script("return window.fastclicked") def is_displayed(self, element): if element is None: return False return self.session.execute_script( """ const e = arguments[0], s = window.getComputedStyle(e), v = s.visibility === "visible", o = Math.abs(parseFloat(s.opacity)); return e.getClientRects().length > 0 && v && (isNaN(o) || o === 1.0); """, args=[element], )