summaryrefslogtreecommitdiffstats
path: root/tests/selenium/pbtest.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/selenium/pbtest.py498
1 files changed, 498 insertions, 0 deletions
diff --git a/tests/selenium/pbtest.py b/tests/selenium/pbtest.py
new file mode 100644
index 0000000..b502b4b
--- /dev/null
+++ b/tests/selenium/pbtest.py
@@ -0,0 +1,498 @@
+# -*- coding: UTF-8 -*-
+
+import json
+import os
+import subprocess
+import tempfile
+import time
+import unittest
+
+from contextlib import contextmanager
+from functools import wraps
+from shutil import copytree
+
+from selenium import webdriver
+from selenium.common.exceptions import (
+ NoSuchWindowException,
+ TimeoutException,
+ WebDriverException,
+)
+from selenium.webdriver.chrome.options import Options as ChromeOptions
+from selenium.webdriver.firefox.options import Options as FirefoxOptions
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.common.by import By
+
+try:
+ from xvfbwrapper import Xvfb
+except ImportError:
+ print("\n\nxvfbwrapper Python package import failed")
+ print("headless mode (ENABLE_XVFB=1) is not supported")
+
+
+SEL_DEFAULT_WAIT_TIMEOUT = 30
+
+BROWSER_TYPES = ['chrome', 'firefox']
+BROWSER_NAMES = ['google-chrome', 'google-chrome-stable', 'google-chrome-beta', 'firefox']
+
+parse_stdout = lambda res: res.strip().decode('utf-8')
+
+run_shell_command = lambda command: parse_stdout(subprocess.check_output(command))
+
+GIT_ROOT = run_shell_command(['git', 'rev-parse', '--show-toplevel'])
+
+
+class WindowNotFoundException(Exception):
+ pass
+
+
+def unix_which(command, silent=False):
+ try:
+ return run_shell_command(['which', command])
+ except subprocess.CalledProcessError as e:
+ if silent:
+ return None
+ raise e
+
+
+def get_browser_type(string):
+ for t in BROWSER_TYPES:
+ if t in string.lower():
+ return t
+ raise ValueError("couldn't get browser type from %s" % string)
+
+
+def get_browser_name(string):
+ if ('/' in string) or ('\\' in string): # it's a path
+ return os.path.basename(string)
+
+ # it's a browser type
+ for bn in BROWSER_NAMES:
+ if string in bn and unix_which(bn, silent=True):
+ return os.path.basename(unix_which(bn))
+ raise ValueError('Could not get browser name from %s' % string)
+
+
+class Shim:
+ _browser_msg = '''BROWSER should be one of:
+* /path/to/a/browser
+* a browser executable name so we can find the browser with "which $BROWSER"
+* something from BROWSER_TYPES
+'''
+ __doc__ = 'Chooses the correct driver and extension_url based on the BROWSER environment\nvariable. ' + _browser_msg
+
+ def __init__(self):
+ print("\n\nConfiguring the test run ...")
+
+ browser = os.environ.get('BROWSER')
+
+ # get browser_path and browser_type first
+ if browser is None:
+ raise ValueError("The BROWSER environment variable is not set. " + self._browser_msg)
+
+ if ("/" in browser) or ("\\" in browser): # path to a browser binary
+ self.browser_path = browser
+ self.browser_type = get_browser_type(self.browser_path)
+ elif unix_which(browser, silent=True): # executable browser name like 'google-chrome-stable'
+ self.browser_path = unix_which(browser)
+ self.browser_type = get_browser_type(browser)
+ elif get_browser_type(browser): # browser type like 'firefox' or 'chrome'
+ bname = get_browser_name(browser)
+ self.browser_path = unix_which(bname)
+ self.browser_type = browser
+ else:
+ raise ValueError("could not infer BROWSER from %s" % browser)
+
+ self.extension_path = os.path.join(GIT_ROOT, 'src')
+
+ if self.browser_type == 'chrome':
+ # this extension ID and the "key" property in manifest.json
+ # must both be derived from the same private key
+ self.info = {
+ 'extension_id': 'mcgekeccgjgcmhnhbabplanchdogjcnh'
+ }
+ self.manager = self.chrome_manager
+ self.base_url = 'chrome-extension://%s/' % self.info['extension_id']
+
+ # make extension ID constant across runs
+ self.fix_chrome_extension_id()
+
+ elif self.browser_type == 'firefox':
+ self.info = {
+ 'extension_id': 'jid1-MnnxcxisBPnSXQ@jetpack',
+ 'uuid': 'd56a5b99-51b6-4e83-ab23-796216679614'
+ }
+ self.manager = self.firefox_manager
+ self.base_url = 'moz-extension://%s/' % self.info['uuid']
+
+ print('\nUsing browser path: %s\nwith browser type: %s\nand extension path: %s\n' % (
+ self.browser_path, self.browser_type, self.extension_path))
+
+ def fix_chrome_extension_id(self):
+ # create temp directory
+ self.tmp_dir = tempfile.TemporaryDirectory()
+ new_extension_path = os.path.join(self.tmp_dir.name, "src")
+
+ # copy extension sources there
+ copytree(self.extension_path, new_extension_path)
+
+ # update manifest.json
+ manifest_path = os.path.join(new_extension_path, "manifest.json")
+ with open(manifest_path, "r") as f:
+ manifest = json.load(f)
+ # this key and the extension ID must both be derived from the same private key
+ manifest['key'] = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArMdgFkGsm7nOBr/9qkx8XEcmYSu1VkIXXK94oXLz1VKGB0o2MN+mXL/Dsllgkh61LZgK/gVuFFk89e/d6Vlsp9IpKLANuHgyS98FKx1+3sUoMujue+hyxulEGxXXJKXhk0kGxWdE0IDOamFYpF7Yk0K8Myd/JW1U2XOoOqJRZ7HR6is1W6iO/4IIL2/j3MUioVqu5ClT78+fE/Fn9b/DfzdX7RxMNza9UTiY+JCtkRTmm4ci4wtU1lxHuVmWiaS45xLbHphQr3fpemDlyTmaVoE59qG5SZZzvl6rwDah06dH01YGSzUF1ezM2IvY9ee1nMSHEadQRQ2sNduNZWC9gwIDAQAB" # noqa:E501 pylint:disable=line-too-long
+ with open(manifest_path, "w") as f:
+ json.dump(manifest, f)
+
+ # update self.extension_path
+ self.extension_path = new_extension_path
+
+ @property
+ def wants_xvfb(self):
+ if self.on_travis or bool(int(os.environ.get('ENABLE_XVFB', 0))):
+ try:
+ Xvfb
+ except NameError:
+ print("\nHeadless mode not supported: install xvfbwrapper first")
+ return False
+ return True
+ return False
+
+ @property
+ def on_travis(self):
+ if "TRAVIS" in os.environ:
+ return True
+ return False
+
+ @contextmanager
+ def chrome_manager(self):
+ opts = ChromeOptions()
+ if self.on_travis: # github.com/travis-ci/travis-ci/issues/938
+ opts.add_argument("--no-sandbox")
+ opts.add_argument("--load-extension=" + self.extension_path)
+ opts.binary_location = self.browser_path
+ opts.add_experimental_option("prefs", {"profile.block_third_party_cookies": False})
+
+ # TODO not yet in Firefox (w/o hacks anyway):
+ # https://github.com/mozilla/geckodriver/issues/284#issuecomment-456073771
+ opts.set_capability("loggingPrefs", {'browser': 'ALL'})
+
+ for i in range(5):
+ try:
+ driver = webdriver.Chrome(options=opts)
+ except WebDriverException as e:
+ if i == 0: print("")
+ print("Chrome WebDriver initialization failed:")
+ print(str(e) + "Retrying ...")
+ else:
+ break
+
+ try:
+ yield driver
+ finally:
+ driver.quit()
+
+ @contextmanager
+ def firefox_manager(self):
+ ffp = webdriver.FirefoxProfile()
+ # make extension ID constant across runs
+ ffp.set_preference('extensions.webextensions.uuids', '{"%s": "%s"}' %
+ (self.info['extension_id'], self.info['uuid']))
+
+ for i in range(5):
+ try:
+ opts = FirefoxOptions()
+ # to produce a trace-level geckodriver.log,
+ # remove the service_log_path argument to Firefox()
+ # and uncomment the line below
+ #opts.log.level = "trace"
+ driver = webdriver.Firefox(
+ firefox_profile=ffp,
+ firefox_binary=self.browser_path,
+ options=opts,
+ service_log_path=os.path.devnull)
+ except WebDriverException as e:
+ if i == 0: print("")
+ print("Firefox WebDriver initialization failed:")
+ print(str(e) + "Retrying ...")
+ else:
+ break
+
+ driver.install_addon(self.extension_path, temporary=True)
+
+ try:
+ yield driver
+ finally:
+ driver.quit()
+
+
+shim = Shim() # create the browser shim
+
+
+def if_firefox(wrapper):
+ '''
+ A test decorator that applies the function `wrapper` to the test if the
+ browser is firefox. Ex:
+
+ @if_firefox(unittest.skip("broken on ff"))
+ def test_stuff(self):
+ ...
+ '''
+ def test_catcher(test):
+ if shim.browser_type == 'firefox':
+ return wraps(test)(wrapper)(test)
+ return test
+
+ return test_catcher
+
+
+def retry_until(fun, tester=None, times=5, msg="Waiting a bit and retrying ..."):
+ """
+ Execute function `fun` until either its return is truthy
+ (or if `tester` is set, until the result of calling `tester` with `fun`'s return is truthy),
+ or it gets executed X times, where X = `times` + 1.
+ """
+ for i in range(times):
+ result = fun()
+
+ if tester is not None:
+ if tester(result):
+ break
+ elif result:
+ break
+
+ if i == 0:
+ print("")
+ print(msg)
+
+ time.sleep(2 ** i)
+
+ return result
+
+
+attempts = {} # used to count test retries
+def repeat_if_failed(ntimes): # noqa
+ '''
+ A decorator that retries the test if it fails `ntimes`. The TestCase must
+ be used on a subclass of unittest.TestCase. NB: this just registers function
+ to be retried. The try/except logic is in PBSeleniumTest.run.
+ '''
+ def test_catcher(test):
+ attempts[test.__name__] = ntimes
+
+ @wraps(test)
+ def caught(*args, **kwargs):
+ return test(*args, **kwargs)
+ return caught
+ return test_catcher
+
+
+class PBSeleniumTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.manager = shim.manager
+ cls.base_url = shim.base_url
+ cls.wants_xvfb = shim.wants_xvfb
+ if cls.wants_xvfb:
+ cls.vdisplay = Xvfb(width=1280, height=720)
+ cls.vdisplay.start()
+
+ # setting DBUS_SESSION_BUS_ADDRESS to nonsense prevents frequent
+ # hangs of chromedriver (possibly due to crbug.com/309093)
+ os.environ["DBUS_SESSION_BUS_ADDRESS"] = "/dev/null"
+ cls.proj_root = GIT_ROOT
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.wants_xvfb:
+ cls.vdisplay.stop()
+
+ def init(self, driver):
+ self.driver = driver
+ self.js = self.driver.execute_script
+ self.bg_url = self.base_url + "_generated_background_page.html"
+ self.options_url = self.base_url + "skin/options.html"
+ self.popup_url = self.base_url + "skin/popup.html"
+ self.first_run_url = self.base_url + "skin/firstRun.html"
+ self.test_url = self.base_url + "tests/index.html"
+
+ def run(self, result=None):
+ nretries = attempts.get(result.name, 1)
+ for i in range(nretries):
+ try:
+ with self.manager() as driver:
+ self.init(driver)
+
+ # wait for Badger's storage, listeners, ...
+ self.load_url(self.options_url)
+ self.wait_for_script(
+ "return chrome.extension.getBackgroundPage()."
+ "badger.INITIALIZED"
+ )
+
+ driver.close()
+ if driver.window_handles:
+ driver.switch_to.window(driver.window_handles[0])
+
+ super(PBSeleniumTest, self).run(result)
+
+ # retry test magic
+ if result.name in attempts and result._excinfo: # pylint:disable=protected-access
+ raise Exception(result._excinfo.pop()) # pylint:disable=protected-access
+
+ break
+
+ except Exception:
+ if i == nretries - 1:
+ raise
+
+ wait_secs = 2 ** i
+ print('\nRetrying {} after {} seconds ...'.format(
+ result, wait_secs))
+ time.sleep(wait_secs)
+ continue
+
+ def open_window(self):
+ if self.driver.current_url.startswith("moz-extension://"):
+ # work around https://bugzilla.mozilla.org/show_bug.cgi?id=1491443
+ self.wait_for_script("return typeof chrome != 'undefined' && chrome && chrome.extension")
+ self.js(
+ "delete window.__new_window_created;"
+ "chrome.windows.create({}, function () {"
+ "window.__new_window_created = true;"
+ "});"
+ )
+ self.wait_for_script("return window.__new_window_created")
+ else:
+ self.js('window.open()')
+
+ self.driver.switch_to.window(self.driver.window_handles[-1])
+
+ def load_url(self, url, wait_for_body_text=False, retries=5):
+ """Load a URL and wait before returning."""
+ for i in range(retries):
+ try:
+ self.driver.get(url)
+ break
+ except TimeoutException as e:
+ if i < retries - 1:
+ time.sleep(2 ** i)
+ continue
+ raise e
+ # work around geckodriver/marionette/Firefox timeout handling,
+ # for example: https://travis-ci.org/EFForg/privacybadger/jobs/389429089
+ except WebDriverException as e:
+ if str(e).startswith("Reached error page") and i < retries - 1:
+ time.sleep(2 ** i)
+ continue
+ raise e
+ self.driver.switch_to.window(self.driver.current_window_handle)
+
+ if wait_for_body_text:
+ retry_until(
+ lambda: self.driver.find_element_by_tag_name('body').text,
+ msg="Waiting for document.body.textContent to get populated ..."
+ )
+
+ def txt_by_css(self, css_selector, timeout=SEL_DEFAULT_WAIT_TIMEOUT):
+ """Find an element by CSS selector and return its text."""
+ return self.find_el_by_css(
+ css_selector, visible_only=False, timeout=timeout).text
+
+ def find_el_by_css(self, css_selector, visible_only=True, timeout=SEL_DEFAULT_WAIT_TIMEOUT):
+ condition = (
+ EC.visibility_of_element_located if visible_only
+ else EC.presence_of_element_located
+ )
+ return WebDriverWait(self.driver, timeout).until(
+ condition((By.CSS_SELECTOR, css_selector)))
+
+ def find_el_by_xpath(self, xpath, timeout=SEL_DEFAULT_WAIT_TIMEOUT):
+ return WebDriverWait(self.driver, timeout).until(
+ EC.visibility_of_element_located((By.XPATH, xpath)))
+
+ def wait_for_script(
+ self,
+ script,
+ *script_args,
+ timeout=SEL_DEFAULT_WAIT_TIMEOUT,
+ message="Timed out waiting for execute_script to eval to True"
+ ):
+ """Variant of self.js that executes script continuously until it
+ returns True."""
+ return WebDriverWait(self.driver, timeout).until(
+ lambda driver: driver.execute_script(script, *script_args),
+ message
+ )
+
+ def wait_for_text(self, selector, text, timeout=SEL_DEFAULT_WAIT_TIMEOUT):
+ return WebDriverWait(self.driver, timeout).until(
+ EC.text_to_be_present_in_element(
+ (By.CSS_SELECTOR, selector), text))
+
+ def wait_for_and_switch_to_frame(self, selector, timeout=SEL_DEFAULT_WAIT_TIMEOUT):
+ return WebDriverWait(self.driver, timeout).until(
+ EC.frame_to_be_available_and_switch_to_it(
+ (By.CSS_SELECTOR, selector)))
+
+ def switch_to_window_with_url(self, url, max_tries=5):
+ """Point the driver to the first window that matches this url."""
+
+ for _ in range(max_tries):
+ for w in self.driver.window_handles:
+ try:
+ self.driver.switch_to.window(w)
+ if self.driver.current_url != url:
+ continue
+ except NoSuchWindowException:
+ pass
+ else:
+ return
+
+ time.sleep(1)
+
+ raise WindowNotFoundException("Failed to find window for " + url)
+
+
+ def close_window_with_url(self, url, max_tries=5):
+ self.switch_to_window_with_url(url, max_tries)
+
+ if len(self.driver.window_handles) == 1:
+ # open another window to avoid implicit session deletion
+ self.open_window()
+ self.switch_to_window_with_url(url, max_tries)
+
+ self.driver.close()
+ self.driver.switch_to.window(self.driver.window_handles[0])
+
+ def block_domain(self, domain):
+ self.load_url(self.options_url)
+ self.js((
+ "(function (domain) {"
+ " let bg = chrome.extension.getBackgroundPage();"
+ " let base_domain = window.getBaseDomain(domain);"
+ " bg.badger.heuristicBlocking.blocklistOrigin(domain, base_domain);"
+ "}(arguments[0]));"
+ ), domain)
+
+ def cookieblock_domain(self, domain):
+ self.load_url(self.options_url)
+ self.js((
+ "(function (domain) {"
+ " let bg = chrome.extension.getBackgroundPage();"
+ " bg.badger.storage.setupHeuristicAction(domain, bg.constants.COOKIEBLOCK);"
+ "}(arguments[0]));"
+ ), domain)
+
+ def disable_badger_on_site(self, url):
+ self.load_url(self.options_url)
+ self.wait_for_script("return window.OPTIONS_INITIALIZED")
+ self.find_el_by_css('a[href="#tab-allowlist"]').click()
+ self.driver.find_element_by_id('new-disabled-site-input').send_keys(url)
+ self.driver.find_element_by_css_selector('#add-disabled-site').click()
+
+ @property
+ def logs(self):
+ # TODO not yet in Firefox
+ return [log.get('message') for log in self.driver.get_log('browser')]