# testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/streams.py
"""
Benchmark two possible implementations of a stream reader.

The difference lies in the data structure that buffers incoming data:

* ``ByteArrayStreamReader`` uses a ``bytearray``;
* ``BytesDequeStreamReader`` uses a ``deque[bytes]``.

``ByteArrayStreamReader`` is faster for streaming small frames, which is the
standard use case of websockets, likely due to its simple implementation and
to ``bytearray`` being fast at appending data and removing data at the front
(https://hg.python.org/cpython/rev/499a96611baa).

``BytesDequeStreamReader`` is faster for large frames and for bursts, likely
because it copies payloads only once, while ``ByteArrayStreamReader`` copies
them twice.

"""


import collections
import os
import timeit


# Implementations


class ByteArrayStreamReader:
    """Stream reader buffering incoming data in one contiguous ``bytearray``.

    ``readline`` and ``readexactly`` are generators: they ``yield`` whenever
    more data is needed and deliver their result through the generator's
    return value, so callers drive them with ``yield from``.
    """

    def __init__(self):
        self.buffer = bytearray()
        self.eof = False

    def readline(self):
        """Read up to and including the next ``b"\\n"`` byte."""
        end = 0       # index one past the newline, once found
        searched = 0  # prefix already scanned without finding a newline
        while True:
            end = self.buffer.find(b"\n", searched) + 1
            if end > 0:
                break
            searched = len(self.buffer)
            yield  # suspend until feed_data() supplies more bytes
        line = self.buffer[:end]
        del self.buffer[:end]
        return line

    def readexactly(self, n):
        """Read exactly ``n`` bytes."""
        assert n >= 0
        while len(self.buffer) < n:
            yield  # suspend until enough bytes are buffered
        chunk = self.buffer[:n]
        del self.buffer[:n]
        return chunk

    def feed_data(self, data):
        self.buffer += data

    def feed_eof(self):
        self.eof = True

    def at_eof(self):
        # EOF only once the signal was received AND the buffer is drained.
        return self.eof and not self.buffer


class BytesDequeStreamReader:
    """Stream reader buffering incoming data as a ``deque`` of ``bytes``.

    Chunks are consumed from the left end; when a read needs only part of
    a chunk, the surplus is pushed back with ``appendleft``. ``readline``
    and ``readexactly`` are generators that ``yield`` whenever more data
    is needed and return their result via ``yield from``.
    """

    def __init__(self):
        self.buffer = collections.deque()
        self.eof = False

    def _pop_chunk(self):
        # Generator helper: yields until a chunk is available, returns it.
        while True:
            try:
                return self.buffer.popleft()
            except IndexError:
                yield

    def readline(self):
        """Read up to and including the next ``b"\\n"`` byte."""
        parts = []
        while True:
            chunk = yield from self._pop_chunk()
            end = chunk.find(b"\n") + 1
            if end == len(chunk):
                # Chunk ends exactly at the newline: consume it whole.
                parts.append(chunk)
                return b"".join(parts)
            if end > 0:
                # Newline is inside the chunk: split and push back the rest.
                parts.append(chunk[:end])
                self.buffer.appendleft(chunk[end:])
                return b"".join(parts)
            # No newline yet: keep the chunk and wait for more data.
            parts.append(chunk)

    def readexactly(self, n):
        """Read exactly ``n`` bytes."""
        if n == 0:
            return b""
        parts = []
        remaining = n
        while True:
            chunk = yield from self._pop_chunk()
            remaining -= len(chunk)
            if remaining == 0:
                # Chunk completes the read exactly.
                parts.append(chunk)
                return b"".join(parts)
            if remaining < 0:
                # Chunk overshoots: negative slice keeps len+remaining bytes,
                # the surplus goes back to the front of the deque.
                parts.append(chunk[:remaining])
                self.buffer.appendleft(chunk[remaining:])
                return b"".join(parts)
            # Still short: keep the chunk and continue.
            parts.append(chunk)

    def feed_data(self, data):
        self.buffer.append(data)

    def feed_eof(self):
        self.eof = True

    def at_eof(self):
        # EOF only once the signal was received AND the buffer is drained.
        return self.eof and not self.buffer


# Tests


class Protocol:
    def __init__(self, StreamReader):
        self.reader = StreamReader()
        self.events = []
        # Start parser coroutine
        self.parser = self.run_parser()
        next(self.parser)

    def run_parser(self):
        while True:
            frame = yield from self.reader.readexactly(2)
            self.events.append(frame)
            frame = yield from self.reader.readline()
            self.events.append(frame)

    def data_received(self, data):
        self.reader.feed_data(data)
        next(self.parser)  # run parser until more data is needed
        events, self.events = self.events, []
        return events


def run_test(StreamReader):
    """Smoke-test a reader class against a scripted feed/expect sequence."""
    proto = Protocol(StreamReader)

    # Each step: (bytes fed in, events the parser must have emitted).
    steps = [
        (b"a", []),
        (b"b", [b"ab"]),
        (b"c", []),
        (b"\n", [b"c\n"]),
        (b"efghi\njklmn", [b"ef", b"ghi\n", b"jk"]),
    ]
    for data, expected in steps:
        actual = proto.data_received(data)
        assert actual == expected, f"{actual} != {expected}"


# Benchmarks


