1
0
Fork 0
privacybadger/tests/selenium/pbtest.py
Daniel Baumann 51333c7ef4
Adding upstream version 2020.10.7.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-22 23:00:13 +02:00

498 lines
18 KiB
Python

# -*- coding: UTF-8 -*-
import json
import os
import subprocess
import tempfile
import time
import unittest
from contextlib import contextmanager
from functools import wraps
from shutil import copytree
from selenium import webdriver
from selenium.common.exceptions import (
NoSuchWindowException,
TimeoutException,
WebDriverException,
)
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
try:
from xvfbwrapper import Xvfb
except ImportError:
print("\n\nxvfbwrapper Python package import failed")
print("headless mode (ENABLE_XVFB=1) is not supported")
# Default number of seconds the WebDriverWait-based helpers wait before giving up.
SEL_DEFAULT_WAIT_TIMEOUT = 30

# Browser families recognized by get_browser_type().
BROWSER_TYPES = ['chrome', 'firefox']

# Executable names tried, in this order, when a browser type has to be
# resolved to an installed binary.
BROWSER_NAMES = ['google-chrome', 'google-chrome-stable', 'google-chrome-beta', 'firefox']


def parse_stdout(res):
    """Return raw subprocess output (bytes) stripped of surrounding whitespace and decoded as UTF-8."""
    return res.strip().decode('utf-8')


def run_shell_command(command):
    """Run `command` (an argument list) and return its cleaned-up stdout.

    Raises subprocess.CalledProcessError when the command exits with a
    non-zero status.
    """
    return parse_stdout(subprocess.check_output(command))
# Top-level directory of the git checkout, as reported by git itself.
# Evaluated at import time, so importing this module fails (CalledProcessError)
# when git cannot resolve a repository from the current directory.
GIT_ROOT = run_shell_command(['git', 'rev-parse', '--show-toplevel'])
class WindowNotFoundException(Exception):
    """Signals that no browser window matching the requested URL was found."""
def unix_which(command, silent=False):
    """Return the location the `which` utility reports for `command`.

    When the lookup fails, return None if `silent` is set; otherwise the
    subprocess.CalledProcessError is propagated to the caller.
    """
    try:
        location = run_shell_command(['which', command])
    except subprocess.CalledProcessError:
        if not silent:
            raise
        return None
    return location
def get_browser_type(string):
    """Return the first BROWSER_TYPES entry found in the lower-cased `string`.

    Raises ValueError when none of the known browser types occurs in it.
    """
    lowered = string.lower()
    found = next((kind for kind in BROWSER_TYPES if kind in lowered), None)
    if found is None:
        raise ValueError("couldn't get browser type from %s" % string)
    return found
def get_browser_name(string):
    """Turn a browser path or browser type into an executable name.

    If `string` contains a path separator it is treated as a path and its
    final component is returned. Otherwise it is treated as a browser type:
    the first BROWSER_NAMES entry that contains `string` and that `which`
    can locate wins, and the basename of the located path is returned.

    Raises ValueError when no installed candidate matches.
    """
    if ('/' in string) or ('\\' in string):  # it's a path
        return os.path.basename(string)

    # it's a browser type
    for candidate in BROWSER_NAMES:
        if string not in candidate:
            continue
        # look the executable up once and reuse the answer instead of
        # spawning `which` a second time for the same name
        location = unix_which(candidate, silent=True)
        if location:
            return os.path.basename(location)

    raise ValueError('Could not get browser name from %s' % string)
