summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/tools/third_party/websockets/experiments
diff options
context:
space:
mode:
Diffstat (limited to 'testing/web-platform/tests/tools/third_party/websockets/experiments')
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/app.py226
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie.html15
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie.js23
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie_iframe.html9
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie_iframe.js9
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/first_message.html14
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/first_message.js11
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/index.html12
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/query_param.html14
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/query_param.js11
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/script.js51
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/style.css69
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/test.html15
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/test.js6
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/user_info.html14
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/user_info.js11
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/clients.py61
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/server.py153
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/compression/benchmark.py163
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/compression/client.py59
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/compression/server.py70
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/parse_frames.py101
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/parse_handshake.py102
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/streams.py301
24 files changed, 1520 insertions, 0 deletions
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/app.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/app.py
new file mode 100644
index 0000000000..039e21174b
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/app.py
@@ -0,0 +1,226 @@
+#!/usr/bin/env python
+
+import asyncio
+import http
+import http.cookies
+import pathlib
+import signal
+import urllib.parse
+import uuid
+
+import websockets
+from websockets.frames import CloseCode
+
+
+# User accounts database
+
+USERS = {}
+
+
+def create_token(user, lifetime=1):
+ """Create token for user and delete it once its lifetime is over."""
+ token = uuid.uuid4().hex
+ USERS[token] = user
+ asyncio.get_running_loop().call_later(lifetime, USERS.pop, token)
+ return token
+
+
+def get_user(token):
+ """Find user authenticated by token or return None."""
+ return USERS.get(token)
+
+
+# Utilities
+
+
+def get_cookie(raw, key):
+ cookie = http.cookies.SimpleCookie(raw)
+ morsel = cookie.get(key)
+ if morsel is not None:
+ return morsel.value
+
+
+def get_query_param(path, key):
+ query = urllib.parse.urlparse(path).query
+ params = urllib.parse.parse_qs(query)
+ values = params.get(key, [])
+ if len(values) == 1:
+ return values[0]
+
+
+# Main HTTP server
+
+CONTENT_TYPES = {
+ ".css": "text/css",
+ ".html": "text/html; charset=utf-8",
+ ".ico": "image/x-icon",
+ ".js": "text/javascript",
+}
+
+
+async def serve_html(path, request_headers):
+ user = get_query_param(path, "user")
+ path = urllib.parse.urlparse(path).path
+ if path == "/":
+ if user is None:
+ page = "index.html"
+ else:
+ page = "test.html"
+ else:
+ page = path[1:]
+
+ try:
+ template = pathlib.Path(__file__).with_name(page)
+ except ValueError:
+ pass
+ else:
+ if template.is_file():
+ headers = {"Content-Type": CONTENT_TYPES[template.suffix]}
+ body = template.read_bytes()
+ if user is not None:
+ token = create_token(user)
+ body = body.replace(b"TOKEN", token.encode())
+ return http.HTTPStatus.OK, headers, body
+
+ return http.HTTPStatus.NOT_FOUND, {}, b"Not found\n"
+
+
+async def noop_handler(websocket):
+ pass
+
+
+# Send credentials as the first message in the WebSocket connection
+
+
+async def first_message_handler(websocket):
+ token = await websocket.recv()
+ user = get_user(token)
+ if user is None:
+ await websocket.close(CloseCode.INTERNAL_ERROR, "authentication failed")
+ return
+
+ await websocket.send(f"Hello {user}!")
+ message = await websocket.recv()
+ assert message == f"Goodbye {user}."
+
+
+# Add credentials to the WebSocket URI in a query parameter
+
+
+class QueryParamProtocol(websockets.WebSocketServerProtocol):
+ async def process_request(self, path, headers):
+ token = get_query_param(path, "token")
+ if token is None:
+ return http.HTTPStatus.UNAUTHORIZED, [], b"Missing token\n"
+
+ user = get_user(token)
+ if user is None:
+ return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n"
+
+ self.user = user
+
+
+async def query_param_handler(websocket):
+ user = websocket.user
+
+ await websocket.send(f"Hello {user}!")
+ message = await websocket.recv()
+ assert message == f"Goodbye {user}."
+
+
+# Set a cookie on the domain of the WebSocket URI
+
+
+class CookieProtocol(websockets.WebSocketServerProtocol):
+ async def process_request(self, path, headers):
+ if "Upgrade" not in headers:
+ template = pathlib.Path(__file__).with_name(path[1:])
+ headers = {"Content-Type": CONTENT_TYPES[template.suffix]}
+ body = template.read_bytes()
+ return http.HTTPStatus.OK, headers, body
+
+ token = get_cookie(headers.get("Cookie", ""), "token")
+ if token is None:
+ return http.HTTPStatus.UNAUTHORIZED, [], b"Missing token\n"
+
+ user = get_user(token)
+ if user is None:
+ return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n"
+
+ self.user = user
+
+
+async def cookie_handler(websocket):
+ user = websocket.user
+
+ await websocket.send(f"Hello {user}!")
+ message = await websocket.recv()
+ assert message == f"Goodbye {user}."
+
+
+# Adding credentials to the WebSocket URI in user information
+
+
+class UserInfoProtocol(websockets.BasicAuthWebSocketServerProtocol):
+ async def check_credentials(self, username, password):
+ if username != "token":
+ return False
+
+ user = get_user(password)
+ if user is None:
+ return False
+
+ self.user = user
+ return True
+
+
+async def user_info_handler(websocket):
+ user = websocket.user
+
+ await websocket.send(f"Hello {user}!")
+ message = await websocket.recv()
+ assert message == f"Goodbye {user}."
+
+
+# Start all five servers
+
+
+async def main():
+ # Set the stop condition when receiving SIGINT or SIGTERM.
+ loop = asyncio.get_running_loop()
+ stop = loop.create_future()
+ loop.add_signal_handler(signal.SIGINT, stop.set_result, None)
+ loop.add_signal_handler(signal.SIGTERM, stop.set_result, None)
+
+ async with websockets.serve(
+ noop_handler,
+ host="",
+ port=8000,
+ process_request=serve_html,
+ ), websockets.serve(
+ first_message_handler,
+ host="",
+ port=8001,
+ ), websockets.serve(
+ query_param_handler,
+ host="",
+ port=8002,
+ create_protocol=QueryParamProtocol,
+ ), websockets.serve(
+ cookie_handler,
+ host="",
+ port=8003,
+ create_protocol=CookieProtocol,
+ ), websockets.serve(
+ user_info_handler,
+ host="",
+ port=8004,
+ create_protocol=UserInfoProtocol,
+ ):
+ print("Running on http://localhost:8000/")
+ await stop
+ print("\rExiting")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie.html b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie.html
new file mode 100644
index 0000000000..ca17358fd0
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie.html
@@ -0,0 +1,15 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <title>Cookie | WebSocket Authentication</title>
+ <link href="style.css" rel="stylesheet">
+ </head>
+ <body class="test">
+ <p class="test">[??] Cookie</p>
+ <p class="ok">[OK] Cookie</p>
+ <p class="ko">[KO] Cookie</p>
+ <script src="script.js"></script>
+ <script src="cookie.js"></script>
+ <iframe src="http://localhost:8003/cookie_iframe.html" style="display: none;"></iframe>
+ </body>
+</html>
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie.js b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie.js
new file mode 100644
index 0000000000..2cca34fcbb
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie.js
@@ -0,0 +1,23 @@
+// send token to iframe
+window.addEventListener("DOMContentLoaded", () => {
+ const iframe = document.querySelector("iframe");
+ iframe.addEventListener("load", () => {
+ iframe.contentWindow.postMessage(token, "http://localhost:8003");
+ });
+});
+
+// once iframe has set cookie, open WebSocket connection
+window.addEventListener("message", ({ origin }) => {
+ if (origin !== "http://localhost:8003") {
+ return;
+ }
+
+ const websocket = new WebSocket("ws://localhost:8003/");
+
+ websocket.onmessage = ({ data }) => {
+ // event.data is expected to be "Hello <user>!"
+ websocket.send(`Goodbye ${data.slice(6, -1)}.`);
+ };
+
+ runTest(websocket);
+});
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie_iframe.html b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie_iframe.html
new file mode 100644
index 0000000000..9f49ebb9a0
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie_iframe.html
@@ -0,0 +1,9 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <title>Cookie iframe | WebSocket Authentication</title>
+ </head>
+ <body>
+ <script src="cookie_iframe.js"></script>
+ </body>
+</html>
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie_iframe.js b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie_iframe.js
new file mode 100644
index 0000000000..2d2e692e8d
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/cookie_iframe.js
@@ -0,0 +1,9 @@
+// receive token from the parent window, set cookie and notify parent
+window.addEventListener("message", ({ origin, data }) => {
+ if (origin !== "http://localhost:8000") {
+ return;
+ }
+
+ document.cookie = `token=${data}; SameSite=Strict`;
+ window.parent.postMessage("", "http://localhost:8000");
+});
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/first_message.html b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/first_message.html
new file mode 100644
index 0000000000..4dc511a176
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/first_message.html
@@ -0,0 +1,14 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <title>First message | WebSocket Authentication</title>
+ <link href="style.css" rel="stylesheet">
+ </head>
+ <body class="test">
+ <p class="test">[??] First message</p>
+ <p class="ok">[OK] First message</p>
+ <p class="ko">[KO] First message</p>
+ <script src="script.js"></script>
+ <script src="first_message.js"></script>
+ </body>
+</html>
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/first_message.js b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/first_message.js
new file mode 100644
index 0000000000..1acf048baf
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/first_message.js
@@ -0,0 +1,11 @@
+window.addEventListener("DOMContentLoaded", () => {
+ const websocket = new WebSocket("ws://localhost:8001/");
+ websocket.onopen = () => websocket.send(token);
+
+ websocket.onmessage = ({ data }) => {
+ // event.data is expected to be "Hello <user>!"
+ websocket.send(`Goodbye ${data.slice(6, -1)}.`);
+ };
+
+ runTest(websocket);
+});
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/index.html b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/index.html
new file mode 100644
index 0000000000..c37deef270
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/index.html
@@ -0,0 +1,12 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <title>WebSocket Authentication</title>
+ <link href="style.css" rel="stylesheet">
+ </head>
+ <body>
+ <form method="GET">
+ <input name="user" placeholder="username">
+ </form>
+ </body>
+</html>
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/query_param.html b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/query_param.html
new file mode 100644
index 0000000000..27aa454a40
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/query_param.html
@@ -0,0 +1,14 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <title>Query parameter | WebSocket Authentication</title>
+ <link href="style.css" rel="stylesheet">
+ </head>
+ <body class="test">
+ <p class="test">[??] Query parameter</p>
+ <p class="ok">[OK] Query parameter</p>
+ <p class="ko">[KO] Query parameter</p>
+ <script src="script.js"></script>
+ <script src="query_param.js"></script>
+ </body>
+</html>
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/query_param.js b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/query_param.js
new file mode 100644
index 0000000000..6a54d0b6ca
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/query_param.js
@@ -0,0 +1,11 @@
+window.addEventListener("DOMContentLoaded", () => {
+ const uri = `ws://localhost:8002/?token=${token}`;
+ const websocket = new WebSocket(uri);
+
+ websocket.onmessage = ({ data }) => {
+ // event.data is expected to be "Hello <user>!"
+ websocket.send(`Goodbye ${data.slice(6, -1)}.`);
+ };
+
+ runTest(websocket);
+});
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/script.js b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/script.js
new file mode 100644
index 0000000000..ec4e5e6709
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/script.js
@@ -0,0 +1,51 @@
+var token = window.parent.token;
+
+function getExpectedEvents() {
+ return [
+ {
+ type: "open",
+ },
+ {
+ type: "message",
+ data: `Hello ${window.parent.user}!`,
+ },
+ {
+ type: "close",
+ code: 1000,
+ reason: "",
+ wasClean: true,
+ },
+ ];
+}
+
+function isEqual(expected, actual) {
+ // good enough for our purposes here!
+ return JSON.stringify(expected) === JSON.stringify(actual);
+}
+
+function testStep(expected, actual) {
+ if (isEqual(expected, actual)) {
+ document.body.className = "ok";
+ } else if (isEqual(expected.slice(0, actual.length), actual)) {
+ document.body.className = "test";
+ } else {
+ document.body.className = "ko";
+ }
+}
+
+function runTest(websocket) {
+ const expected = getExpectedEvents();
+ var actual = [];
+ websocket.addEventListener("open", ({ type }) => {
+ actual.push({ type });
+ testStep(expected, actual);
+ });
+ websocket.addEventListener("message", ({ type, data }) => {
+ actual.push({ type, data });
+ testStep(expected, actual);
+ });
+ websocket.addEventListener("close", ({ type, code, reason, wasClean }) => {
+ actual.push({ type, code, reason, wasClean });
+ testStep(expected, actual);
+ });
+}
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/style.css b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/style.css
new file mode 100644
index 0000000000..6e3918ccae
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/style.css
@@ -0,0 +1,69 @@
+/* page layout */
+
+body {
+ display: flex;
+ flex-direction: column;
+ justify-content: center;
+ align-items: center;
+ margin: 0;
+ height: 100vh;
+}
+div.title, iframe {
+ width: 100vw;
+ height: 20vh;
+ border: none;
+}
+div.title {
+ display: flex;
+ flex-direction: column;
+ justify-content: center;
+ align-items: center;
+}
+h1, p {
+ margin: 0;
+ width: 24em;
+}
+
+/* text style */
+
+h1, input, p {
+ font-family: monospace;
+ font-size: 3em;
+}
+input {
+ color: #333;
+ border: 3px solid #999;
+ padding: 1em;
+}
+input:focus {
+ border-color: #333;
+ outline: none;
+}
+input::placeholder {
+ color: #999;
+ opacity: 1;
+}
+
+/* test results */
+
+body.test {
+ background-color: #666;
+ color: #fff;
+}
+body.ok {
+ background-color: #090;
+ color: #fff;
+}
+body.ko {
+ background-color: #900;
+ color: #fff;
+}
+body > p {
+ display: none;
+}
+body > p.title,
+body.test > p.test,
+body.ok > p.ok,
+body.ko > p.ko {
+ display: block;
+}
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/test.html b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/test.html
new file mode 100644
index 0000000000..3883d6a39e
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/test.html
@@ -0,0 +1,15 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <title>WebSocket Authentication</title>
+ <link href="style.css" rel="stylesheet">
+ </head>
+ <body data-token="TOKEN">
+ <div class="title"><h1>WebSocket Authentication</h1></div>
+ <iframe src="first_message.html"></iframe>
+ <iframe src="query_param.html"></iframe>
+ <iframe src="cookie.html"></iframe>
+ <iframe src="user_info.html"></iframe>
+ <script src="test.js"></script>
+ </body>
+</html>
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/test.js b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/test.js
new file mode 100644
index 0000000000..428830ff31
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/test.js
@@ -0,0 +1,6 @@
+// for connecting to WebSocket servers
+var token = document.body.dataset.token;
+
+// for test assertions only
+const params = new URLSearchParams(window.location.search);
+var user = params.get("user");
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/user_info.html b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/user_info.html
new file mode 100644
index 0000000000..7b9c99c730
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/user_info.html
@@ -0,0 +1,14 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <title>User information | WebSocket Authentication</title>
+ <link href="style.css" rel="stylesheet">
+ </head>
+ <body class="test">
+ <p class="test">[??] User information</p>
+ <p class="ok">[OK] User information</p>
+ <p class="ko">[KO] User information</p>
+ <script src="script.js"></script>
+ <script src="user_info.js"></script>
+ </body>
+</html>
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/user_info.js b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/user_info.js
new file mode 100644
index 0000000000..1dab2ce4c1
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/authentication/user_info.js
@@ -0,0 +1,11 @@
+window.addEventListener("DOMContentLoaded", () => {
+ const uri = `ws://token:${token}@localhost:8004/`;
+ const websocket = new WebSocket(uri);
+
+ websocket.onmessage = ({ data }) => {
+ // event.data is expected to be "Hello <user>!"
+ websocket.send(`Goodbye ${data.slice(6, -1)}.`);
+ };
+
+ runTest(websocket);
+});
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/clients.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/clients.py
new file mode 100644
index 0000000000..fe39dfe051
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/clients.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+
+import asyncio
+import statistics
+import sys
+import time
+
+import websockets
+
+
+LATENCIES = {}
+
+
+async def log_latency(interval):
+ while True:
+ await asyncio.sleep(interval)
+ p = statistics.quantiles(LATENCIES.values(), n=100)
+ print(f"clients = {len(LATENCIES)}")
+ print(
+ f"p50 = {p[49] / 1e6:.1f}ms, "
+ f"p95 = {p[94] / 1e6:.1f}ms, "
+ f"p99 = {p[98] / 1e6:.1f}ms"
+ )
+ print()
+
+
+async def client():
+ try:
+ async with websockets.connect(
+ "ws://localhost:8765",
+ ping_timeout=None,
+ ) as websocket:
+ async for msg in websocket:
+ client_time = time.time_ns()
+ server_time = int(msg[:19].decode())
+ LATENCIES[websocket] = client_time - server_time
+ except Exception as exc:
+ print(exc)
+
+
+async def main(count, interval):
+ asyncio.create_task(log_latency(interval))
+ clients = []
+ for _ in range(count):
+ clients.append(asyncio.create_task(client()))
+ await asyncio.sleep(0.001) # 1ms between each connection
+ await asyncio.wait(clients)
+
+
+if __name__ == "__main__":
+ try:
+ count = int(sys.argv[1])
+ interval = float(sys.argv[2])
+ except Exception as exc:
+ print(f"Usage: {sys.argv[0]} count interval")
+ print(" Connect <count> clients e.g. 1000")
+ print(" Report latency every <interval> seconds e.g. 1")
+ print()
+ print(exc)
+ else:
+ asyncio.run(main(count, interval))
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/server.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/server.py
new file mode 100644
index 0000000000..9c9907b7f9
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/server.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python
+
+import asyncio
+import functools
+import os
+import sys
+import time
+
+import websockets
+
+
+CLIENTS = set()
+
+
+async def send(websocket, message):
+ try:
+ await websocket.send(message)
+ except websockets.ConnectionClosed:
+ pass
+
+
+async def relay(queue, websocket):
+ while True:
+ message = await queue.get()
+ await websocket.send(message)
+
+
+class PubSub:
+ def __init__(self):
+ self.waiter = asyncio.Future()
+
+ def publish(self, value):
+ waiter, self.waiter = self.waiter, asyncio.Future()
+ waiter.set_result((value, self.waiter))
+
+ async def subscribe(self):
+ waiter = self.waiter
+ while True:
+ value, waiter = await waiter
+ yield value
+
+ __aiter__ = subscribe
+
+
+PUBSUB = PubSub()
+
+
+async def handler(websocket, method=None):
+ if method in ["default", "naive", "task", "wait"]:
+ CLIENTS.add(websocket)
+ try:
+ await websocket.wait_closed()
+ finally:
+ CLIENTS.remove(websocket)
+ elif method == "queue":
+ queue = asyncio.Queue()
+ relay_task = asyncio.create_task(relay(queue, websocket))
+ CLIENTS.add(queue)
+ try:
+ await websocket.wait_closed()
+ finally:
+ CLIENTS.remove(queue)
+ relay_task.cancel()
+ elif method == "pubsub":
+ async for message in PUBSUB:
+ await websocket.send(message)
+ else:
+ raise NotImplementedError(f"unsupported method: {method}")
+
+
+async def broadcast(method, size, delay):
+ """Broadcast messages at regular intervals."""
+ load_average = 0
+ time_average = 0
+ pc1, pt1 = time.perf_counter_ns(), time.process_time_ns()
+ await asyncio.sleep(delay)
+ while True:
+ print(f"clients = {len(CLIENTS)}")
+ pc0, pt0 = time.perf_counter_ns(), time.process_time_ns()
+ load_average = 0.9 * load_average + 0.1 * (pt0 - pt1) / (pc0 - pc1)
+ print(
+ f"load = {(pt0 - pt1) / (pc0 - pc1) * 100:.1f}% / "
+ f"average = {load_average * 100:.1f}%, "
+ f"late = {(pc0 - pc1 - delay * 1e9) / 1e6:.1f} ms"
+ )
+ pc1, pt1 = pc0, pt0
+
+ assert size > 20
+ message = str(time.time_ns()).encode() + b" " + os.urandom(size - 20)
+
+ if method == "default":
+ websockets.broadcast(CLIENTS, message)
+ elif method == "naive":
+ # Since the loop can yield control, make a copy of CLIENTS
+ # to avoid: RuntimeError: Set changed size during iteration
+ for websocket in CLIENTS.copy():
+ await send(websocket, message)
+ elif method == "task":
+ for websocket in CLIENTS:
+ asyncio.create_task(send(websocket, message))
+ elif method == "wait":
+ if CLIENTS: # asyncio.wait doesn't accept an empty list
+ await asyncio.wait(
+ [
+ asyncio.create_task(send(websocket, message))
+ for websocket in CLIENTS
+ ]
+ )
+ elif method == "queue":
+ for queue in CLIENTS:
+ queue.put_nowait(message)
+ elif method == "pubsub":
+ PUBSUB.publish(message)
+ else:
+ raise NotImplementedError(f"unsupported method: {method}")
+
+ pc2 = time.perf_counter_ns()
+ wait = delay + (pc1 - pc2) / 1e9
+ time_average = 0.9 * time_average + 0.1 * (pc2 - pc1)
+ print(
+ f"broadcast = {(pc2 - pc1) / 1e6:.1f}ms / "
+ f"average = {time_average / 1e6:.1f}ms, "
+ f"wait = {wait * 1e3:.1f}ms"
+ )
+ await asyncio.sleep(wait)
+ print()
+
+
+async def main(method, size, delay):
+ async with websockets.serve(
+ functools.partial(handler, method=method),
+ "localhost",
+ 8765,
+ compression=None,
+ ping_timeout=None,
+ ):
+ await broadcast(method, size, delay)
+
+
+if __name__ == "__main__":
+ try:
+ method = sys.argv[1]
+ assert method in ["default", "naive", "task", "wait", "queue", "pubsub"]
+ size = int(sys.argv[2])
+ delay = float(sys.argv[3])
+ except Exception as exc:
+ print(f"Usage: {sys.argv[0]} method size delay")
+ print(" Start a server broadcasting messages with <method> e.g. naive")
+ print(" Send a payload of <size> bytes every <delay> seconds")
+ print()
+ print(exc)
+ else:
+ asyncio.run(main(method, size, delay))
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/compression/benchmark.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/compression/benchmark.py
new file mode 100644
index 0000000000..c5b13c8fa3
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/compression/benchmark.py
@@ -0,0 +1,163 @@
+#!/usr/bin/env python
+
+import getpass
+import json
+import pickle
+import subprocess
+import sys
+import time
+import zlib
+
+
+CORPUS_FILE = "corpus.pkl"
+
+REPEAT = 10
+
+WB, ML = 12, 5 # defaults used as a reference
+
+
+def _corpus():
+ OAUTH_TOKEN = getpass.getpass("OAuth Token? ")
+ COMMIT_API = (
+ f'curl -H "Authorization: token {OAUTH_TOKEN}" '
+ f"https://api.github.com/repos/python-websockets/websockets/git/commits/:sha"
+ )
+
+ commits = []
+
+ head = subprocess.check_output("git rev-parse HEAD", shell=True).decode().strip()
+ todo = [head]
+ seen = set()
+
+ while todo:
+ sha = todo.pop(0)
+ commit = subprocess.check_output(COMMIT_API.replace(":sha", sha), shell=True)
+ commits.append(commit)
+ seen.add(sha)
+ for parent in json.loads(commit)["parents"]:
+ sha = parent["sha"]
+ if sha not in seen and sha not in todo:
+ todo.append(sha)
+ time.sleep(1) # rate throttling
+
+ return commits
+
+
+def corpus():
+ data = _corpus()
+ with open(CORPUS_FILE, "wb") as handle:
+ pickle.dump(data, handle)
+
+
+def _run(data):
+ size = {}
+ duration = {}
+
+ for wbits in range(9, 16):
+ size[wbits] = {}
+ duration[wbits] = {}
+
+ for memLevel in range(1, 10):
+ encoder = zlib.compressobj(wbits=-wbits, memLevel=memLevel)
+ encoded = []
+
+ t0 = time.perf_counter()
+
+ for _ in range(REPEAT):
+ for item in data:
+ if isinstance(item, str):
+ item = item.encode("utf-8")
+ # Taken from PerMessageDeflate.encode
+ item = encoder.compress(item) + encoder.flush(zlib.Z_SYNC_FLUSH)
+ if item.endswith(b"\x00\x00\xff\xff"):
+ item = item[:-4]
+ encoded.append(item)
+
+ t1 = time.perf_counter()
+
+ size[wbits][memLevel] = sum(len(item) for item in encoded)
+ duration[wbits][memLevel] = (t1 - t0) / REPEAT
+
+ raw_size = sum(len(item) for item in data)
+
+ print("=" * 79)
+ print("Compression ratio")
+ print("=" * 79)
+ print("\t".join(["wb \\ ml"] + [str(memLevel) for memLevel in range(1, 10)]))
+ for wbits in range(9, 16):
+ print(
+ "\t".join(
+ [str(wbits)]
+ + [
+ f"{100 * (1 - size[wbits][memLevel] / raw_size):.1f}%"
+ for memLevel in range(1, 10)
+ ]
+ )
+ )
+ print("=" * 79)
+ print()
+
+ print("=" * 79)
+ print("CPU time")
+ print("=" * 79)
+ print("\t".join(["wb \\ ml"] + [str(memLevel) for memLevel in range(1, 10)]))
+ for wbits in range(9, 16):
+ print(
+ "\t".join(
+ [str(wbits)]
+ + [
+ f"{1000 * duration[wbits][memLevel]:.1f}ms"
+ for memLevel in range(1, 10)
+ ]
+ )
+ )
+ print("=" * 79)
+ print()
+
+ print("=" * 79)
+ print(f"Size vs. {WB} \\ {ML}")
+ print("=" * 79)
+ print("\t".join(["wb \\ ml"] + [str(memLevel) for memLevel in range(1, 10)]))
+ for wbits in range(9, 16):
+ print(
+ "\t".join(
+ [str(wbits)]
+ + [
+ f"{100 * (size[wbits][memLevel] / size[WB][ML] - 1):.1f}%"
+ for memLevel in range(1, 10)
+ ]
+ )
+ )
+ print("=" * 79)
+ print()
+
+ print("=" * 79)
+ print(f"Time vs. {WB} \\ {ML}")
+ print("=" * 79)
+ print("\t".join(["wb \\ ml"] + [str(memLevel) for memLevel in range(1, 10)]))
+ for wbits in range(9, 16):
+ print(
+ "\t".join(
+ [str(wbits)]
+ + [
+ f"{100 * (duration[wbits][memLevel] / duration[WB][ML] - 1):.1f}%"
+ for memLevel in range(1, 10)
+ ]
+ )
+ )
+ print("=" * 79)
+ print()
+
+
+def run():
+ with open(CORPUS_FILE, "rb") as handle:
+ data = pickle.load(handle)
+ _run(data)
+
+
+try:
+ run = globals()[sys.argv[1]]
+except (KeyError, IndexError):
+ print(f"Usage: {sys.argv[0]} [corpus|run]")
+else:
+ run()
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/compression/client.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/compression/client.py
new file mode 100644
index 0000000000..3ee19ddc59
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/compression/client.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+
+import asyncio
+import statistics
+import tracemalloc
+
+import websockets
+from websockets.extensions import permessage_deflate
+
+
+CLIENTS = 20
+INTERVAL = 1 / 10 # seconds
+
+WB, ML = 12, 5
+
+MEM_SIZE = []
+
+
+async def client(client):
+ # Space out connections to make them sequential.
+ await asyncio.sleep(client * INTERVAL)
+
+ tracemalloc.start()
+
+ async with websockets.connect(
+ "ws://localhost:8765",
+ extensions=[
+ permessage_deflate.ClientPerMessageDeflateFactory(
+ server_max_window_bits=WB,
+ client_max_window_bits=WB,
+ compress_settings={"memLevel": ML},
+ )
+ ],
+ ) as ws:
+ await ws.send("hello")
+ await ws.recv()
+
+ await ws.send(b"hello")
+ await ws.recv()
+
+ MEM_SIZE.append(tracemalloc.get_traced_memory()[0])
+ tracemalloc.stop()
+
+ # Hold connection open until the end of the test.
+ await asyncio.sleep(CLIENTS * INTERVAL)
+
+
+async def clients():
+ await asyncio.gather(*[client(client) for client in range(CLIENTS + 1)])
+
+
+asyncio.run(clients())
+
+
+# First connection incurs non-representative setup costs.
+del MEM_SIZE[0]
+
+print(f"µ = {statistics.mean(MEM_SIZE) / 1024:.1f} KiB")
+print(f"σ = {statistics.stdev(MEM_SIZE) / 1024:.1f} KiB")
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/compression/server.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/compression/server.py
new file mode 100644
index 0000000000..8d1ee3cd7c
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/compression/server.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+
+import asyncio
+import os
+import signal
+import statistics
+import tracemalloc
+
+import websockets
+from websockets.extensions import permessage_deflate
+
+
+CLIENTS = 20
+INTERVAL = 1 / 10 # seconds
+
+WB, ML = 12, 5
+
+MEM_SIZE = []
+
+
+async def handler(ws):
+ msg = await ws.recv()
+ await ws.send(msg)
+
+ msg = await ws.recv()
+ await ws.send(msg)
+
+ MEM_SIZE.append(tracemalloc.get_traced_memory()[0])
+ tracemalloc.stop()
+
+ tracemalloc.start()
+
+ # Hold connection open until the end of the test.
+ await asyncio.sleep(CLIENTS * INTERVAL)
+
+
+async def server():
+ loop = asyncio.get_running_loop()
+ stop = loop.create_future()
+
+ # Set the stop condition when receiving SIGTERM.
+ print("Stop the server with:")
+ print(f"kill -TERM {os.getpid()}")
+ print()
+ loop.add_signal_handler(signal.SIGTERM, stop.set_result, None)
+
+ async with websockets.serve(
+ handler,
+ "localhost",
+ 8765,
+ extensions=[
+ permessage_deflate.ServerPerMessageDeflateFactory(
+ server_max_window_bits=WB,
+ client_max_window_bits=WB,
+ compress_settings={"memLevel": ML},
+ )
+ ],
+ ):
+ tracemalloc.start()
+ await stop
+
+
+asyncio.run(server())
+
+
+# First connection may incur non-representative setup costs.
+del MEM_SIZE[0]
+
+print(f"µ = {statistics.mean(MEM_SIZE) / 1024:.1f} KiB")
+print(f"σ = {statistics.stdev(MEM_SIZE) / 1024:.1f} KiB")
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/parse_frames.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/parse_frames.py
new file mode 100644
index 0000000000..e3acbe3c20
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/parse_frames.py
@@ -0,0 +1,101 @@
+"""Benchark parsing WebSocket frames."""
+
+import subprocess
+import sys
+import timeit
+
+from websockets.extensions.permessage_deflate import PerMessageDeflate
+from websockets.frames import Frame, Opcode
+from websockets.streams import StreamReader
+
+
+# 256kB of text, compressible by about 70%.
+text = subprocess.check_output(["git", "log", "8dd8e410"], text=True)
+
+
+def get_frame(size):
+ repeat, remainder = divmod(size, 256 * 1024)
+ payload = repeat * text + text[:remainder]
+ return Frame(Opcode.TEXT, payload.encode(), True)
+
+
+def parse_frame(data, count, mask, extensions):
+ reader = StreamReader()
+ for _ in range(count):
+ reader.feed_data(data)
+ parser = Frame.parse(
+ reader.read_exact,
+ mask=mask,
+ extensions=extensions,
+ )
+ try:
+ next(parser)
+ except StopIteration:
+ pass
+ else:
+ assert False, "parser should return frame"
+ reader.feed_eof()
+ assert reader.at_eof(), "parser should consume all data"
+
+
+def run_benchmark(size, count, compression=False, number=100):
+ if compression:
+ extensions = [PerMessageDeflate(True, True, 12, 12, {"memLevel": 5})]
+ else:
+ extensions = []
+ globals = {
+ "get_frame": get_frame,
+ "parse_frame": parse_frame,
+ "extensions": extensions,
+ }
+ sppf = (
+ min(
+ timeit.repeat(
+ f"parse_frame(data, {count}, mask=True, extensions=extensions)",
+ f"data = get_frame({size})"
+ f".serialize(mask=True, extensions=extensions)",
+ number=number,
+ globals=globals,
+ )
+ )
+ / number
+ / count
+ * 1_000_000
+ )
+ cppf = (
+ min(
+ timeit.repeat(
+ f"parse_frame(data, {count}, mask=False, extensions=extensions)",
+ f"data = get_frame({size})"
+ f".serialize(mask=False, extensions=extensions)",
+ number=number,
+ globals=globals,
+ )
+ )
+ / number
+ / count
+ * 1_000_000
+ )
+ print(f"{size}\t{compression}\t{sppf:.2f}\t{cppf:.2f}")
+
+
+if __name__ == "__main__":
+ print("Sizes are in bytes. Times are in µs per frame.", file=sys.stderr)
+ print("Run `tabs -16` for clean output. Pipe stdout to TSV for saving.")
+ print(file=sys.stderr)
+
+ print("size\tcompression\tserver\tclient")
+ run_benchmark(size=8, count=1000, compression=False)
+ run_benchmark(size=60, count=1000, compression=False)
+ run_benchmark(size=500, count=1000, compression=False)
+ run_benchmark(size=4_000, count=1000, compression=False)
+ run_benchmark(size=30_000, count=200, compression=False)
+ run_benchmark(size=250_000, count=100, compression=False)
+ run_benchmark(size=2_000_000, count=20, compression=False)
+
+ run_benchmark(size=8, count=1000, compression=True)
+ run_benchmark(size=60, count=1000, compression=True)
+ run_benchmark(size=500, count=200, compression=True)
+ run_benchmark(size=4_000, count=100, compression=True)
+ run_benchmark(size=30_000, count=20, compression=True)
+ run_benchmark(size=250_000, count=10, compression=True)
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/parse_handshake.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/parse_handshake.py
new file mode 100644
index 0000000000..af5a4ecae2
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/parse_handshake.py
@@ -0,0 +1,102 @@
+"""Benchark parsing WebSocket handshake requests."""
+
+# The parser for responses is designed similarly and should perform similarly.
+
+import sys
+import timeit
+
+from websockets.http11 import Request
+from websockets.streams import StreamReader
+
+
+CHROME_HANDSHAKE = (
+ b"GET / HTTP/1.1\r\n"
+ b"Host: localhost:5678\r\n"
+ b"Connection: Upgrade\r\n"
+ b"Pragma: no-cache\r\n"
+ b"Cache-Control: no-cache\r\n"
+ b"User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
+ b"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36\r\n"
+ b"Upgrade: websocket\r\n"
+ b"Origin: null\r\n"
+ b"Sec-WebSocket-Version: 13\r\n"
+ b"Accept-Encoding: gzip, deflate, br\r\n"
+ b"Accept-Language: en-GB,en;q=0.9,en-US;q=0.8,fr;q=0.7\r\n"
+ b"Sec-WebSocket-Key: ebkySAl+8+e6l5pRKTMkyQ==\r\n"
+ b"Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\n"
+ b"\r\n"
+)
+
+FIREFOX_HANDSHAKE = (
+ b"GET / HTTP/1.1\r\n"
+ b"Host: localhost:5678\r\n"
+ b"User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) "
+ b"Gecko/20100101 Firefox/111.0\r\n"
+ b"Accept: */*\r\n"
+ b"Accept-Language: en-US,en;q=0.7,fr-FR;q=0.3\r\n"
+ b"Accept-Encoding: gzip, deflate, br\r\n"
+ b"Sec-WebSocket-Version: 13\r\n"
+ b"Origin: null\r\n"
+ b"Sec-WebSocket-Extensions: permessage-deflate\r\n"
+ b"Sec-WebSocket-Key: 1PuS+hnb+0AXsL7z2hNAhw==\r\n"
+ b"Connection: keep-alive, Upgrade\r\n"
+ b"Sec-Fetch-Dest: websocket\r\n"
+ b"Sec-Fetch-Mode: websocket\r\n"
+ b"Sec-Fetch-Site: cross-site\r\n"
+ b"Pragma: no-cache\r\n"
+ b"Cache-Control: no-cache\r\n"
+ b"Upgrade: websocket\r\n"
+ b"\r\n"
+)
+
+WEBSOCKETS_HANDSHAKE = (
+ b"GET / HTTP/1.1\r\n"
+ b"Host: localhost:8765\r\n"
+ b"Upgrade: websocket\r\n"
+ b"Connection: Upgrade\r\n"
+ b"Sec-WebSocket-Key: 9c55e0/siQ6tJPCs/QR8ZA==\r\n"
+ b"Sec-WebSocket-Version: 13\r\n"
+ b"Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\n"
+ b"User-Agent: Python/3.11 websockets/11.0\r\n"
+ b"\r\n"
+)
+
+
+def parse_handshake(handshake):
+ reader = StreamReader()
+ reader.feed_data(handshake)
+ parser = Request.parse(reader.read_line)
+ try:
+ next(parser)
+ except StopIteration:
+ pass
+ else:
+ assert False, "parser should return request"
+ reader.feed_eof()
+ assert reader.at_eof(), "parser should consume all data"
+
+
+def run_benchmark(name, handshake, number=10000):
+ ph = (
+ min(
+ timeit.repeat(
+ "parse_handshake(handshake)",
+ number=number,
+ globals={"parse_handshake": parse_handshake, "handshake": handshake},
+ )
+ )
+ / number
+ * 1_000_000
+ )
+ print(f"{name}\t{len(handshake)}\t{ph:.1f}")
+
+
+if __name__ == "__main__":
+ print("Sizes are in bytes. Times are in µs per frame.", file=sys.stderr)
+ print("Run `tabs -16` for clean output. Pipe stdout to TSV for saving.")
+ print(file=sys.stderr)
+
+ print("client\tsize\ttime")
+ run_benchmark("Chrome", CHROME_HANDSHAKE)
+ run_benchmark("Firefox", FIREFOX_HANDSHAKE)
+ run_benchmark("websockets", WEBSOCKETS_HANDSHAKE)
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/streams.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/streams.py
new file mode 100644
index 0000000000..ca24a59834
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/optimization/streams.py
@@ -0,0 +1,301 @@
+"""
+Benchmark two possible implementations of a stream reader.
+
+The difference lies in the data structure that buffers incoming data:
+
+* ``ByteArrayStreamReader`` uses a ``bytearray``;
+* ``BytesDequeStreamReader`` uses a ``deque[bytes]``.
+
+``ByteArrayStreamReader`` is faster for streaming small frames, which is the
+standard use case of websockets, likely due to its simple implementation and
+to ``bytearray`` being fast at appending data and removing data at the front
+(https://hg.python.org/cpython/rev/499a96611baa).
+
+``BytesDequeStreamReader`` is faster for large frames and for bursts, likely
+because it copies payloads only once, while ``ByteArrayStreamReader`` copies
+them twice.
+
+"""
+
+
+import collections
+import os
+import timeit
+
+
+# Implementations
+
+
+class ByteArrayStreamReader:
+ def __init__(self):
+ self.buffer = bytearray()
+ self.eof = False
+
+ def readline(self):
+ n = 0 # number of bytes to read
+ p = 0 # number of bytes without a newline
+ while True:
+ n = self.buffer.find(b"\n", p) + 1
+ if n > 0:
+ break
+ p = len(self.buffer)
+ yield
+ r = self.buffer[:n]
+ del self.buffer[:n]
+ return r
+
+ def readexactly(self, n):
+ assert n >= 0
+ while len(self.buffer) < n:
+ yield
+ r = self.buffer[:n]
+ del self.buffer[:n]
+ return r
+
+ def feed_data(self, data):
+ self.buffer += data
+
+ def feed_eof(self):
+ self.eof = True
+
+ def at_eof(self):
+ return self.eof and not self.buffer
+
+
+class BytesDequeStreamReader:
+ def __init__(self):
+ self.buffer = collections.deque()
+ self.eof = False
+
+ def readline(self):
+ b = []
+ while True:
+ # Read next chunk
+ while True:
+ try:
+ c = self.buffer.popleft()
+ except IndexError:
+ yield
+ else:
+ break
+ # Handle chunk
+ n = c.find(b"\n") + 1
+ if n == len(c):
+ # Read exactly enough data
+ b.append(c)
+ break
+ elif n > 0:
+ # Read too much data
+ b.append(c[:n])
+ self.buffer.appendleft(c[n:])
+ break
+ else: # n == 0
+ # Need to read more data
+ b.append(c)
+ return b"".join(b)
+
+ def readexactly(self, n):
+ if n == 0:
+ return b""
+ b = []
+ while True:
+ # Read next chunk
+ while True:
+ try:
+ c = self.buffer.popleft()
+ except IndexError:
+ yield
+ else:
+ break
+ # Handle chunk
+ n -= len(c)
+ if n == 0:
+ # Read exactly enough data
+ b.append(c)
+ break
+ elif n < 0:
+ # Read too much data
+ b.append(c[:n])
+ self.buffer.appendleft(c[n:])
+ break
+ else: # n >= 0
+ # Need to read more data
+ b.append(c)
+ return b"".join(b)
+
+ def feed_data(self, data):
+ self.buffer.append(data)
+
+ def feed_eof(self):
+ self.eof = True
+
+ def at_eof(self):
+ return self.eof and not self.buffer
+
+
+# Tests
+
+
+class Protocol:
+ def __init__(self, StreamReader):
+ self.reader = StreamReader()
+ self.events = []
+ # Start parser coroutine
+ self.parser = self.run_parser()
+ next(self.parser)
+
+ def run_parser(self):
+ while True:
+ frame = yield from self.reader.readexactly(2)
+ self.events.append(frame)
+ frame = yield from self.reader.readline()
+ self.events.append(frame)
+
+ def data_received(self, data):
+ self.reader.feed_data(data)
+ next(self.parser) # run parser until more data is needed
+ events, self.events = self.events, []
+ return events
+
+
+def run_test(StreamReader):
+ proto = Protocol(StreamReader)
+
+ actual = proto.data_received(b"a")
+ expected = []
+ assert actual == expected, f"{actual} != {expected}"
+
+ actual = proto.data_received(b"b")
+ expected = [b"ab"]
+ assert actual == expected, f"{actual} != {expected}"
+
+ actual = proto.data_received(b"c")
+ expected = []
+ assert actual == expected, f"{actual} != {expected}"
+
+ actual = proto.data_received(b"\n")
+ expected = [b"c\n"]
+ assert actual == expected, f"{actual} != {expected}"
+
+ actual = proto.data_received(b"efghi\njklmn")
+ expected = [b"ef", b"ghi\n", b"jk"]
+ assert actual == expected, f"{actual} != {expected}"
+
+
+# Benchmarks
+
+
+def get_frame_packets(size, packet_size=None):
+ if size < 126:
+ frame = bytes([138, size])
+ elif size < 65536:
+ frame = bytes([138, 126]) + bytes(divmod(size, 256))
+ else:
+ size1, size2 = divmod(size, 65536)
+ frame = (
+ bytes([138, 127]) + bytes(divmod(size1, 256)) + bytes(divmod(size2, 256))
+ )
+ frame += os.urandom(size)
+ if packet_size is None:
+ return [frame]
+ else:
+ packets = []
+ while frame:
+ packets.append(frame[:packet_size])
+ frame = frame[packet_size:]
+ return packets
+
+
+def benchmark_stream(StreamReader, packets, size, count):
+ reader = StreamReader()
+ for _ in range(count):
+ for packet in packets:
+ reader.feed_data(packet)
+ yield from reader.readexactly(2)
+ if size >= 65536:
+ yield from reader.readexactly(4)
+ elif size >= 126:
+ yield from reader.readexactly(2)
+ yield from reader.readexactly(size)
+ reader.feed_eof()
+ assert reader.at_eof()
+
+
+def benchmark_burst(StreamReader, packets, size, count):
+ reader = StreamReader()
+ for _ in range(count):
+ for packet in packets:
+ reader.feed_data(packet)
+ reader.feed_eof()
+ for _ in range(count):
+ yield from reader.readexactly(2)
+ if size >= 65536:
+ yield from reader.readexactly(4)
+ elif size >= 126:
+ yield from reader.readexactly(2)
+ yield from reader.readexactly(size)
+ assert reader.at_eof()
+
+
+def run_benchmark(size, count, packet_size=None, number=1000):
+ stmt = f"list(benchmark(StreamReader, packets, {size}, {count}))"
+ setup = f"packets = get_frame_packets({size}, {packet_size})"
+ context = globals()
+
+ context["StreamReader"] = context["ByteArrayStreamReader"]
+ context["benchmark"] = context["benchmark_stream"]
+ bas = min(timeit.repeat(stmt, setup, number=number, globals=context))
+ context["benchmark"] = context["benchmark_burst"]
+ bab = min(timeit.repeat(stmt, setup, number=number, globals=context))
+
+ context["StreamReader"] = context["BytesDequeStreamReader"]
+ context["benchmark"] = context["benchmark_stream"]
+ bds = min(timeit.repeat(stmt, setup, number=number, globals=context))
+ context["benchmark"] = context["benchmark_burst"]
+ bdb = min(timeit.repeat(stmt, setup, number=number, globals=context))
+
+ print(
+ f"Frame size = {size} bytes, "
+ f"frame count = {count}, "
+ f"packet size = {packet_size}"
+ )
+ print(f"* ByteArrayStreamReader (stream): {bas / number * 1_000_000:.1f}µs")
+ print(
+ f"* BytesDequeStreamReader (stream): "
+ f"{bds / number * 1_000_000:.1f}µs ({(bds / bas - 1) * 100:+.1f}%)"
+ )
+ print(f"* ByteArrayStreamReader (burst): {bab / number * 1_000_000:.1f}µs")
+ print(
+ f"* BytesDequeStreamReader (burst): "
+ f"{bdb / number * 1_000_000:.1f}µs ({(bdb / bab - 1) * 100:+.1f}%)"
+ )
+ print()
+
+
+if __name__ == "__main__":
+ run_test(ByteArrayStreamReader)
+ run_test(BytesDequeStreamReader)
+
+ run_benchmark(size=8, count=1000)
+ run_benchmark(size=60, count=1000)
+ run_benchmark(size=500, count=500)
+ run_benchmark(size=4_000, count=200)
+ run_benchmark(size=30_000, count=100)
+ run_benchmark(size=250_000, count=50)
+ run_benchmark(size=2_000_000, count=20)
+
+ run_benchmark(size=4_000, count=200, packet_size=1024)
+ run_benchmark(size=30_000, count=100, packet_size=1024)
+ run_benchmark(size=250_000, count=50, packet_size=1024)
+ run_benchmark(size=2_000_000, count=20, packet_size=1024)
+
+ run_benchmark(size=30_000, count=100, packet_size=4096)
+ run_benchmark(size=250_000, count=50, packet_size=4096)
+ run_benchmark(size=2_000_000, count=20, packet_size=4096)
+
+ run_benchmark(size=30_000, count=100, packet_size=16384)
+ run_benchmark(size=250_000, count=50, packet_size=16384)
+ run_benchmark(size=2_000_000, count=20, packet_size=16384)
+
+ run_benchmark(size=250_000, count=50, packet_size=65536)
+ run_benchmark(size=2_000_000, count=20, packet_size=65536)