# -*- coding: UTF-8 -*- import json import os import subprocess import tempfile import time import unittest from contextlib import contextmanager from functools import wraps from shutil import copytree from selenium import webdriver from selenium.common.exceptions import ( NoSuchWindowException, TimeoutException, WebDriverException, ) from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.firefox.options import Options as FirefoxOptions from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.by import By try: from xvfbwrapper import Xvfb except ImportError: print("\n\nxvfbwrapper Python package import failed") print("headless mode (ENABLE_XVFB=1) is not supported") SEL_DEFAULT_WAIT_TIMEOUT = 30 BROWSER_TYPES = ['chrome', 'firefox'] BROWSER_NAMES = ['google-chrome', 'google-chrome-stable', 'google-chrome-beta', 'firefox'] parse_stdout = lambda res: res.strip().decode('utf-8') run_shell_command = lambda command: parse_stdout(subprocess.check_output(command)) GIT_ROOT = run_shell_command(['git', 'rev-parse', '--show-toplevel']) class WindowNotFoundException(Exception): pass def unix_which(command, silent=False): try: return run_shell_command(['which', command]) except subprocess.CalledProcessError as e: if silent: return None raise e def get_browser_type(string): for t in BROWSER_TYPES: if t in string.lower(): return t raise ValueError("couldn't get browser type from %s" % string) def get_browser_name(string): if ('/' in string) or ('\\' in string): # it's a path return os.path.basename(string) # it's a browser type for bn in BROWSER_NAMES: if string in bn and unix_which(bn, silent=True): return os.path.basename(unix_which(bn)) raise ValueError('Could not get browser name from %s' % string) class Shim: _browser_msg = '''BROWSER should be one of: * /path/to/a/browser * a browser executable name so we can find the browser with "which $BROWSER" * something from BROWSER_TYPES ''' __doc__ = 'Chooses the correct driver and extension_url based on the BROWSER environment\nvariable. ' + _browser_msg def __init__(self): print("\n\nConfiguring the test run ...") browser = os.environ.get('BROWSER') # get browser_path and browser_type first if browser is None: raise ValueError("The BROWSER environment variable is not set. " + self._browser_msg) if ("/" in browser) or ("\\" in browser): # path to a browser binary self.browser_path = browser self.browser_type = get_browser_type(self.browser_path) elif unix_which(browser, silent=True): # executable browser name like 'google-chrome-stable' self.browser_path = unix_which(browser) self.browser_type = get_browser_type(browser) elif get_browser_type(browser): # browser type like 'firefox' or 'chrome' bname = get_browser_name(browser) self.browser_path = unix_which(bname) self.browser_type = browser else: raise ValueError("could not infer BROWSER from %s" % browser) self.extension_path = os.path.join(GIT_ROOT, 'src') if self.browser_type == 'chrome': # this extension ID and the "key" property in manifest.json # must both be derived from the same private key self.info = { 'extension_id': 'mcgekeccgjgcmhnhbabplanchdogjcnh' } self.manager = self.chrome_manager self.base_url = 'chrome-extension://%s/' % self.info['extension_id'] # make extension ID constant across runs self.fix_chrome_extension_id() elif self.browser_type == 'firefox': self.info = { 'extension_id': 'jid1-MnnxcxisBPnSXQ@jetpack', 'uuid': 'd56a5b99-51b6-4e83-ab23-796216679614' } self.manager = self.firefox_manager self.base_url = 'moz-extension://%s/' % self.info['uuid'] print('\nUsing browser path: %s\nwith browser type: %s\nand extension path: %s\n' % ( self.browser_path, self.browser_type, self.extension_path)) def fix_chrome_extension_id(self): # create temp directory self.tmp_dir = tempfile.TemporaryDirectory() new_extension_path = os.path.join(self.tmp_dir.name, "src") # copy extension sources there copytree(self.extension_path, new_extension_path) # update manifest.json manifest_path = os.path.join(new_extension_path, "manifest.json") with open(manifest_path, "r") as f: manifest = json.load(f) # this key and the extension ID must both be derived from the same private key manifest['key'] = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArMdgFkGsm7nOBr/9qkx8XEcmYSu1VkIXXK94oXLz1VKGB0o2MN+mXL/Dsllgkh61LZgK/gVuFFk89e/d6Vlsp9IpKLANuHgyS98FKx1+3sUoMujue+hyxulEGxXXJKXhk0kGxWdE0IDOamFYpF7Yk0K8Myd/JW1U2XOoOqJRZ7HR6is1W6iO/4IIL2/j3MUioVqu5ClT78+fE/Fn9b/DfzdX7RxMNza9UTiY+JCtkRTmm4ci4wtU1lxHuVmWiaS45xLbHphQr3fpemDlyTmaVoE59qG5SZZzvl6rwDah06dH01YGSzUF1ezM2IvY9ee1nMSHEadQRQ2sNduNZWC9gwIDAQAB" # noqa:E501 pylint:disable=line-too-long with open(manifest_path, "w") as f: json.dump(manifest, f) # update self.extension_path self.extension_path = new_extension_path @property def wants_xvfb(self): if self.on_travis or bool(int(os.environ.get('ENABLE_XVFB', 0))): try: Xvfb except NameError: print("\nHeadless mode not supported: install xvfbwrapper first") return False return True return False @property def on_travis(self): if "TRAVIS" in os.environ: return True return False @contextmanager def chrome_manager(self): opts = ChromeOptions() if self.on_travis: # github.com/travis-ci/travis-ci/issues/938 opts.add_argument("--no-sandbox") opts.add_argument("--load-extension=" + self.extension_path) opts.binary_location = self.browser_path opts.add_experimental_option("prefs", {"profile.block_third_party_cookies": False}) # TODO not yet in Firefox (w/o hacks anyway): # https://github.com/mozilla/geckodriver/issues/284#issuecomment-456073771 opts.set_capability("loggingPrefs", {'browser': 'ALL'}) for i in range(5): try: driver = webdriver.Chrome(options=opts) except WebDriverException as e: if i == 0: print("") print("Chrome WebDriver initialization failed:") print(str(e) + "Retrying ...") else: break try: yield driver finally: driver.quit() @contextmanager def firefox_manager(self): ffp = webdriver.FirefoxProfile() # make extension ID constant across runs ffp.set_preference('extensions.webextensions.uuids', '{"%s": "%s"}' % (self.info['extension_id'], self.info['uuid'])) for i in range(5): try: opts = FirefoxOptions() # to produce a trace-level geckodriver.log, # remove the service_log_path argument to Firefox() # and uncomment the line below #opts.log.level = "trace" driver = webdriver.Firefox( firefox_profile=ffp, firefox_binary=self.browser_path, options=opts, service_log_path=os.path.devnull) except WebDriverException as e: if i == 0: print("") print("Firefox WebDriver initialization failed:") print(str(e) + "Retrying ...") else: break driver.install_addon(self.extension_path, temporary=True) try: yield driver finally: driver.quit() shim = Shim() # create the browser shim def if_firefox(wrapper): ''' A test decorator that applies the function `wrapper` to the test if the browser is firefox. Ex: @if_firefox(unittest.skip("broken on ff")) def test_stuff(self): ... ''' def test_catcher(test): if shim.browser_type == 'firefox': return wraps(test)(wrapper)(test) return test return test_catcher def retry_until(fun, tester=None, times=5, msg="Waiting a bit and retrying ..."): """ Execute function `fun` until either its return is truthy (or if `tester` is set, until the result of calling `tester` with `fun`'s return is truthy), or it gets executed X times, where X = `times` + 1. """ for i in range(times): result = fun() if tester is not None: if tester(result): break elif result: break if i == 0: print("") print(msg) time.sleep(2 ** i) return result attempts = {} # used to count test retries def repeat_if_failed(ntimes): # noqa ''' A decorator that retries the test if it fails `ntimes`. The TestCase must be used on a subclass of unittest.TestCase. NB: this just registers function to be retried. The try/except logic is in PBSeleniumTest.run. ''' def test_catcher(test): attempts[test.__name__] = ntimes @wraps(test) def caught(*args, **kwargs): return test(*args, **kwargs) return caught return test_catcher class PBSeleniumTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.manager = shim.manager cls.base_url = shim.base_url cls.wants_xvfb = shim.wants_xvfb if cls.wants_xvfb: cls.vdisplay = Xvfb(width=1280, height=720) cls.vdisplay.start() # setting DBUS_SESSION_BUS_ADDRESS to nonsense prevents frequent # hangs of chromedriver (possibly due to crbug.com/309093) os.environ["DBUS_SESSION_BUS_ADDRESS"] = "/dev/null" cls.proj_root = GIT_ROOT @classmethod def tearDownClass(cls): if cls.wants_xvfb: cls.vdisplay.stop() def init(self, driver): self.driver = driver self.js = self.driver.execute_script self.bg_url = self.base_url + "_generated_background_page.html" self.options_url = self.base_url + "skin/options.html" self.popup_url = self.base_url + "skin/popup.html" self.first_run_url = self.base_url + "skin/firstRun.html" self.test_url = self.base_url + "tests/index.html" def run(self, result=None): nretries = attempts.get(result.name, 1) for i in range(nretries): try: with self.manager() as driver: self.init(driver) # wait for Badger's storage, listeners, ... self.load_url(self.options_url) self.wait_for_script( "return chrome.extension.getBackgroundPage()." "badger.INITIALIZED" ) driver.close() if driver.window_handles: driver.switch_to.window(driver.window_handles[0]) super(PBSeleniumTest, self).run(result) # retry test magic if result.name in attempts and result._excinfo: # pylint:disable=protected-access raise Exception(result._excinfo.pop()) # pylint:disable=protected-access break except Exception: if i == nretries - 1: raise wait_secs = 2 ** i print('\nRetrying {} after {} seconds ...'.format( result, wait_secs)) time.sleep(wait_secs) continue def open_window(self): if self.driver.current_url.startswith("moz-extension://"): # work around https://bugzilla.mozilla.org/show_bug.cgi?id=1491443 self.wait_for_script("return typeof chrome != 'undefined' && chrome && chrome.extension") self.js( "delete window.__new_window_created;" "chrome.windows.create({}, function () {" "window.__new_window_created = true;" "});" ) self.wait_for_script("return window.__new_window_created") else: self.js('window.open()') self.driver.switch_to.window(self.driver.window_handles[-1]) def load_url(self, url, wait_for_body_text=False, retries=5): """Load a URL and wait before returning.""" for i in range(retries): try: self.driver.get(url) break except TimeoutException as e: if i < retries - 1: time.sleep(2 ** i) continue raise e # work around geckodriver/marionette/Firefox timeout handling, # for example: https://travis-ci.org/EFForg/privacybadger/jobs/389429089 except WebDriverException as e: if str(e).startswith("Reached error page") and i < retries - 1: time.sleep(2 ** i) continue raise e self.driver.switch_to.window(self.driver.current_window_handle) if wait_for_body_text: retry_until( lambda: self.driver.find_element_by_tag_name('body').text, msg="Waiting for document.body.textContent to get populated ..." ) def txt_by_css(self, css_selector, timeout=SEL_DEFAULT_WAIT_TIMEOUT): """Find an element by CSS selector and return its text.""" return self.find_el_by_css( css_selector, visible_only=False, timeout=timeout).text def find_el_by_css(self, css_selector, visible_only=True, timeout=SEL_DEFAULT_WAIT_TIMEOUT): condition = ( EC.visibility_of_element_located if visible_only else EC.presence_of_element_located ) return WebDriverWait(self.driver, timeout).until( condition((By.CSS_SELECTOR, css_selector))) def find_el_by_xpath(self, xpath, timeout=SEL_DEFAULT_WAIT_TIMEOUT): return WebDriverWait(self.driver, timeout).until( EC.visibility_of_element_located((By.XPATH, xpath))) def wait_for_script( self, script, *script_args, timeout=SEL_DEFAULT_WAIT_TIMEOUT, message="Timed out waiting for execute_script to eval to True" ): """Variant of self.js that executes script continuously until it returns True.""" return WebDriverWait(self.driver, timeout).until( lambda driver: driver.execute_script(script, *script_args), message ) def wait_for_text(self, selector, text, timeout=SEL_DEFAULT_WAIT_TIMEOUT): return WebDriverWait(self.driver, timeout).until( EC.text_to_be_present_in_element( (By.CSS_SELECTOR, selector), text)) def wait_for_and_switch_to_frame(self, selector, timeout=SEL_DEFAULT_WAIT_TIMEOUT): return WebDriverWait(self.driver, timeout).until( EC.frame_to_be_available_and_switch_to_it( (By.CSS_SELECTOR, selector))) def switch_to_window_with_url(self, url, max_tries=5): """Point the driver to the first window that matches this url.""" for _ in range(max_tries): for w in self.driver.window_handles: try: self.driver.switch_to.window(w) if self.driver.current_url != url: continue except NoSuchWindowException: pass else: return time.sleep(1) raise WindowNotFoundException("Failed to find window for " + url) def close_window_with_url(self, url, max_tries=5): self.switch_to_window_with_url(url, max_tries) if len(self.driver.window_handles) == 1: # open another window to avoid implicit session deletion self.open_window() self.switch_to_window_with_url(url, max_tries) self.driver.close() self.driver.switch_to.window(self.driver.window_handles[0]) def block_domain(self, domain): self.load_url(self.options_url) self.js(( "(function (domain) {" " let bg = chrome.extension.getBackgroundPage();" " let base_domain = window.getBaseDomain(domain);" " bg.badger.heuristicBlocking.blocklistOrigin(domain, base_domain);" "}(arguments[0]));" ), domain) def cookieblock_domain(self, domain): self.load_url(self.options_url) self.js(( "(function (domain) {" " let bg = chrome.extension.getBackgroundPage();" " bg.badger.storage.setupHeuristicAction(domain, bg.constants.COOKIEBLOCK);" "}(arguments[0]));" ), domain) def disable_badger_on_site(self, url): self.load_url(self.options_url) self.wait_for_script("return window.OPTIONS_INITIALIZED") self.find_el_by_css('a[href="#tab-allowlist"]').click() self.driver.find_element_by_id('new-disabled-site-input').send_keys(url) self.driver.find_element_by_css_selector('#add-disabled-site').click() @property def logs(self): # TODO not yet in Firefox return [log.get('message') for log in self.driver.get_log('browser')]