# Resolves the browser under test from the BROWSER environment variable and
# exposes a matching WebDriver context manager (`manager`) plus the extension's
# base URL (`base_url`). The class docstring is assembled below via `__doc__`.
class Shim:
    _browser_msg = '''BROWSER should be one of:
* /path/to/a/browser
* a browser executable name so we can find the browser with "which $BROWSER"
* something from BROWSER_TYPES
'''
    __doc__ = 'Chooses the correct driver and extension_url based on the BROWSER environment\nvariable. ' + _browser_msg

    def __init__(self):
        """Work out browser path/type from BROWSER and pick the matching manager.

        Raises ValueError when BROWSER is unset or cannot be mapped to a
        known browser.
        """
        print("\n\nConfiguring the test run ...")
        browser = os.environ.get('BROWSER')

        # get browser_path and browser_type first
        if browser is None:
            raise ValueError("The BROWSER environment variable is not set. " + self._browser_msg)
        if ("/" in browser) or ("\\" in browser):  # path to a browser binary
            self.browser_path = browser
            self.browser_type = get_browser_type(self.browser_path)
        elif unix_which(browser, silent=True):  # executable browser name like 'google-chrome-stable'
            self.browser_path = unix_which(browser)
            self.browser_type = get_browser_type(browser)
        elif get_browser_type(browser):  # browser type like 'firefox' or 'chrome'
            bname = get_browser_name(browser)
            self.browser_path = unix_which(bname)
            self.browser_type = browser
        else:
            raise ValueError("could not infer BROWSER from %s" % browser)

        self.extension_path = os.path.join(GIT_ROOT, 'src')

        if self.browser_type == 'chrome':
            # this extension ID and the "key" property in manifest.json
            # must both be derived from the same private key
            self.info = {
                'extension_id': 'mcgekeccgjgcmhnhbabplanchdogjcnh'
            }
            self.manager = self.chrome_manager
            self.base_url = 'chrome-extension://%s/' % self.info['extension_id']

            # make extension ID constant across runs
            self.fix_chrome_extension_id()

        elif self.browser_type == 'firefox':
            self.info = {
                'extension_id': 'jid1-MnnxcxisBPnSXQ@jetpack',
                'uuid': 'd56a5b99-51b6-4e83-ab23-796216679614'
            }
            self.manager = self.firefox_manager
            self.base_url = 'moz-extension://%s/' % self.info['uuid']

        print('\nUsing browser path: %s\nwith browser type: %s\nand extension path: %s\n' % (
            self.browser_path, self.browser_type, self.extension_path))

    def fix_chrome_extension_id(self):
        """Point `extension_path` at a temporary copy of the extension whose
        manifest.json carries a fixed "key" entry."""
        # create temp directory; kept on self so it outlives this method
        self.tmp_dir = tempfile.TemporaryDirectory()
        new_extension_path = os.path.join(self.tmp_dir.name, "src")

        # copy extension sources there
        copytree(self.extension_path, new_extension_path)

        # update manifest.json
        manifest_path = os.path.join(new_extension_path, "manifest.json")
        with open(manifest_path, "r", encoding="utf-8") as f:
            manifest = json.load(f)
        # this key and the extension ID must both be derived from the same private key
        manifest['key'] = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArMdgFkGsm7nOBr/9qkx8XEcmYSu1VkIXXK94oXLz1VKGB0o2MN+mXL/Dsllgkh61LZgK/gVuFFk89e/d6Vlsp9IpKLANuHgyS98FKx1+3sUoMujue+hyxulEGxXXJKXhk0kGxWdE0IDOamFYpF7Yk0K8Myd/JW1U2XOoOqJRZ7HR6is1W6iO/4IIL2/j3MUioVqu5ClT78+fE/Fn9b/DfzdX7RxMNza9UTiY+JCtkRTmm4ci4wtU1lxHuVmWiaS45xLbHphQr3fpemDlyTmaVoE59qG5SZZzvl6rwDah06dH01YGSzUF1ezM2IvY9ee1nMSHEadQRQ2sNduNZWC9gwIDAQAB" # noqa:E501 pylint:disable=line-too-long
        with open(manifest_path, "w", encoding="utf-8") as f:
            json.dump(manifest, f)

        # update self.extension_path
        self.extension_path = new_extension_path

    @property
    def wants_xvfb(self):
        """True when a virtual display was requested (Travis or ENABLE_XVFB=1)
        and the Xvfb wrapper was importable."""
        if self.on_travis or bool(int(os.environ.get('ENABLE_XVFB', 0))):
            try:
                # only defined when the optional xvfbwrapper import succeeded
                Xvfb
            except NameError:
                print("\nHeadless mode not supported: install xvfbwrapper first")
                return False
            return True
        return False

    @property
    def on_travis(self):
        """True when the TRAVIS environment variable is present."""
        return "TRAVIS" in os.environ

    @staticmethod
    def _start_driver(factory, label, tries=5):
        """Call `factory` until it returns a driver, at most `tries` times.

        Each WebDriverException is reported using `label` as the browser
        name. The exception from the final attempt is re-raised so the
        caller sees the real start-up error.
        """
        if tries < 1:
            raise ValueError("tries must be at least 1")
        for attempt in range(tries):
            try:
                return factory()
            except WebDriverException as e:
                if attempt == 0:
                    print("")
                print("%s WebDriver initialization failed:" % label)
                if attempt == tries - 1:
                    # out of attempts: surface the actual failure
                    print(str(e))
                    raise
                print(str(e) + "Retrying ...")
        return None  # not reached; every path above returns or raises

    @contextmanager
    def chrome_manager(self):
        """Context manager yielding a Chrome driver with the extension loaded;
        the driver is quit on exit."""
        opts = ChromeOptions()
        if self.on_travis:  # github.com/travis-ci/travis-ci/issues/938
            opts.add_argument("--no-sandbox")
        opts.add_argument("--load-extension=" + self.extension_path)
        opts.binary_location = self.browser_path
        opts.add_experimental_option("prefs", {"profile.block_third_party_cookies": False})
        # TODO not yet in Firefox (w/o hacks anyway):
        # https://github.com/mozilla/geckodriver/issues/284#issuecomment-456073771
        opts.set_capability("loggingPrefs", {'browser': 'ALL'})

        driver = self._start_driver(lambda: webdriver.Chrome(options=opts), "Chrome")

        try:
            yield driver
        finally:
            driver.quit()

    @contextmanager
    def firefox_manager(self):
        """Context manager yielding a Firefox driver with the extension
        installed as a temporary add-on; the driver is quit on exit."""
        ffp = webdriver.FirefoxProfile()
        # make extension ID constant across runs
        ffp.set_preference('extensions.webextensions.uuids', '{"%s": "%s"}' %
                           (self.info['extension_id'], self.info['uuid']))

        def launch():
            opts = FirefoxOptions()
            # to produce a trace-level geckodriver.log,
            # remove the service_log_path argument to Firefox()
            # and uncomment the line below
            #opts.log.level = "trace"
            return webdriver.Firefox(
                firefox_profile=ffp,
                firefox_binary=self.browser_path,
                options=opts,
                service_log_path=os.path.devnull)

        driver = self._start_driver(launch, "Firefox")

        try:
            # inside the try so a failed install still quits the browser
            driver.install_addon(self.extension_path, temporary=True)
            yield driver
        finally:
            driver.quit()
