Diffstat (limited to 'testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/streams.py')
-rw-r--r--  testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/streams.py  301
1 file changed, 301 insertions(+), 0 deletions(-)
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/streams.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/streams.py
new file mode 100644
index 0000000000..ca24a59834
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/streams.py
@@ -0,0 +1,301 @@
+"""
+Benchmark two possible implementations of a stream reader.
+
+The difference lies in the data structure that buffers incoming data:
+
+* ``ByteArrayStreamReader`` uses a ``bytearray``;
+* ``BytesDequeStreamReader`` uses a ``deque[bytes]``.
+
+``ByteArrayStreamReader`` is faster for streaming small frames, which is the
+standard use case of websockets, likely due to its simple implementation and
+to ``bytearray`` being fast at appending data and removing data at the front
+(https://hg.python.org/cpython/rev/499a96611baa).
+
+``BytesDequeStreamReader`` is faster for large frames and for bursts, likely
+because it copies payloads only once, while ``ByteArrayStreamReader`` copies
+them twice.
+
+"""
+
+
+import collections
+import os
+import timeit
+
+
+# Implementations
+
+
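+# Both readers follow the same convention: readline() and readexactly() are
+# generators that yield whenever the buffer does not yet hold enough data and
+# return the requested bytes once it does; the caller resumes them after
+# calling feed_data().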
+class ByteArrayStreamReader:
+    def __init__(self):
+        self.buffer = bytearray()
+        self.eof = False
+
+    def readline(self):
+        n = 0  # number of bytes to read
+        p = 0  # number of bytes without a newline
+        while True:
+            n = self.buffer.find(b"\n", p) + 1
+            if n > 0:
+                break
+            p = len(self.buffer)
+            yield
+        r = self.buffer[:n]
+        del self.buffer[:n]
+        return r
+
+    def readexactly(self, n):
+        assert n >= 0
+        while len(self.buffer) < n:
+            yield
+        r = self.buffer[:n]
+        del self.buffer[:n]
+        return r
+
+    def feed_data(self, data):
+        self.buffer += data
+
+    def feed_eof(self):
+        self.eof = True
+
+    def at_eof(self):
+        return self.eof and not self.buffer
+
+
+class BytesDequeStreamReader:
+    def __init__(self):
+        self.buffer = collections.deque()
+        self.eof = False
+
+    def readline(self):
+        b = []
+        while True:
+            # Read next chunk
+            while True:
+                try:
+                    c = self.buffer.popleft()
+                except IndexError:
+                    yield
+                else:
+                    break
+            # Handle chunk
+            n = c.find(b"\n") + 1
+            if n == len(c):
+                # Read exactly enough data
+                b.append(c)
+                break
+            elif n > 0:
+                # Read too much data
+                b.append(c[:n])
+                self.buffer.appendleft(c[n:])
+                break
+            else:  # n == 0
+                # Need to read more data
+                b.append(c)
+        return b"".join(b)
+
+    def readexactly(self, n):
+        if n == 0:
+            return b""
+        b = []
+        while True:
+            # Read next chunk
+            while True:
+                try:
+                    c = self.buffer.popleft()
+                except IndexError:
+                    yield
+                else:
+                    break
+            # Handle chunk
+            n -= len(c)
+            if n == 0:
+                # Read exactly enough data
+                b.append(c)
+                break
+            elif n < 0:
+                # Read too much data
+                b.append(c[:n])
+                self.buffer.appendleft(c[n:])
+                break
+            else:  # n > 0
+                # Need to read more data
+                b.append(c)
+        return b"".join(b)
+
+    def feed_data(self, data):
+        self.buffer.append(data)
+
+    def feed_eof(self):
+        self.eof = True
+
+    def at_eof(self):
+        return self.eof and not self.buffer
+
+
+# Tests
+
+
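+# Protocol loosely mirrors asyncio's data_received() callback style: incoming
+# bytes are fed to the reader, then the parser coroutine is resumed until it
+# needs more data.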
+class Protocol:
+    def __init__(self, StreamReader):
+        self.reader = StreamReader()
+        self.events = []
+        # Start parser coroutine
+        self.parser = self.run_parser()
+        next(self.parser)
+
+    def run_parser(self):
+        while True:
+            frame = yield from self.reader.readexactly(2)
+            self.events.append(frame)
+            frame = yield from self.reader.readline()
+            self.events.append(frame)
+
+    def data_received(self, data):
+        self.reader.feed_data(data)
+        next(self.parser)  # run parser until more data is needed
+        events, self.events = self.events, []
+        return events
+
+
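+# The parser alternates readexactly(2) and readline(), so feeding
+# b"efghi\njklmn" should produce the frames b"ef", b"ghi\n", and b"jk",
+# with b"lmn" left waiting for a newline.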
+def run_test(StreamReader):
+    proto = Protocol(StreamReader)
+
+    actual = proto.data_received(b"a")
+    expected = []
+    assert actual == expected, f"{actual} != {expected}"
+
+    actual = proto.data_received(b"b")
+    expected = [b"ab"]
+    assert actual == expected, f"{actual} != {expected}"
+
+    actual = proto.data_received(b"c")
+    expected = []
+    assert actual == expected, f"{actual} != {expected}"
+
+    actual = proto.data_received(b"\n")
+    expected = [b"c\n"]
+    assert actual == expected, f"{actual} != {expected}"
+
+    actual = proto.data_received(b"efghi\njklmn")
+    expected = [b"ef", b"ghi\n", b"jk"]
+    assert actual == expected, f"{actual} != {expected}"
+
+
+# Benchmarks
+
+
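+# Frame headers below use 0x8A (138): FIN bit set, pong opcode. Payload
+# lengths of 126 or more use the extended length encodings (marker 126 or
+# 127). For simplicity, the benchmark writes only 4 extended length bytes in
+# the large case (rather than the 8 that RFC 6455 specifies), matching the
+# readexactly(4) calls in the benchmarks.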
+def get_frame_packets(size, packet_size=None):
+    if size < 126:
+        frame = bytes([138, size])
+    elif size < 65536:
+        frame = bytes([138, 126]) + bytes(divmod(size, 256))
+    else:
+        size1, size2 = divmod(size, 65536)
+        frame = (
+            bytes([138, 127]) + bytes(divmod(size1, 256)) + bytes(divmod(size2, 256))
+        )
+    frame += os.urandom(size)
+    if packet_size is None:
+        return [frame]
+    else:
+        packets = []
+        while frame:
+            packets.append(frame[:packet_size])
+            frame = frame[packet_size:]
+        return packets
+
+
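+# benchmark_stream interleaves feeding and parsing, one frame at a time;
+# benchmark_burst (below) feeds every packet up front and parses afterwards,
+# so the buffer grows to hold the whole burst.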
+def benchmark_stream(StreamReader, packets, size, count):
+    reader = StreamReader()
+    for _ in range(count):
+        for packet in packets:
+            reader.feed_data(packet)
+        yield from reader.readexactly(2)
+        if size >= 65536:
+            yield from reader.readexactly(4)
+        elif size >= 126:
+            yield from reader.readexactly(2)
+        yield from reader.readexactly(size)
+    reader.feed_eof()
+    assert reader.at_eof()
+
+
+def benchmark_burst(StreamReader, packets, size, count):
+    reader = StreamReader()
+    for _ in range(count):
+        for packet in packets:
+            reader.feed_data(packet)
+    reader.feed_eof()
+    for _ in range(count):
+        yield from reader.readexactly(2)
+        if size >= 65536:
+            yield from reader.readexactly(4)
+        elif size >= 126:
+            yield from reader.readexactly(2)
+        yield from reader.readexactly(size)
+    assert reader.at_eof()
+
+
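+# For each reader and scenario, the minimum over timeit.repeat() runs is
+# reported, as the least noisy estimate, and BytesDequeStreamReader is shown
+# relative to ByteArrayStreamReader.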
+def run_benchmark(size, count, packet_size=None, number=1000):
+ stmt = f"list(benchmark(StreamReader, packets, {size}, {count}))"
+ setup = f"packets = get_frame_packets({size}, {packet_size})"
+ context = globals()
+
+ context["StreamReader"] = context["ByteArrayStreamReader"]
+ context["benchmark"] = context["benchmark_stream"]
+ bas = min(timeit.repeat(stmt, setup, number=number, globals=context))
+ context["benchmark"] = context["benchmark_burst"]
+ bab = min(timeit.repeat(stmt, setup, number=number, globals=context))
+
+ context["StreamReader"] = context["BytesDequeStreamReader"]
+ context["benchmark"] = context["benchmark_stream"]
+ bds = min(timeit.repeat(stmt, setup, number=number, globals=context))
+ context["benchmark"] = context["benchmark_burst"]
+ bdb = min(timeit.repeat(stmt, setup, number=number, globals=context))
+
+ print(
+ f"Frame size = {size} bytes, "
+ f"frame count = {count}, "
+ f"packet size = {packet_size}"
+ )
+ print(f"* ByteArrayStreamReader (stream): {bas / number * 1_000_000:.1f}µs")
+ print(
+ f"* BytesDequeStreamReader (stream): "
+ f"{bds / number * 1_000_000:.1f}µs ({(bds / bas - 1) * 100:+.1f}%)"
+ )
+ print(f"* ByteArrayStreamReader (burst): {bab / number * 1_000_000:.1f}µs")
+ print(
+ f"* BytesDequeStreamReader (burst): "
+ f"{bdb / number * 1_000_000:.1f}µs ({(bdb / bab - 1) * 100:+.1f}%)"
+ )
+ print()
+
+
+if __name__ == "__main__":
+    run_test(ByteArrayStreamReader)
+    run_test(BytesDequeStreamReader)
+
+    run_benchmark(size=8, count=1000)
+    run_benchmark(size=60, count=1000)
+    run_benchmark(size=500, count=500)
+    run_benchmark(size=4_000, count=200)
+    run_benchmark(size=30_000, count=100)
+    run_benchmark(size=250_000, count=50)
+    run_benchmark(size=2_000_000, count=20)
+
+    run_benchmark(size=4_000, count=200, packet_size=1024)
+    run_benchmark(size=30_000, count=100, packet_size=1024)
+    run_benchmark(size=250_000, count=50, packet_size=1024)
+    run_benchmark(size=2_000_000, count=20, packet_size=1024)
+
+    run_benchmark(size=30_000, count=100, packet_size=4096)
+    run_benchmark(size=250_000, count=50, packet_size=4096)
+    run_benchmark(size=2_000_000, count=20, packet_size=4096)
+
+    run_benchmark(size=30_000, count=100, packet_size=16384)
+    run_benchmark(size=250_000, count=50, packet_size=16384)
+    run_benchmark(size=2_000_000, count=20, packet_size=16384)
+
+    run_benchmark(size=250_000, count=50, packet_size=65536)
+    run_benchmark(size=2_000_000, count=20, packet_size=65536)