def get_frame_packets(size, packet_size=None):
    """Build one mock frame (header + random payload), optionally chunked.

    The header mimics the framing the benchmarks parse: a 2-byte prefix,
    plus a 2-byte big-endian extended length for sizes >= 126 and a
    4-byte extended length for sizes >= 65536.

    Returns a list with the whole frame, or with slices of at most
    ``packet_size`` bytes when ``packet_size`` is given.
    """
    if size < 126:
        header = bytes([138, size])
    elif size < 65536:
        header = bytes([138, 126]) + bytes(divmod(size, 256))
    else:
        high, low = divmod(size, 65536)
        header = bytes([138, 127]) + bytes(divmod(high, 256)) + bytes(divmod(low, 256))
    frame = header + os.urandom(size)
    if packet_size is None:
        return [frame]
    return [frame[i : i + packet_size] for i in range(0, len(frame), packet_size)]


def benchmark_stream(StreamReader, packets, size, count):
    """Feed one frame's packets, then parse that frame; repeat ``count`` times.

    Generator: the caller drains it (e.g. ``list(...)``) to run the benchmark.
    """
    reader = StreamReader()
    # Extended-length field size matches what get_frame_packets() emits.
    extra = 4 if size >= 65536 else 2 if size >= 126 else 0
    for _ in range(count):
        for packet in packets:
            reader.feed_data(packet)
        yield from reader.readexactly(2)
        if extra:
            yield from reader.readexactly(extra)
        yield from reader.readexactly(size)
    reader.feed_eof()
    assert reader.at_eof()


def benchmark_burst(StreamReader, packets, size, count):
    """Feed ``count`` frames up front, then parse them all in a burst.

    Generator: the caller drains it (e.g. ``list(...)``) to run the benchmark.
    """
    reader = StreamReader()
    for _ in range(count):
        for packet in packets:
            reader.feed_data(packet)
    reader.feed_eof()
    # Extended-length field size matches what get_frame_packets() emits.
    extra = 4 if size >= 65536 else 2 if size >= 126 else 0
    for _ in range(count):
        yield from reader.readexactly(2)
        if extra:
            yield from reader.readexactly(extra)
        yield from reader.readexactly(size)
    assert reader.at_eof()


def run_benchmark(size, count, packet_size=None, number=1000):
    """Time both reader classes in stream and burst mode and print results.

    size: payload size of each frame, in bytes.
    count: number of frames parsed per benchmark run.
    packet_size: if set, frames are fed in chunks of this many bytes.
    number: timeit repetition count per measurement.
    """
    stmt = f"list(benchmark(StreamReader, packets, {size}, {count}))"
    setup = f"packets = get_frame_packets({size}, {packet_size})"
    # timeit resolves StreamReader / benchmark through this module's globals,
    # so we swap the bindings in place before each measurement.
    context = globals()

    # ByteArrayStreamReader: stream mode, then burst mode.
    context["StreamReader"] = context["ByteArrayStreamReader"]
    context["benchmark"] = context["benchmark_stream"]
    bas = min(timeit.repeat(stmt, setup, number=number, globals=context))
    context["benchmark"] = context["benchmark_burst"]
    bab = min(timeit.repeat(stmt, setup, number=number, globals=context))

    # BytesDequeStreamReader: stream mode, then burst mode.
    context["StreamReader"] = context["BytesDequeStreamReader"]
    context["benchmark"] = context["benchmark_stream"]
    bds = min(timeit.repeat(stmt, setup, number=number, globals=context))
    context["benchmark"] = context["benchmark_burst"]
    bdb = min(timeit.repeat(stmt, setup, number=number, globals=context))

    # Report per-run time in µs; deque results include the relative delta.
    print(
        f"Frame size = {size} bytes, "
        f"frame count = {count}, "
        f"packet size = {packet_size}"
    )
    print(f"* ByteArrayStreamReader  (stream): {bas / number * 1_000_000:.1f}µs")
    print(
        f"* BytesDequeStreamReader (stream): "
        f"{bds / number * 1_000_000:.1f}µs ({(bds / bas - 1) * 100:+.1f}%)"
    )
    print(f"* ByteArrayStreamReader  (burst):  {bab / number * 1_000_000:.1f}µs")
    print(
        f"* BytesDequeStreamReader (burst):  "
        f"{bdb / number * 1_000_000:.1f}µs ({(bdb / bab - 1) * 100:+.1f}%)"
    )
    print()


if __name__ == "__main__":
    # Sanity-check both implementations before timing anything.
    run_test(ByteArrayStreamReader)
    run_test(BytesDequeStreamReader)

    # Whole-frame scenarios: each frame is fed as a single packet.
    for size, count in [
        (8, 1000),
        (60, 1000),
        (500, 500),
        (4_000, 200),
        (30_000, 100),
        (250_000, 50),
        (2_000_000, 20),
    ]:
        run_benchmark(size=size, count=count)

    # Chunked scenarios: frames are fed in packet_size-byte pieces.
    for size, count, packet_size in [
        (4_000, 200, 1024),
        (30_000, 100, 1024),
        (250_000, 50, 1024),
        (2_000_000, 20, 1024),
        (30_000, 100, 4096),
        (250_000, 50, 4096),
        (2_000_000, 20, 4096),
        (30_000, 100, 16384),
        (250_000, 50, 16384),
        (2_000_000, 20, 16384),
        (250_000, 50, 65536),
        (2_000_000, 20, 65536),
    ]:
        run_benchmark(size=size, count=count, packet_size=packet_size)