# Module-wide browser shim, created at import time. Constructing it reads the
# BROWSER environment variable and raises ValueError when that is unset.
shim = Shim()
def if_firefox(wrapper):
    '''
    A test decorator that applies the function `wrapper` to the test if the
    browser is firefox. Ex:

    @if_firefox(unittest.skip("broken on ff"))
    def test_stuff(self):
        ...

    On any other browser the test is returned untouched.
    '''
    def test_catcher(test):
        if shim.browser_type == 'firefox':
            # Apply `wrapper` directly. Passing it through functools.wraps
            # first would copy the test's name/doc/__dict__ onto the decorator
            # object itself, mutating a possibly shared callable and failing
            # for callables that do not accept attribute assignment.
            return wrapper(test)
        return test
    return test_catcher
def retry_until(fun, tester=None, times=5, msg="Waiting a bit and retrying ..."):
    """
    Execute function `fun` until either its return is truthy
    (or if `tester` is set, until the result of calling `tester` with `fun`'s return is truthy),
    or it has been executed `times` times.

    Between attempts `msg` is printed and the wait doubles each time
    (1, 2, 4, ... seconds). Nothing is printed and no time is spent waiting
    after the final attempt, since no retry follows it.

    Returns the value from the last call to `fun`, or None when `times` is
    less than 1 and `fun` is never called.
    """
    result = None
    for i in range(times):
        result = fun()

        succeeded = tester(result) if tester is not None else result
        if succeeded:
            break

        if i == times - 1:
            # out of attempts: don't announce or wait for a retry that won't happen
            break

        if i == 0:
            print("")
        print(msg)

        time.sleep(2 ** i)

    return result
# Maps a test function's name to the number of attempts it is allowed;
# filled in by repeat_if_failed and consulted by PBSeleniumTest.run.
attempts = {}


def repeat_if_failed(ntimes):  # noqa
    '''
    A decorator that marks a test to be attempted up to `ntimes` times.
    It must be used on test methods of a PBSeleniumTest subclass. NB: this
    only registers the test under its function name; the actual try/except
    retry logic lives in PBSeleniumTest.run.
    '''
    def register(test):
        @wraps(test)
        def passthrough(*args, **kwargs):
            # plain delegation; retrying is not done here
            return test(*args, **kwargs)

        attempts[test.__name__] = ntimes
        return passthrough

    return register
