import os, json from urllib.parse import parse_qsl, SplitResult, urlencode, urlsplit, urlunsplit from wptserve.utils import isomorphic_decode, isomorphic_encode def get_template(template_basename): script_directory = os.path.dirname(os.path.abspath(isomorphic_decode(__file__))) template_directory = os.path.abspath(os.path.join(script_directory, u"template")) template_filename = os.path.join(template_directory, template_basename) with open(template_filename, "r") as f: return f.read() def redirect(url, response): response.add_required_headers = False response.writer.write_status(301) response.writer.write_header(b"access-control-allow-origin", b"*") response.writer.write_header(b"location", isomorphic_encode(url)) response.writer.end_headers() response.writer.write(u"") # TODO(kristijanburnik): subdomain_prefix is a hardcoded value aligned with # referrer-policy-test-case.js. The prefix should be configured in one place. def __get_swapped_origin_netloc(netloc, subdomain_prefix = u"www1."): if netloc.startswith(subdomain_prefix): return netloc[len(subdomain_prefix):] else: return subdomain_prefix + netloc # Creates a URL (typically a redirect target URL) that is the same as the # current request URL `request.url`, except for: # - When `swap_scheme` or `swap_origin` is True, its scheme/origin is changed # to the other one. (http <-> https, ws <-> wss, etc.) # - For `downgrade`, we redirect to a URL that would be successfully loaded # if and only if upgrade-insecure-request is applied. # - `query_parameter_to_remove` parameter is removed from query part. # Its default is "redirection" to avoid redirect loops. def create_url(request, swap_scheme=False, swap_origin=False, downgrade=False, query_parameter_to_remove=u"redirection"): parsed = urlsplit(request.url) destination_netloc = parsed.netloc scheme = parsed.scheme if swap_scheme: scheme = u"http" if parsed.scheme == u"https" else u"https" hostname = parsed.netloc.split(u':')[0] port = request.server.config[u"ports"][scheme][0] destination_netloc = u":".join([hostname, str(port)]) if downgrade: # These rely on some unintuitive cleverness due to WPT's test setup: # 'Upgrade-Insecure-Requests' does not upgrade the port number, # so we use URLs in the form `http://[domain]:[https-port]`, # which will be upgraded to `https://[domain]:[https-port]`. # If the upgrade fails, the load will fail, as we don't serve HTTP over # the secure port. if parsed.scheme == u"https": scheme = u"http" elif parsed.scheme == u"wss": scheme = u"ws" else: raise ValueError(u"Downgrade redirection: Invalid scheme '%s'" % parsed.scheme) hostname = parsed.netloc.split(u':')[0] port = request.server.config[u"ports"][parsed.scheme][0] destination_netloc = u":".join([hostname, str(port)]) if swap_origin: destination_netloc = __get_swapped_origin_netloc(destination_netloc) parsed_query = parse_qsl(parsed.query, keep_blank_values=True) parsed_query = [x for x in parsed_query if x[0] != query_parameter_to_remove] destination_url = urlunsplit(SplitResult( scheme = scheme, netloc = destination_netloc, path = parsed.path, query = urlencode(parsed_query), fragment = None)) return destination_url def preprocess_redirection(request, response): if b"redirection" not in request.GET: return False redirection = request.GET[b"redirection"] if redirection == b"no-redirect": return False elif redirection == b"keep-scheme": redirect_url = create_url(request, swap_scheme=False) elif redirection == b"swap-scheme": redirect_url = create_url(request, swap_scheme=True) elif redirection == b"downgrade": redirect_url = create_url(request, downgrade=True) elif redirection == b"keep-origin": redirect_url = create_url(request, swap_origin=False) elif redirection == b"swap-origin": redirect_url = create_url(request, swap_origin=True) else: raise ValueError(u"Invalid redirection type '%s'" % isomorphic_decode(redirection)) redirect(redirect_url, response) return True def preprocess_stash_action(request, response): if b"action" not in request.GET: return False action = request.GET[b"action"] key = request.GET[b"key"] stash = request.server.stash path = request.GET[b"path"] if b"path" in request.GET \ else isomorphic_encode(request.url.split(u'?')[0]) if action == b"put": value = isomorphic_decode(request.GET[b"value"]) stash.take(key=key, path=path) stash.put(key=key, value=value, path=path) response_data = json.dumps({u"status": u"success", u"result": isomorphic_decode(key)}) elif action == b"purge": value = stash.take(key=key, path=path) return False elif action == b"take": value = stash.take(key=key, path=path) if value is None: status = u"allowed" else: status = u"blocked" response_data = json.dumps({u"status": status, u"result": value}) else: return False response.add_required_headers = False response.writer.write_status(200) response.writer.write_header(b"content-type", b"text/javascript") response.writer.write_header(b"cache-control", b"no-cache; must-revalidate") response.writer.end_headers() response.writer.write(response_data) return True def __noop(request, response): return u"" def respond(request, response, status_code = 200, content_type = b"text/html", payload_generator = __noop, cache_control = b"no-cache; must-revalidate", access_control_allow_origin = b"*", maybe_additional_headers = None): if preprocess_redirection(request, response): return if preprocess_stash_action(request, response): return response.add_required_headers = False response.writer.write_status(status_code) if access_control_allow_origin != None: response.writer.write_header(b"access-control-allow-origin", access_control_allow_origin) response.writer.write_header(b"content-type", content_type) response.writer.write_header(b"cache-control", cache_control) additional_headers = maybe_additional_headers or {} for header, value in additional_headers.items(): response.writer.write_header(header, value) response.writer.end_headers() new_headers = {} new_val = [] for key, val in request.headers.items(): if len(val) == 1: new_val = isomorphic_decode(val[0]) else: new_val = [isomorphic_decode(x) for x in val] new_headers[isomorphic_decode(key)] = new_val server_data = {u"headers": json.dumps(new_headers, indent = 4)} payload = payload_generator(server_data) response.writer.write(payload)