summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/fetch/http-cache/resources/http-cache.py
blob: 3ab610dd1421cea5289ad14a8ef472ade27f07f1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import datetime
import json
import time
from base64 import b64decode

from wptserve.utils import isomorphic_decode, isomorphic_encode

# Lower-cased response header names whose configured values handle_test
# records in the per-request state.
NOTEHDRS = {
    u'content-type',
    u'access-control-allow-origin',
    u'last-modified',
    u'etag',
}
# Status codes for which handle_test returns an empty body.
NOBODYSTATUS = {204, 304}
# Headers whose configured value is rewritten relative to the request URL.
LOCATIONHDRS = {u'location', u'content-location'}
# Headers whose integer values are turned into HTTP dates relative to "now".
DATEHDRS = {u'date', u'expires', u'last-modified'}

def main(request, response):
    """Entry point: route the request based on its method and query string.

    OPTIONS requests are answered by handle_preflight.  Otherwise the
    ``uuid`` query parameter is required and ``dispatch`` selects
    handle_test (``test``) or handle_state (``state``).  Anything else
    gets a plain-text 404.
    """
    action = request.GET.first(b"dispatch", None)
    token = request.GET.first(b"uuid", None)
    response.headers.set(b"Access-Control-Allow-Credentials", b"true")

    def not_found(body):
        # Shared plain-text 404 reply used by both failure paths below.
        response.status = (404, b"Not Found")
        response.headers.set(b"Content-Type", b"text/plain")
        return body

    if request.method == u"OPTIONS":
        return handle_preflight(token, request, response)
    if not token:
        return not_found(b"UUID not found")
    if action == b'test':
        return handle_test(token, request, response)
    if action == b'state':
        return handle_state(token, request, response)
    return not_found(b"Fallthrough")

def handle_preflight(uuid, request, response):
    """Answer a CORS preflight (OPTIONS) request.

    The request's ``Origin`` and ``Access-Control-Request-Headers`` values
    are echoed back in the matching ``Access-Control-Allow-*`` response
    headers; when either is absent (or empty) a wildcard is sent instead.
    ``uuid`` is accepted for signature parity with the other handlers and
    is not used.

    Returns the bytes body of the 200 response.
    """
    response.status = (200, b"OK")
    # The fallbacks are bytes so that they have the same type as the other
    # literal header values set here.
    allow_origin = request.headers.get(b"origin") or b"*"
    allow_headers = request.headers.get(b"Access-Control-Request-Headers") or b"*"
    response.headers.set(b"Access-Control-Allow-Origin", allow_origin)
    response.headers.set(b"Access-Control-Allow-Methods", b"GET")
    response.headers.set(b"Access-Control-Allow-Headers", allow_headers)
    response.headers.set(b"Access-Control-Max-Age", b"86400")
    return b"Preflight request"

def handle_state(uuid, request, response):
    """Return, as JSON text, whatever is stored in the stash under ``uuid``.

    The value is fetched with ``stash.take``; the response is labelled
    ``text/plain``.
    """
    response.headers.set(b"Content-Type", b"text/plain")
    recorded = request.server.stash.take(uuid)
    return json.dumps(recorded)

