summaryrefslogtreecommitdiffstats
path: root/test/wpt/tests/common/security-features/subresource/subresource.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/wpt/tests/common/security-features/subresource/subresource.py')
-rw-r--r--test/wpt/tests/common/security-features/subresource/subresource.py199
1 files changed, 199 insertions, 0 deletions
diff --git a/test/wpt/tests/common/security-features/subresource/subresource.py b/test/wpt/tests/common/security-features/subresource/subresource.py
new file mode 100644
index 0000000..b3c055a
--- /dev/null
+++ b/test/wpt/tests/common/security-features/subresource/subresource.py
@@ -0,0 +1,199 @@
+import os, json
+from urllib.parse import parse_qsl, SplitResult, urlencode, urlsplit, urlunsplit
+
+from wptserve.utils import isomorphic_decode, isomorphic_encode
+
+def get_template(template_basename):
+ script_directory = os.path.dirname(os.path.abspath(isomorphic_decode(__file__)))
+ template_directory = os.path.abspath(os.path.join(script_directory,
+ u"template"))
+ template_filename = os.path.join(template_directory, template_basename)
+
+ with open(template_filename, "r") as f:
+ return f.read()
+
+
+def redirect(url, response):
+ response.add_required_headers = False
+ response.writer.write_status(301)
+ response.writer.write_header(b"access-control-allow-origin", b"*")
+ response.writer.write_header(b"location", isomorphic_encode(url))
+ response.writer.end_headers()
+ response.writer.write(u"")
+
+
+# TODO(kristijanburnik): subdomain_prefix is a hardcoded value aligned with
+# referrer-policy-test-case.js. The prefix should be configured in one place.
+def __get_swapped_origin_netloc(netloc, subdomain_prefix = u"www1."):
+ if netloc.startswith(subdomain_prefix):
+ return netloc[len(subdomain_prefix):]
+ else:
+ return subdomain_prefix + netloc
+
+
+# Creates a URL (typically a redirect target URL) that is the same as the
+# current request URL `request.url`, except for:
+# - When `swap_scheme` or `swap_origin` is True, its scheme/origin is changed
+# to the other one. (http <-> https, ws <-> wss, etc.)
+# - For `downgrade`, we redirect to a URL that would be successfully loaded
+# if and only if upgrade-insecure-request is applied.
+# - `query_parameter_to_remove` parameter is removed from query part.
+# Its default is "redirection" to avoid redirect loops.
+def create_url(request,
+ swap_scheme=False,
+ swap_origin=False,
+ downgrade=False,
+ query_parameter_to_remove=u"redirection"):
+ parsed = urlsplit(request.url)
+ destination_netloc = parsed.netloc
+
+ scheme = parsed.scheme
+ if swap_scheme:
+ scheme = u"http" if parsed.scheme == u"https" else u"https"
+ hostname = parsed.netloc.split(u':')[0]
+ port = request.server.config[u"ports"][scheme][0]
+ destination_netloc = u":".join([hostname, str(port)])
+
+ if downgrade:
+ # These rely on some unintuitive cleverness due to WPT's test setup:
+ # 'Upgrade-Insecure-Requests' does not upgrade the port number,
+ # so we use URLs in the form `http://[domain]:[https-port]`,
+ # which will be upgraded to `https://[domain]:[https-port]`.
+ # If the upgrade fails, the load will fail, as we don't serve HTTP over
+ # the secure port.
+ if parsed.scheme == u"https":
+ scheme = u"http"
+ elif parsed.scheme == u"wss":
+ scheme = u"ws"
+ else:
+ raise ValueError(u"Downgrade redirection: Invalid scheme '%s'" %
+ parsed.scheme)
+ hostname = parsed.netloc.split(u':')[0]
+ port = request.server.config[u"ports"][parsed.scheme][0]
+ destination_netloc = u":".join([hostname, str(port)])
+
+ if swap_origin:
+ destination_netloc = __get_swapped_origin_netloc(destination_netloc)
+
+ parsed_query = parse_qsl(parsed.query, keep_blank_values=True)
+ parsed_query = [x for x in parsed_query if x[0] != query_parameter_to_remove]
+
+ destination_url = urlunsplit(SplitResult(
+ scheme = scheme,
+ netloc = destination_netloc,
+ path = parsed.path,
+ query = urlencode(parsed_query),
+ fragment = None))
+
+ return destination_url
+
+
+def preprocess_redirection(request, response):
+ if b"redirection" not in request.GET:
+ return False
+
+ redirection = request.GET[b"redirection"]
+
+ if redirection == b"no-redirect":
+ return False
+ elif redirection == b"keep-scheme":
+ redirect_url = create_url(request, swap_scheme=False)
+ elif redirection == b"swap-scheme":
+ redirect_url = create_url(request, swap_scheme=True)
+ elif redirection == b"downgrade":
+ redirect_url = create_url(request, downgrade=True)
+ elif redirection == b"keep-origin":
+ redirect_url = create_url(request, swap_origin=False)
+ elif redirection == b"swap-origin":
+ redirect_url = create_url(request, swap_origin=True)
+ else:
+ raise ValueError(u"Invalid redirection type '%s'" % isomorphic_decode(redirection))
+
+ redirect(redirect_url, response)
+ return True
+
+
+def preprocess_stash_action(request, response):
+ if b"action" not in request.GET:
+ return False
+
+ action = request.GET[b"action"]
+
+ key = request.GET[b"key"]
+ stash = request.server.stash
+ path = request.GET[b"path"] if b"path" in request.GET \
+ else isomorphic_encode(request.url.split(u'?')[0])
+
+ if action == b"put":
+ value = isomorphic_decode(request.GET[b"value"])
+ stash.take(key=key, path=path)
+ stash.put(key=key, value=value, path=path)
+ response_data = json.dumps({u"status": u"success", u"result": isomorphic_decode(key)})
+ elif action == b"purge":
+ value = stash.take(key=key, path=path)
+ return False
+ elif action == b"take":
+ value = stash.take(key=key, path=path)
+ if value is None:
+ status = u"allowed"
+ else:
+ status = u"blocked"
+ response_data = json.dumps({u"status": status, u"result": value})
+ else:
+ return False
+
+ response.add_required_headers = False
+ response.writer.write_status(200)
+ response.writer.write_header(b"content-type", b"text/javascript")
+ response.writer.write_header(b"cache-control", b"no-cache; must-revalidate")
+ response.writer.end_headers()
+ response.writer.write(response_data)
+ return True
+
+
+def __noop(request, response):
+ return u""
+
+
+def respond(request,
+ response,
+ status_code = 200,
+ content_type = b"text/html",
+ payload_generator = __noop,
+ cache_control = b"no-cache; must-revalidate",
+ access_control_allow_origin = b"*",
+ maybe_additional_headers = None):
+ if preprocess_redirection(request, response):
+ return
+
+ if preprocess_stash_action(request, response):
+ return
+
+ response.add_required_headers = False
+ response.writer.write_status(status_code)
+
+ if access_control_allow_origin != None:
+ response.writer.write_header(b"access-control-allow-origin",
+ access_control_allow_origin)
+ response.writer.write_header(b"content-type", content_type)
+ response.writer.write_header(b"cache-control", cache_control)
+
+ additional_headers = maybe_additional_headers or {}
+ for header, value in additional_headers.items():
+ response.writer.write_header(header, value)
+
+ response.writer.end_headers()
+
+ new_headers = {}
+ new_val = []
+ for key, val in request.headers.items():
+ if len(val) == 1:
+ new_val = isomorphic_decode(val[0])
+ else:
+ new_val = [isomorphic_decode(x) for x in val]
+ new_headers[isomorphic_decode(key)] = new_val
+
+ server_data = {u"headers": json.dumps(new_headers, indent = 4)}
+
+ payload = payload_generator(server_data)
+ response.writer.write(payload)