From 75417f5e3d32645859d94cec82255dc130ec4a2e Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 18:55:34 +0200 Subject: Adding upstream version 2020.10.7. Signed-off-by: Daniel Baumann --- tests/selenium/pbtest.py | 498 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 498 insertions(+) create mode 100644 tests/selenium/pbtest.py (limited to 'tests/selenium/pbtest.py') diff --git a/tests/selenium/pbtest.py b/tests/selenium/pbtest.py new file mode 100644 index 0000000..b502b4b --- /dev/null +++ b/tests/selenium/pbtest.py @@ -0,0 +1,498 @@ +# -*- coding: UTF-8 -*- + +import json +import os +import subprocess +import tempfile +import time +import unittest + +from contextlib import contextmanager +from functools import wraps +from shutil import copytree + +from selenium import webdriver +from selenium.common.exceptions import ( + NoSuchWindowException, + TimeoutException, + WebDriverException, +) +from selenium.webdriver.chrome.options import Options as ChromeOptions +from selenium.webdriver.firefox.options import Options as FirefoxOptions +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.common.by import By + +try: + from xvfbwrapper import Xvfb +except ImportError: + print("\n\nxvfbwrapper Python package import failed") + print("headless mode (ENABLE_XVFB=1) is not supported") + + +SEL_DEFAULT_WAIT_TIMEOUT = 30 + +BROWSER_TYPES = ['chrome', 'firefox'] +BROWSER_NAMES = ['google-chrome', 'google-chrome-stable', 'google-chrome-beta', 'firefox'] + +parse_stdout = lambda res: res.strip().decode('utf-8') + +run_shell_command = lambda command: parse_stdout(subprocess.check_output(command)) + +GIT_ROOT = run_shell_command(['git', 'rev-parse', '--show-toplevel']) + + +class WindowNotFoundException(Exception): + pass + + +def unix_which(command, silent=False): + try: + return run_shell_command(['which', command]) + except subprocess.CalledProcessError as e: + if silent: + return None + raise e + + +def get_browser_type(string): + for t in BROWSER_TYPES: + if t in string.lower(): + return t + raise ValueError("couldn't get browser type from %s" % string) + + +def get_browser_name(string): + if ('/' in string) or ('\\' in string): # it's a path + return os.path.basename(string) + + # it's a browser type + for bn in BROWSER_NAMES: + if string in bn and unix_which(bn, silent=True): + return os.path.basename(unix_which(bn)) + raise ValueError('Could not get browser name from %s' % string) + + +class Shim: + _browser_msg = '''BROWSER should be one of: +* /path/to/a/browser +* a browser executable name so we can find the browser with "which $BROWSER" +* something from BROWSER_TYPES +''' + __doc__ = 'Chooses the correct driver and extension_url based on the BROWSER environment\nvariable. ' + _browser_msg + + def __init__(self): + print("\n\nConfiguring the test run ...") + + browser = os.environ.get('BROWSER') + + # get browser_path and browser_type first + if browser is None: + raise ValueError("The BROWSER environment variable is not set. " + self._browser_msg) + + if ("/" in browser) or ("\\" in browser): # path to a browser binary + self.browser_path = browser + self.browser_type = get_browser_type(self.browser_path) + elif unix_which(browser, silent=True): # executable browser name like 'google-chrome-stable' + self.browser_path = unix_which(browser) + self.browser_type = get_browser_type(browser) + elif get_browser_type(browser): # browser type like 'firefox' or 'chrome' + bname = get_browser_name(browser) + self.browser_path = unix_which(bname) + self.browser_type = browser + else: + raise ValueError("could not infer BROWSER from %s" % browser) + + self.extension_path = os.path.join(GIT_ROOT, 'src') + + if self.browser_type == 'chrome': + # this extension ID and the "key" property in manifest.json + # must both be derived from the same private key + self.info = { + 'extension_id': 'mcgekeccgjgcmhnhbabplanchdogjcnh' + } + self.manager = self.chrome_manager + self.base_url = 'chrome-extension://%s/' % self.info['extension_id'] + + # make extension ID constant across runs + self.fix_chrome_extension_id() + + elif self.browser_type == 'firefox': + self.info = { + 'extension_id': 'jid1-MnnxcxisBPnSXQ@jetpack', + 'uuid': 'd56a5b99-51b6-4e83-ab23-796216679614' + } + self.manager = self.firefox_manager + self.base_url = 'moz-extension://%s/' % self.info['uuid'] + + print('\nUsing browser path: %s\nwith browser type: %s\nand extension path: %s\n' % ( + self.browser_path, self.browser_type, self.extension_path)) + + def fix_chrome_extension_id(self): + # create temp directory + self.tmp_dir = tempfile.TemporaryDirectory() + new_extension_path = os.path.join(self.tmp_dir.name, "src") + + # copy extension sources there + copytree(self.extension_path, new_extension_path) + + # update manifest.json + manifest_path = os.path.join(new_extension_path, "manifest.json") + with open(manifest_path, "r") as f: + manifest = json.load(f) + # this key and the extension ID must both be derived from the same private key + manifest['key'] = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArMdgFkGsm7nOBr/9qkx8XEcmYSu1VkIXXK94oXLz1VKGB0o2MN+mXL/Dsllgkh61LZgK/gVuFFk89e/d6Vlsp9IpKLANuHgyS98FKx1+3sUoMujue+hyxulEGxXXJKXhk0kGxWdE0IDOamFYpF7Yk0K8Myd/JW1U2XOoOqJRZ7HR6is1W6iO/4IIL2/j3MUioVqu5ClT78+fE/Fn9b/DfzdX7RxMNza9UTiY+JCtkRTmm4ci4wtU1lxHuVmWiaS45xLbHphQr3fpemDlyTmaVoE59qG5SZZzvl6rwDah06dH01YGSzUF1ezM2IvY9ee1nMSHEadQRQ2sNduNZWC9gwIDAQAB" # noqa:E501 pylint:disable=line-too-long + with open(manifest_path, "w") as f: + json.dump(manifest, f) + + # update self.extension_path + self.extension_path = new_extension_path + + @property + def wants_xvfb(self): + if self.on_travis or bool(int(os.environ.get('ENABLE_XVFB', 0))): + try: + Xvfb + except NameError: + print("\nHeadless mode not supported: install xvfbwrapper first") + return False + return True + return False + + @property + def on_travis(self): + if "TRAVIS" in os.environ: + return True + return False + + @contextmanager + def chrome_manager(self): + opts = ChromeOptions() + if self.on_travis: # github.com/travis-ci/travis-ci/issues/938 + opts.add_argument("--no-sandbox") + opts.add_argument("--load-extension=" + self.extension_path) + opts.binary_location = self.browser_path + opts.add_experimental_option("prefs", {"profile.block_third_party_cookies": False}) + + # TODO not yet in Firefox (w/o hacks anyway): + # https://github.com/mozilla/geckodriver/issues/284#issuecomment-456073771 + opts.set_capability("loggingPrefs", {'browser': 'ALL'}) + + for i in range(5): + try: + driver = webdriver.Chrome(options=opts) + except WebDriverException as e: + if i == 0: print("") + print("Chrome WebDriver initialization failed:") + print(str(e) + "Retrying ...") + else: + break + + try: + yield driver + finally: + driver.quit() + + @contextmanager + def firefox_manager(self): + ffp = webdriver.FirefoxProfile() + # make extension ID constant across runs + ffp.set_preference('extensions.webextensions.uuids', '{"%s": "%s"}' % + (self.info['extension_id'], self.info['uuid'])) + + for i in range(5): + try: + opts = FirefoxOptions() + # to produce a trace-level geckodriver.log, + # remove the service_log_path argument to Firefox() + # and uncomment the line below + #opts.log.level = "trace" + driver = webdriver.Firefox( + firefox_profile=ffp, + firefox_binary=self.browser_path, + options=opts, + service_log_path=os.path.devnull) + except WebDriverException as e: + if i == 0: print("") + print("Firefox WebDriver initialization failed:") + print(str(e) + "Retrying ...") + else: + break + + driver.install_addon(self.extension_path, temporary=True) + + try: + yield driver + finally: + driver.quit() + + +shim = Shim() # create the browser shim + + +def if_firefox(wrapper): + ''' + A test decorator that applies the function `wrapper` to the test if the + browser is firefox. Ex: + + @if_firefox(unittest.skip("broken on ff")) + def test_stuff(self): + ... + ''' + def test_catcher(test): + if shim.browser_type == 'firefox': + return wraps(test)(wrapper)(test) + return test + + return test_catcher + + +def retry_until(fun, tester=None, times=5, msg="Waiting a bit and retrying ..."): + """ + Execute function `fun` until either its return is truthy + (or if `tester` is set, until the result of calling `tester` with `fun`'s return is truthy), + or it gets executed X times, where X = `times` + 1. + """ + for i in range(times): + result = fun() + + if tester is not None: + if tester(result): + break + elif result: + break + + if i == 0: + print("") + print(msg) + + time.sleep(2 ** i) + + return result + + +attempts = {} # used to count test retries +def repeat_if_failed(ntimes): # noqa + ''' + A decorator that retries the test if it fails `ntimes`. The TestCase must + be used on a subclass of unittest.TestCase. NB: this just registers function + to be retried. The try/except logic is in PBSeleniumTest.run. + ''' + def test_catcher(test): + attempts[test.__name__] = ntimes + + @wraps(test) + def caught(*args, **kwargs): + return test(*args, **kwargs) + return caught + return test_catcher + + +class PBSeleniumTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.manager = shim.manager + cls.base_url = shim.base_url + cls.wants_xvfb = shim.wants_xvfb + if cls.wants_xvfb: + cls.vdisplay = Xvfb(width=1280, height=720) + cls.vdisplay.start() + + # setting DBUS_SESSION_BUS_ADDRESS to nonsense prevents frequent + # hangs of chromedriver (possibly due to crbug.com/309093) + os.environ["DBUS_SESSION_BUS_ADDRESS"] = "/dev/null" + cls.proj_root = GIT_ROOT + + @classmethod + def tearDownClass(cls): + if cls.wants_xvfb: + cls.vdisplay.stop() + + def init(self, driver): + self.driver = driver + self.js = self.driver.execute_script + self.bg_url = self.base_url + "_generated_background_page.html" + self.options_url = self.base_url + "skin/options.html" + self.popup_url = self.base_url + "skin/popup.html" + self.first_run_url = self.base_url + "skin/firstRun.html" + self.test_url = self.base_url + "tests/index.html" + + def run(self, result=None): + nretries = attempts.get(result.name, 1) + for i in range(nretries): + try: + with self.manager() as driver: + self.init(driver) + + # wait for Badger's storage, listeners, ... + self.load_url(self.options_url) + self.wait_for_script( + "return chrome.extension.getBackgroundPage()." + "badger.INITIALIZED" + ) + + driver.close() + if driver.window_handles: + driver.switch_to.window(driver.window_handles[0]) + + super(PBSeleniumTest, self).run(result) + + # retry test magic + if result.name in attempts and result._excinfo: # pylint:disable=protected-access + raise Exception(result._excinfo.pop()) # pylint:disable=protected-access + + break + + except Exception: + if i == nretries - 1: + raise + + wait_secs = 2 ** i + print('\nRetrying {} after {} seconds ...'.format( + result, wait_secs)) + time.sleep(wait_secs) + continue + + def open_window(self): + if self.driver.current_url.startswith("moz-extension://"): + # work around https://bugzilla.mozilla.org/show_bug.cgi?id=1491443 + self.wait_for_script("return typeof chrome != 'undefined' && chrome && chrome.extension") + self.js( + "delete window.__new_window_created;" + "chrome.windows.create({}, function () {" + "window.__new_window_created = true;" + "});" + ) + self.wait_for_script("return window.__new_window_created") + else: + self.js('window.open()') + + self.driver.switch_to.window(self.driver.window_handles[-1]) + + def load_url(self, url, wait_for_body_text=False, retries=5): + """Load a URL and wait before returning.""" + for i in range(retries): + try: + self.driver.get(url) + break + except TimeoutException as e: + if i < retries - 1: + time.sleep(2 ** i) + continue + raise e + # work around geckodriver/marionette/Firefox timeout handling, + # for example: https://travis-ci.org/EFForg/privacybadger/jobs/389429089 + except WebDriverException as e: + if str(e).startswith("Reached error page") and i < retries - 1: + time.sleep(2 ** i) + continue + raise e + self.driver.switch_to.window(self.driver.current_window_handle) + + if wait_for_body_text: + retry_until( + lambda: self.driver.find_element_by_tag_name('body').text, + msg="Waiting for document.body.textContent to get populated ..." + ) + + def txt_by_css(self, css_selector, timeout=SEL_DEFAULT_WAIT_TIMEOUT): + """Find an element by CSS selector and return its text.""" + return self.find_el_by_css( + css_selector, visible_only=False, timeout=timeout).text + + def find_el_by_css(self, css_selector, visible_only=True, timeout=SEL_DEFAULT_WAIT_TIMEOUT): + condition = ( + EC.visibility_of_element_located if visible_only + else EC.presence_of_element_located + ) + return WebDriverWait(self.driver, timeout).until( + condition((By.CSS_SELECTOR, css_selector))) + + def find_el_by_xpath(self, xpath, timeout=SEL_DEFAULT_WAIT_TIMEOUT): + return WebDriverWait(self.driver, timeout).until( + EC.visibility_of_element_located((By.XPATH, xpath))) + + def wait_for_script( + self, + script, + *script_args, + timeout=SEL_DEFAULT_WAIT_TIMEOUT, + message="Timed out waiting for execute_script to eval to True" + ): + """Variant of self.js that executes script continuously until it + returns True.""" + return WebDriverWait(self.driver, timeout).until( + lambda driver: driver.execute_script(script, *script_args), + message + ) + + def wait_for_text(self, selector, text, timeout=SEL_DEFAULT_WAIT_TIMEOUT): + return WebDriverWait(self.driver, timeout).until( + EC.text_to_be_present_in_element( + (By.CSS_SELECTOR, selector), text)) + + def wait_for_and_switch_to_frame(self, selector, timeout=SEL_DEFAULT_WAIT_TIMEOUT): + return WebDriverWait(self.driver, timeout).until( + EC.frame_to_be_available_and_switch_to_it( + (By.CSS_SELECTOR, selector))) + + def switch_to_window_with_url(self, url, max_tries=5): + """Point the driver to the first window that matches this url.""" + + for _ in range(max_tries): + for w in self.driver.window_handles: + try: + self.driver.switch_to.window(w) + if self.driver.current_url != url: + continue + except NoSuchWindowException: + pass + else: + return + + time.sleep(1) + + raise WindowNotFoundException("Failed to find window for " + url) + + + def close_window_with_url(self, url, max_tries=5): + self.switch_to_window_with_url(url, max_tries) + + if len(self.driver.window_handles) == 1: + # open another window to avoid implicit session deletion + self.open_window() + self.switch_to_window_with_url(url, max_tries) + + self.driver.close() + self.driver.switch_to.window(self.driver.window_handles[0]) + + def block_domain(self, domain): + self.load_url(self.options_url) + self.js(( + "(function (domain) {" + " let bg = chrome.extension.getBackgroundPage();" + " let base_domain = window.getBaseDomain(domain);" + " bg.badger.heuristicBlocking.blocklistOrigin(domain, base_domain);" + "}(arguments[0]));" + ), domain) + + def cookieblock_domain(self, domain): + self.load_url(self.options_url) + self.js(( + "(function (domain) {" + " let bg = chrome.extension.getBackgroundPage();" + " bg.badger.storage.setupHeuristicAction(domain, bg.constants.COOKIEBLOCK);" + "}(arguments[0]));" + ), domain) + + def disable_badger_on_site(self, url): + self.load_url(self.options_url) + self.wait_for_script("return window.OPTIONS_INITIALIZED") + self.find_el_by_css('a[href="#tab-allowlist"]').click() + self.driver.find_element_by_id('new-disabled-site-input').send_keys(url) + self.driver.find_element_by_css_selector('#add-disabled-site').click() + + @property + def logs(self): + # TODO not yet in Firefox + return [log.get('message') for log in self.driver.get_log('browser')] -- cgit v1.2.3