diff options
Diffstat (limited to 'testing/mozbase/mozproxy/mozproxy/backends/mitm/scripts/inject-deterministic.py')
-rw-r--r-- | testing/mozbase/mozproxy/mozproxy/backends/mitm/scripts/inject-deterministic.py | 207 |
1 files changed, 207 insertions, 0 deletions
diff --git a/testing/mozbase/mozproxy/mozproxy/backends/mitm/scripts/inject-deterministic.py b/testing/mozbase/mozproxy/mozproxy/backends/mitm/scripts/inject-deterministic.py new file mode 100644 index 0000000000..70536d76e6 --- /dev/null +++ b/testing/mozbase/mozproxy/mozproxy/backends/mitm/scripts/inject-deterministic.py @@ -0,0 +1,207 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +import base64 +import hashlib +import re +import time +from os import path + +from mitmproxy import ctx + + +class AddDeterministic: + def __init__(self): + self.millis = int(round(time.time() * 1000)) + ctx.log.info("Init Deterministic JS") + + def load(self, loader): + ctx.log.info("Load Deterministic JS") + + def get_csp_directives(self, test_header, headers): + csp = headers.get(test_header, "") + return [d.strip() for d in csp.split(";")] + + def get_csp_script_sources(self, test_header, headers): + sources = [] + for directive in self.get_csp_directives(test_header, headers): + if directive.startswith("script-src "): + sources = directive.split()[1:] + return sources + + def get_nonce_from_headers(self, test_header, headers): + """ + get_nonce_from_headers returns the nonce token from a + Content-Security-Policy (CSP) header's script source directive. + + Note: + For more background information on CSP and nonce, please refer to + https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ + Content-Security-Policy/script-src + https://developers.google.com/web/fundamentals/security/csp/ + """ + + for source in self.get_csp_script_sources(test_header, headers) or []: + if source.startswith("'nonce-"): + return source.partition("'nonce-")[-1][:-1] + + def get_script_with_nonce(self, script, nonce=None): + """ + Given a nonce, get_script_with_nonce returns the injected script text with the nonce. + + If nonce None, get_script_with_nonce returns the script block + without attaching a nonce attribute. + + Note: + Some responses may specify a nonce inside their Content-Security-Policy, + script-src directive. + The script injector needs to set the injected script's nonce attribute to + open execute permission for the injected script. + """ + + if nonce: + return '<script nonce="{}">{}</script>'.format(nonce, script) + return "<script>{}</script>".format(script) + + def update_csp_script_src(self, test_header, headers, sha256): + """ + Update the CSP script directives with appropriate information + + Without this permissions a page with a + restrictive CSP will not execute injected scripts. + + Note: + For more background information on CSP, please refer to + https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ + Content-Security-Policy/script-src + https://developers.google.com/web/fundamentals/security/csp/ + """ + + sources = self.get_csp_script_sources(test_header, headers) + add_unsafe = True + + for token in sources: + if token == "'unsafe-inline'": + add_unsafe = False + ctx.log.info("Contains unsafe-inline") + elif token.startswith("'sha"): + sources.append("'sha256-{}'".format(sha256)) + add_unsafe = False + ctx.log.info("Add sha hash directive") + break + + if add_unsafe: + ctx.log.info("Add unsafe") + sources.append("'unsafe-inline'") + + return "script-src {}".format(" ".join(sources)) + + def get_new_csp_header(self, test_header, headers, updated_csp_script): + """ + get_new_csp_header generates a new header object containing + the updated elements from new_csp_script_directives + """ + + if updated_csp_script: + directives = self.get_csp_directives(test_header, headers) + for index, directive in enumerate(directives): + if directive.startswith("script-src "): + directives[index] = updated_csp_script + + ctx.log.info("Original Header %s \n" % headers["Content-Security-Policy"]) + headers["Content-Security-Policy"] = "; ".join(directives) + ctx.log.info("Updated Header %s \n" % headers["Content-Security-Policy"]) + + return headers + + def response(self, flow): + # pylint: disable=W1633 + if "content-type" in flow.response.headers: + if "text/html" in flow.response.headers["content-type"]: + ctx.log.info( + "Working on {}".format(flow.response.headers["content-type"]) + ) + + flow.response.decode() + html = flow.response.text + + with open( + path.join(path.dirname(__file__), "catapult/deterministic.js"), "r" + ) as jsfile: + + js = jsfile.read().replace( + "REPLACE_LOAD_TIMESTAMP", str(self.millis) + ) + + if js not in html: + script_index = re.search("(?i).*?<head.*?>", html) + if script_index is None: + script_index = re.search("(?i).*?<html.*?>", html) + if script_index is None: + script_index = re.search("(?i).*?<!doctype html>", html) + if script_index is None: + ctx.log.info( + "No start tags found in request {}. Skip injecting".format( + flow.request.url + ) + ) + return + script_index = script_index.end() + + nonce = None + for test_header in [ + "Content-Security-Policy", + "Content-Security-Policy-Report-Only", + ]: + if flow.response.headers.get(test_header, False): + nonce = self.get_nonce_from_headers( + test_header, flow.response.headers + ) + ctx.log.info("nonce : %s" % nonce) + + if ( + self.get_csp_script_sources( + test_header, flow.response.headers + ) + and not nonce + ): + # generate sha256 for the script + hash_object = hashlib.sha256(js.encode("utf-8")) + script_sha256 = base64.b64encode( + hash_object.digest() + ).decode("utf-8") + + # generate the new response headers + updated_script_sources = self.update_csp_script_src( + test_header, + flow.response.headers, + script_sha256, + ) + flow.response.headers = self.get_new_csp_header( + test_header, + flow.response.headers, + updated_script_sources, + ) + + # generate new html file + new_html = ( + html[:script_index] + + self.get_script_with_nonce(js, nonce) + + html[script_index:] + ) + flow.response.text = new_html + + ctx.log.info( + "In request {} injected deterministic JS".format( + flow.request.url + ) + ) + else: + ctx.log.info( + "Script already injected in request {}".format( + flow.request.url + ) + ) + + +addons = [AddDeterministic()] |