diff options
Diffstat (limited to 'testing/web-platform/tests/common/security-features/subresource/subresource.py')
-rw-r--r-- | testing/web-platform/tests/common/security-features/subresource/subresource.py | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/testing/web-platform/tests/common/security-features/subresource/subresource.py b/testing/web-platform/tests/common/security-features/subresource/subresource.py new file mode 100644 index 0000000000..b3c055a93a --- /dev/null +++ b/testing/web-platform/tests/common/security-features/subresource/subresource.py @@ -0,0 +1,199 @@ +import os, json +from urllib.parse import parse_qsl, SplitResult, urlencode, urlsplit, urlunsplit + +from wptserve.utils import isomorphic_decode, isomorphic_encode + +def get_template(template_basename): + script_directory = os.path.dirname(os.path.abspath(isomorphic_decode(__file__))) + template_directory = os.path.abspath(os.path.join(script_directory, + u"template")) + template_filename = os.path.join(template_directory, template_basename) + + with open(template_filename, "r") as f: + return f.read() + + +def redirect(url, response): + response.add_required_headers = False + response.writer.write_status(301) + response.writer.write_header(b"access-control-allow-origin", b"*") + response.writer.write_header(b"location", isomorphic_encode(url)) + response.writer.end_headers() + response.writer.write(u"") + + +# TODO(kristijanburnik): subdomain_prefix is a hardcoded value aligned with +# referrer-policy-test-case.js. The prefix should be configured in one place. +def __get_swapped_origin_netloc(netloc, subdomain_prefix = u"www1."): + if netloc.startswith(subdomain_prefix): + return netloc[len(subdomain_prefix):] + else: + return subdomain_prefix + netloc + + +# Creates a URL (typically a redirect target URL) that is the same as the +# current request URL `request.url`, except for: +# - When `swap_scheme` or `swap_origin` is True, its scheme/origin is changed +# to the other one. (http <-> https, ws <-> wss, etc.) +# - For `downgrade`, we redirect to a URL that would be successfully loaded +# if and only if upgrade-insecure-request is applied. +# - `query_parameter_to_remove` parameter is removed from query part. +# Its default is "redirection" to avoid redirect loops. +def create_url(request, + swap_scheme=False, + swap_origin=False, + downgrade=False, + query_parameter_to_remove=u"redirection"): + parsed = urlsplit(request.url) + destination_netloc = parsed.netloc + + scheme = parsed.scheme + if swap_scheme: + scheme = u"http" if parsed.scheme == u"https" else u"https" + hostname = parsed.netloc.split(u':')[0] + port = request.server.config[u"ports"][scheme][0] + destination_netloc = u":".join([hostname, str(port)]) + + if downgrade: + # These rely on some unintuitive cleverness due to WPT's test setup: + # 'Upgrade-Insecure-Requests' does not upgrade the port number, + # so we use URLs in the form `http://[domain]:[https-port]`, + # which will be upgraded to `https://[domain]:[https-port]`. + # If the upgrade fails, the load will fail, as we don't serve HTTP over + # the secure port. + if parsed.scheme == u"https": + scheme = u"http" + elif parsed.scheme == u"wss": + scheme = u"ws" + else: + raise ValueError(u"Downgrade redirection: Invalid scheme '%s'" % + parsed.scheme) + hostname = parsed.netloc.split(u':')[0] + port = request.server.config[u"ports"][parsed.scheme][0] + destination_netloc = u":".join([hostname, str(port)]) + + if swap_origin: + destination_netloc = __get_swapped_origin_netloc(destination_netloc) + + parsed_query = parse_qsl(parsed.query, keep_blank_values=True) + parsed_query = [x for x in parsed_query if x[0] != query_parameter_to_remove] + + destination_url = urlunsplit(SplitResult( + scheme = scheme, + netloc = destination_netloc, + path = parsed.path, + query = urlencode(parsed_query), + fragment = None)) + + return destination_url + + +def preprocess_redirection(request, response): + if b"redirection" not in request.GET: + return False + + redirection = request.GET[b"redirection"] + + if redirection == b"no-redirect": + return False + elif redirection == b"keep-scheme": + redirect_url = create_url(request, swap_scheme=False) + elif redirection == b"swap-scheme": + redirect_url = create_url(request, swap_scheme=True) + elif redirection == b"downgrade": + redirect_url = create_url(request, downgrade=True) + elif redirection == b"keep-origin": + redirect_url = create_url(request, swap_origin=False) + elif redirection == b"swap-origin": + redirect_url = create_url(request, swap_origin=True) + else: + raise ValueError(u"Invalid redirection type '%s'" % isomorphic_decode(redirection)) + + redirect(redirect_url, response) + return True + + +def preprocess_stash_action(request, response): + if b"action" not in request.GET: + return False + + action = request.GET[b"action"] + + key = request.GET[b"key"] + stash = request.server.stash + path = request.GET[b"path"] if b"path" in request.GET \ + else isomorphic_encode(request.url.split(u'?')[0]) + + if action == b"put": + value = isomorphic_decode(request.GET[b"value"]) + stash.take(key=key, path=path) + stash.put(key=key, value=value, path=path) + response_data = json.dumps({u"status": u"success", u"result": isomorphic_decode(key)}) + elif action == b"purge": + value = stash.take(key=key, path=path) + return False + elif action == b"take": + value = stash.take(key=key, path=path) + if value is None: + status = u"allowed" + else: + status = u"blocked" + response_data = json.dumps({u"status": status, u"result": value}) + else: + return False + + response.add_required_headers = False + response.writer.write_status(200) + response.writer.write_header(b"content-type", b"text/javascript") + response.writer.write_header(b"cache-control", b"no-cache; must-revalidate") + response.writer.end_headers() + response.writer.write(response_data) + return True + + +def __noop(request, response): + return u"" + + +def respond(request, + response, + status_code = 200, + content_type = b"text/html", + payload_generator = __noop, + cache_control = b"no-cache; must-revalidate", + access_control_allow_origin = b"*", + maybe_additional_headers = None): + if preprocess_redirection(request, response): + return + + if preprocess_stash_action(request, response): + return + + response.add_required_headers = False + response.writer.write_status(status_code) + + if access_control_allow_origin != None: + response.writer.write_header(b"access-control-allow-origin", + access_control_allow_origin) + response.writer.write_header(b"content-type", content_type) + response.writer.write_header(b"cache-control", cache_control) + + additional_headers = maybe_additional_headers or {} + for header, value in additional_headers.items(): + response.writer.write_header(header, value) + + response.writer.end_headers() + + new_headers = {} + new_val = [] + for key, val in request.headers.items(): + if len(val) == 1: + new_val = isomorphic_decode(val[0]) + else: + new_val = [isomorphic_decode(x) for x in val] + new_headers[isomorphic_decode(key)] = new_val + + server_data = {u"headers": json.dumps(new_headers, indent = 4)} + + payload = payload_generator(server_data) + response.writer.write(payload) |