def handle_test(uuid, request, response):
    """Serve one scripted response for the test identified by ``uuid``.

    The ``Test-Requests`` request header carries base64-encoded JSON: a list
    of per-request configurations.  The number of requests already recorded
    in the stash under ``uuid`` is used as the index of the configuration
    for this request.  The configured response headers are applied, the
    request is appended to the recorded state (which is put back in the
    stash), and the configured status and body are returned.

    Returns the response body.  Replies 400 when the header is missing or
    cannot be decoded, and 404 when there is no usable configuration at the
    current index.
    """
    server_state = request.server.stash.take(uuid) or []
    try:
        requests = json.loads(b64decode(request.headers.get(b'Test-Requests', b"")))
    except (TypeError, ValueError):
        # binascii.Error, json.JSONDecodeError and UnicodeDecodeError are all
        # ValueError subclasses; TypeError covers a non-bytes header value.
        response.status = (400, b"Bad Request")
        response.headers.set(b"Content-Type", b"text/plain")
        return b"No or bad Test-Requests request header"
    try:
        config = requests[len(server_state)]
    except (IndexError, KeyError, TypeError):
        # More requests arrived than were configured, or the decoded JSON
        # is not a list: report it as a missing config instead of crashing.
        config = None
    if not config or not isinstance(config, dict):
        response.status = (404, b"Not Found")
        response.headers.set(b"Content-Type", b"text/plain")
        return b"Config not found"
    noted_headers = {}
    now = time.time()
    for header in config.get(u'response_headers', []):
        name = header[0].lower()
        value = header[1]
        if name in LOCATIONHDRS:  # magic locations
            # A non-empty value becomes a "target" parameter appended to the
            # request URL; an empty one becomes the request URL itself.
            if value:
                value = u"%s&target=%s" % (request.url, value)
            else:
                value = request.url
        if name in DATEHDRS and isinstance(value, int):  # magic dates
            # Integer values are offsets in seconds relative to "now".
            value = http_date(now, value)
        response.headers.set(isomorphic_encode(header[0]), isomorphic_encode(value))
        if name in NOTEHDRS:
            noted_headers[name] = value
    state = {
        u'now': now,
        u'request_method': request.method,
        u'request_headers': {
            isomorphic_decode(h.lower()): isomorphic_decode(request.headers[h])
            for h in request.headers
        },
        u'response_headers': noted_headers,
    }
    server_state.append(state)
    request.server.stash.put(uuid, server_state)

    # Defaults for headers the configuration did not set explicitly.
    if u"access-control-allow-origin" not in noted_headers:
        response.headers.set(b"Access-Control-Allow-Origin", b"*")
    if u"content-type" not in noted_headers:
        response.headers.set(b"Content-Type", b"text/plain")
    response.headers.set(b"Server-Request-Count", len(server_state))

    code, phrase = config.get(u"response_status", [200, b"OK"])
    if config.get(u"expected_type", u"").endswith(u'validated'):
        # A validation request must carry a validator matching what the first
        # recorded response sent; if so answer 304, otherwise answer with the
        # 999 marker status.
        ref_hdrs = server_state[0][u'response_headers']
        previous_lm = ref_hdrs.get(u'last-modified', False)
        if previous_lm and request.headers.get(b"If-Modified-Since", False) == isomorphic_encode(previous_lm):
            code, phrase = 304, b"Not Modified"
        previous_etag = ref_hdrs.get(u'etag', False)
        if previous_etag and request.headers.get(b"If-None-Match", False) == isomorphic_encode(previous_etag):
            code, phrase = 304, b"Not Modified"
        if code != 304:
            code, phrase = 999, b'304 Not Generated'
    response.status = (code, phrase)

    content = config.get(u"response_body", uuid)
    if code in NOBODYSTATUS:
        return b""
    return content


def get_header(headers, header_name):
    """Look up ``header_name`` case-insensitively in a list of pairs.

    ``headers`` is an iterable of ``[name, value]`` sequences.  When several
    entries match, the value of the last one is returned; ``None`` is
    returned when nothing matches.
    """
    matches = [
        pair[1]
        for pair in headers
        if pair[0].lower() == header_name.lower()
    ]
    return matches[-1] if matches else None

# Day names indexed by datetime.weekday() (Monday is 0).
WEEKDAYS = [u'Mon', u'Tue', u'Wed', u'Thu', u'Fri', u'Sat', u'Sun']
# Month names indexed by the 1-based month number; index 0 is a placeholder.
MONTHS = [None, u'Jan', u'Feb', u'Mar', u'Apr', u'May', u'Jun', u'Jul',
          u'Aug', u'Sep', u'Oct', u'Nov', u'Dec']

def http_date(now, delta_secs=0):
    """Format a POSIX timestamp as an HTTP date string in GMT.

    ``now`` is seconds since the epoch and ``delta_secs`` is an offset added
    to it before formatting.  Day and month names come from the fixed English
    tables above rather than from the locale.

    Example: ``http_date(0)`` returns ``"Thu, 01 Jan 1970 00:00:00 GMT"``.
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; an aware
    # UTC datetime yields the same calendar fields.
    date = datetime.datetime.fromtimestamp(now + delta_secs, datetime.timezone.utc)
    return u"%s, %.2d %s %.4d %.2d:%.2d:%.2d GMT" % (
        WEEKDAYS[date.weekday()],
        date.day,
        MONTHS[date.month],
        date.year,
        date.hour,
        date.minute,
        date.second)