class PBSeleniumTest(unittest.TestCase):
    """Base class for the Selenium tests.

    Every test runs in a freshly started browser obtained from the module's
    browser shim (see `run`). The class also provides helpers for loading
    pages, waiting on elements/scripts and juggling windows.
    """

    @classmethod
    def setUpClass(cls):
        """Copy the shim's settings onto the class and start Xvfb if wanted."""
        cls.manager = shim.manager
        cls.base_url = shim.base_url
        cls.wants_xvfb = shim.wants_xvfb
        if cls.wants_xvfb:
            cls.vdisplay = Xvfb(width=1280, height=720)
            cls.vdisplay.start()

        # setting DBUS_SESSION_BUS_ADDRESS to nonsense prevents frequent
        # hangs of chromedriver (possibly due to crbug.com/309093)
        os.environ["DBUS_SESSION_BUS_ADDRESS"] = "/dev/null"
        cls.proj_root = GIT_ROOT

    @classmethod
    def tearDownClass(cls):
        """Stop the virtual display started in setUpClass, if any."""
        if cls.wants_xvfb:
            cls.vdisplay.stop()

    def init(self, driver):
        """Attach `driver` to this test and derive the extension page URLs."""
        self.driver = driver
        self.js = self.driver.execute_script
        self.bg_url = self.base_url + "_generated_background_page.html"
        self.options_url = self.base_url + "skin/options.html"
        self.popup_url = self.base_url + "skin/popup.html"
        self.first_run_url = self.base_url + "skin/firstRun.html"
        self.test_url = self.base_url + "tests/index.html"

    def run(self, result=None):
        """Run the test in a fresh browser, retrying it if it was registered
        in `attempts`.

        NOTE(review): `result.name` and `result._excinfo` are not attributes
        of unittest.TestResult; presumably they are supplied by the test
        runner in use (pytest?) -- confirm.
        """
        # `result` defaults to None, which has no `name`; fall back to a
        # single attempt with no retry bookkeeping in that case
        test_name = getattr(result, "name", None)
        nretries = attempts.get(test_name, 1)

        for i in range(nretries):
            try:
                with self.manager() as driver:
                    self.init(driver)

                    # wait for Badger's storage, listeners, ...
                    self.load_url(self.options_url)
                    self.wait_for_script(
                        "return chrome.extension.getBackgroundPage()."
                        "badger.INITIALIZED"
                    )
                    driver.close()
                    if driver.window_handles:
                        driver.switch_to.window(driver.window_handles[0])

                    super().run(result)

                    # retry test magic: turn a recorded failure into an
                    # exception so the handler below can schedule a retry
                    if test_name in attempts and result._excinfo:  # pylint:disable=protected-access
                        raise Exception(result._excinfo.pop())  # pylint:disable=protected-access

                    break

            except Exception:
                if i == nretries - 1:
                    raise
                wait_secs = 2 ** i
                print('\nRetrying {} after {} seconds ...'.format(
                    result, wait_secs))
                time.sleep(wait_secs)
                continue

    def open_window(self):
        """Open a new browser window and switch the driver to the last handle."""
        if self.driver.current_url.startswith("moz-extension://"):
            # work around https://bugzilla.mozilla.org/show_bug.cgi?id=1491443
            self.wait_for_script("return typeof chrome != 'undefined' && chrome && chrome.extension")
            self.js(
                "delete window.__new_window_created;"
                "chrome.windows.create({}, function () {"
                "window.__new_window_created = true;"
                "});"
            )
            self.wait_for_script("return window.__new_window_created")
        else:
            self.js('window.open()')

        self.driver.switch_to.window(self.driver.window_handles[-1])

    def load_url(self, url, wait_for_body_text=False, retries=5):
        """Load a URL and wait before returning.

        Timeouts (and, as a Firefox workaround, "Reached error page" errors)
        are retried with exponentially growing pauses, up to `retries`
        attempts in total; the last such error is re-raised. With
        `wait_for_body_text`, additionally poll until the page body has text.
        """
        for i in range(retries):
            try:
                self.driver.get(url)
                break
            except TimeoutException:
                if i < retries - 1:
                    time.sleep(2 ** i)
                    continue
                raise
            # work around geckodriver/marionette/Firefox timeout handling,
            # for example: https://travis-ci.org/EFForg/privacybadger/jobs/389429089
            except WebDriverException as e:
                if str(e).startswith("Reached error page") and i < retries - 1:
                    time.sleep(2 ** i)
                    continue
                raise
        self.driver.switch_to.window(self.driver.current_window_handle)

        if wait_for_body_text:
            retry_until(
                lambda: self.driver.find_element(By.TAG_NAME, 'body').text,
                msg="Waiting for document.body.textContent to get populated ..."
            )

    def txt_by_css(self, css_selector, timeout=SEL_DEFAULT_WAIT_TIMEOUT):
        """Find an element by CSS selector and return its text."""
        return self.find_el_by_css(
            css_selector, visible_only=False, timeout=timeout).text

    def find_el_by_css(self, css_selector, visible_only=True, timeout=SEL_DEFAULT_WAIT_TIMEOUT):
        """Wait up to `timeout` seconds for the element matching `css_selector`
        and return it; with `visible_only` it must also be visible."""
        condition = (
            EC.visibility_of_element_located if visible_only
            else EC.presence_of_element_located
        )
        return WebDriverWait(self.driver, timeout).until(
            condition((By.CSS_SELECTOR, css_selector)))

    def find_el_by_xpath(self, xpath, timeout=SEL_DEFAULT_WAIT_TIMEOUT):
        """Wait up to `timeout` seconds for a visible element matching `xpath`
        and return it."""
        return WebDriverWait(self.driver, timeout).until(
            EC.visibility_of_element_located((By.XPATH, xpath)))

    def wait_for_script(
        self,
        script,
        *script_args,
        timeout=SEL_DEFAULT_WAIT_TIMEOUT,
        message="Timed out waiting for execute_script to eval to True"
    ):
        """Variant of self.js that executes script repeatedly until it
        returns a truthy value, which is then returned. `message` is used
        for the timeout error raised after `timeout` seconds."""
        return WebDriverWait(self.driver, timeout).until(
            lambda driver: driver.execute_script(script, *script_args),
            message
        )

    def wait_for_text(self, selector, text, timeout=SEL_DEFAULT_WAIT_TIMEOUT):
        """Wait until `text` is present in the element matching `selector`."""
        return WebDriverWait(self.driver, timeout).until(
            EC.text_to_be_present_in_element(
                (By.CSS_SELECTOR, selector), text))

    def wait_for_and_switch_to_frame(self, selector, timeout=SEL_DEFAULT_WAIT_TIMEOUT):
        """Wait for the frame matching `selector` and switch the driver to it."""
        return WebDriverWait(self.driver, timeout).until(
            EC.frame_to_be_available_and_switch_to_it(
                (By.CSS_SELECTOR, selector)))

    def switch_to_window_with_url(self, url, max_tries=5):
        """Point the driver to the first window that matches this url.

        All windows are scanned up to `max_tries` times, one second apart.
        Raises WindowNotFoundException if none ever matches.
        """
        for _ in range(max_tries):
            for w in self.driver.window_handles:
                try:
                    self.driver.switch_to.window(w)
                    if self.driver.current_url != url:
                        continue
                except NoSuchWindowException:
                    # the window disappeared mid-scan; move on to the next one
                    pass
                else:
                    return
            time.sleep(1)

        raise WindowNotFoundException("Failed to find window for " + url)

    def close_window_with_url(self, url, max_tries=5):
        """Close the window showing `url`, then switch to the first remaining one."""
        self.switch_to_window_with_url(url, max_tries)

        if len(self.driver.window_handles) == 1:
            # open another window to avoid implicit session deletion
            self.open_window()
            self.switch_to_window_with_url(url, max_tries)

        self.driver.close()
        self.driver.switch_to.window(self.driver.window_handles[0])

    def block_domain(self, domain):
        """Have the extension's background page blocklist `domain`."""
        self.load_url(self.options_url)
        self.js((
            "(function (domain) {"
            " let bg = chrome.extension.getBackgroundPage();"
            " let base_domain = window.getBaseDomain(domain);"
            " bg.badger.heuristicBlocking.blocklistOrigin(domain, base_domain);"
            "}(arguments[0]));"
        ), domain)

    def cookieblock_domain(self, domain):
        """Have the extension's background page set `domain` to COOKIEBLOCK."""
        self.load_url(self.options_url)
        self.js((
            "(function (domain) {"
            " let bg = chrome.extension.getBackgroundPage();"
            " bg.badger.storage.setupHeuristicAction(domain, bg.constants.COOKIEBLOCK);"
            "}(arguments[0]));"
        ), domain)

    def disable_badger_on_site(self, url):
        """Enter `url` into the disabled-sites form on the options page and submit it."""
        self.load_url(self.options_url)
        self.wait_for_script("return window.OPTIONS_INITIALIZED")
        self.find_el_by_css('a[href="#tab-allowlist"]').click()
        self.driver.find_element(By.ID, 'new-disabled-site-input').send_keys(url)
        self.driver.find_element(By.CSS_SELECTOR, '#add-disabled-site').click()

    @property
    def logs(self):
        """Messages from the driver's 'browser' log."""
        # TODO not yet in Firefox
        return [log.get('message') for log in self.driver.get_log('browser')]