summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/fetch/range/resources/long-wav.py
blob: acfc81a71809dc3867a956872e6d65e23ad4d58d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
This generates a 5 minute silent wav, and is capable of
responding to Range requests.
"""
import time
import re
import struct

from wptserve.utils import isomorphic_decode

def create_wav_header(sample_rate, bit_depth, channels, duration):
    """Return the 44-byte RIFF/WAVE header describing an uncompressed stream.

    Args:
        sample_rate: Samples per second, per channel.
        bit_depth: Bits per sample; converted to whole bytes per sample.
        channels: Number of interleaved channels.
        duration: Length of the audio in seconds.

    Returns:
        The header as ``bytes``: the RIFF chunk descriptor, a 16-byte
        ``fmt `` sub-chunk with AudioFormat 1, and the ``data`` sub-chunk
        descriptor. No sample data is included.
    """
    frame_size = int(bit_depth / 8) * channels
    bytes_per_second = sample_rate * frame_size
    payload_size = duration * bytes_per_second

    # One little-endian, unpadded layout for the whole header:
    #   ChunkID, ChunkSize, Format,
    #   Subchunk1ID, Subchunk1Size, AudioFormat, NumChannels,
    #   SampleRate, ByteRate, BlockAlign, BitsPerSample,
    #   Subchunk2ID, Subchunk2Size
    return struct.pack(
        '<4sL4s' '4sLHHLLHH' '4sL',
        b'RIFF', 36 + payload_size, b'WAVE',
        b'fmt ', 16, 1, channels,
        sample_rate, bytes_per_second, frame_size, bit_depth,
        b'data', payload_size,
    )


def _resolve_byte_range(first_text, last_text, total_length):
    """Turn the two halves of a ``bytes=first-last`` spec into positions.

    Args:
        first_text: Digits before the dash, or '' when absent.
        last_text: Digits after the dash, or '' when absent.
        total_length: Size in bytes of the resource being served.

    Returns:
        ``None`` when the spec is invalid and the Range header should be
        ignored (no digits on either side, or last < first). Otherwise an
        inclusive ``(start, end)`` pair with ``end`` clamped to the last
        byte. ``start >= total_length`` in the result means the range
        cannot be satisfied.
    """
    last_position = total_length - 1

    if not first_text:
        if not last_text:
            # "bytes=-" names no range at all.
            return None
        # Suffix form "bytes=-N": the final N bytes of the resource. A
        # suffix of 0 yields start == total_length, i.e. unsatisfiable.
        suffix_length = int(last_text)
        return max(total_length - suffix_length, 0), last_position

    start = int(first_text)
    if not last_text:
        # Open-ended form "bytes=N-": everything from N onwards.
        return start, last_position

    end = int(last_text)
    if end < start:
        return None
    return start, min(end, last_position)


def main(request, response):
    """Serve a throttled, silent WAV stream, honouring single-range requests.

    OPTIONS requests are rejected with a 404 so that preflights fail.
    For other methods the handler:

    * echoes the request's Origin in Access-Control-Allow-Origin;
    * when the ``range-received-key`` query parameter is present and a
      Range header was sent, records that fact in the server stash;
    * when the ``accept-encoding-key`` query parameter is present, records
      the request's Accept-Encoding header in the server stash;
    * answers 200 with the whole resource, 206 with the requested slice,
      or 416 when the requested range starts past the end.

    The body is written directly to ``response.writer`` in chunks of at
    most ``sample_rate`` bytes with a half-second pause after each one.

    Args:
        request: The wptserve request object.
        response: The wptserve response object.

    Returns:
        A bytes body for the OPTIONS and 416 cases; otherwise ``None``,
        because the response has already been written by hand.
    """
    if request.method == u"OPTIONS":
        response.status = (404, b"Not Found")
        response.headers.set(b"Content-Type", b"text/plain")
        return b"Preflight not accepted"

    response.headers.set(b"Content-Type", b"audio/wav")
    response.headers.set(b"Accept-Ranges", b"bytes")
    response.headers.set(b"Cache-Control", b"no-cache")
    response.headers.set(b"Access-Control-Allow-Origin", request.headers.get(b'Origin', b''))

    range_header = request.headers.get(b'Range', b'')
    range_header_match = range_header and re.search(r'^bytes=(\d*)-(\d*)$', isomorphic_decode(range_header))
    range_received_key = request.GET.first(b'range-received-key', b'')
    accept_encoding_key = request.GET.first(b'accept-encoding-key', b'')

    if range_received_key and range_header:
        # Remove any current value
        request.server.stash.take(range_received_key, b'/fetch/range/')
        # This is later collected using stash-take.py
        request.server.stash.put(range_received_key, u'range-header-received', b'/fetch/range/')

    if accept_encoding_key:
        # Remove any current value
        request.server.stash.take(
            accept_encoding_key,
            b'/fetch/range/'
        )
        # This is later collected using stash-take.py
        request.server.stash.put(
            accept_encoding_key,
            isomorphic_decode(request.headers.get(b'Accept-Encoding', b'')),
            b'/fetch/range/'
        )

    # Audio details
    sample_rate = 8000
    bit_depth = 8
    channels = 1
    duration = 60 * 5

    # Size of the served resource. The WAV header is sent as the first
    # bytes of this total rather than in addition to it.
    total_length = sample_rate * bit_depth * channels * duration // 8
    wav_header = create_wav_header(sample_rate, bit_depth, channels, duration)

    byte_range = None
    if range_header_match:
        first_text, last_text = range_header_match.groups()
        byte_range = _resolve_byte_range(first_text, last_text, total_length)

    if byte_range is None:
        # No usable Range header: send the whole resource.
        bytes_remaining_to_send = total_length
        initial_write = wav_header
    else:
        start, end = byte_range

        if start >= total_length:
            # Nothing of the resource lies inside the requested range.
            response.status = 416
            response.headers.set(b"Content-Range", b"bytes */%d" % total_length)
            return b""

        response.status = 206
        bytes_remaining_to_send = end - start + 1
        # Whatever part of the header falls inside the range; this is
        # empty once start is past the header.
        initial_write = wav_header[start:start + bytes_remaining_to_send]

        content_range = b"bytes %d-%d/%d" % (start, end, total_length)
        response.headers.set(b"Content-Range", content_range)

    response.headers.set(b"Content-Length", bytes_remaining_to_send)

    response.write_status_headers()
    response.writer.write(initial_write)

    bytes_remaining_to_send -= len(initial_write)

    while bytes_remaining_to_send > 0:
        # Silence: at most sample_rate zero bytes per chunk.
        to_send = b'\x00' * min(bytes_remaining_to_send, sample_rate)
        bytes_remaining_to_send -= len(to_send)

        # A falsy result from write() ends the stream early.
        if not response.writer.write(to_send):
            break

        # Throttle the stream
        time.sleep(0.5)