summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast
diff options
context:
space:
mode:
Diffstat (limited to 'testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast')
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/clients.py61
-rw-r--r--testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/server.py153
2 files changed, 214 insertions, 0 deletions
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/clients.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/clients.py
new file mode 100644
index 0000000000..fe39dfe051
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/clients.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+
+import asyncio
+import statistics
+import sys
+import time
+
+import websockets
+
+
+LATENCIES = {}
+
+
+async def log_latency(interval):
+ while True:
+ await asyncio.sleep(interval)
+ p = statistics.quantiles(LATENCIES.values(), n=100)
+ print(f"clients = {len(LATENCIES)}")
+ print(
+ f"p50 = {p[49] / 1e6:.1f}ms, "
+ f"p95 = {p[94] / 1e6:.1f}ms, "
+ f"p99 = {p[98] / 1e6:.1f}ms"
+ )
+ print()
+
+
+async def client():
+ try:
+ async with websockets.connect(
+ "ws://localhost:8765",
+ ping_timeout=None,
+ ) as websocket:
+ async for msg in websocket:
+ client_time = time.time_ns()
+ server_time = int(msg[:19].decode())
+ LATENCIES[websocket] = client_time - server_time
+ except Exception as exc:
+ print(exc)
+
+
+async def main(count, interval):
+ asyncio.create_task(log_latency(interval))
+ clients = []
+ for _ in range(count):
+ clients.append(asyncio.create_task(client()))
+ await asyncio.sleep(0.001) # 1ms between each connection
+ await asyncio.wait(clients)
+
+
+if __name__ == "__main__":
+ try:
+ count = int(sys.argv[1])
+ interval = float(sys.argv[2])
+ except Exception as exc:
+ print(f"Usage: {sys.argv[0]} count interval")
+ print(" Connect <count> clients e.g. 1000")
+ print(" Report latency every <interval> seconds e.g. 1")
+ print()
+ print(exc)
+ else:
+ asyncio.run(main(count, interval))
diff --git a/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/server.py b/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/server.py
new file mode 100644
index 0000000000..9c9907b7f9
--- /dev/null
+++ b/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/server.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python
+
+import asyncio
+import functools
+import os
+import sys
+import time
+
+import websockets
+
+
+CLIENTS = set()
+
+
+async def send(websocket, message):
+ try:
+ await websocket.send(message)
+ except websockets.ConnectionClosed:
+ pass
+
+
+async def relay(queue, websocket):
+ while True:
+ message = await queue.get()
+ await websocket.send(message)
+
+
+class PubSub:
+ def __init__(self):
+ self.waiter = asyncio.Future()
+
+ def publish(self, value):
+ waiter, self.waiter = self.waiter, asyncio.Future()
+ waiter.set_result((value, self.waiter))
+
+ async def subscribe(self):
+ waiter = self.waiter
+ while True:
+ value, waiter = await waiter
+ yield value
+
+ __aiter__ = subscribe
+
+
+PUBSUB = PubSub()
+
+
+async def handler(websocket, method=None):
+ if method in ["default", "naive", "task", "wait"]:
+ CLIENTS.add(websocket)
+ try:
+ await websocket.wait_closed()
+ finally:
+ CLIENTS.remove(websocket)
+ elif method == "queue":
+ queue = asyncio.Queue()
+ relay_task = asyncio.create_task(relay(queue, websocket))
+ CLIENTS.add(queue)
+ try:
+ await websocket.wait_closed()
+ finally:
+ CLIENTS.remove(queue)
+ relay_task.cancel()
+ elif method == "pubsub":
+ async for message in PUBSUB:
+ await websocket.send(message)
+ else:
+ raise NotImplementedError(f"unsupported method: {method}")
+
+
+async def broadcast(method, size, delay):
+ """Broadcast messages at regular intervals."""
+ load_average = 0
+ time_average = 0
+ pc1, pt1 = time.perf_counter_ns(), time.process_time_ns()
+ await asyncio.sleep(delay)
+ while True:
+ print(f"clients = {len(CLIENTS)}")
+ pc0, pt0 = time.perf_counter_ns(), time.process_time_ns()
+ load_average = 0.9 * load_average + 0.1 * (pt0 - pt1) / (pc0 - pc1)
+ print(
+ f"load = {(pt0 - pt1) / (pc0 - pc1) * 100:.1f}% / "
+ f"average = {load_average * 100:.1f}%, "
+ f"late = {(pc0 - pc1 - delay * 1e9) / 1e6:.1f} ms"
+ )
+ pc1, pt1 = pc0, pt0
+
+ assert size > 20
+ message = str(time.time_ns()).encode() + b" " + os.urandom(size - 20)
+
+ if method == "default":
+ websockets.broadcast(CLIENTS, message)
+ elif method == "naive":
+ # Since the loop can yield control, make a copy of CLIENTS
+ # to avoid: RuntimeError: Set changed size during iteration
+ for websocket in CLIENTS.copy():
+ await send(websocket, message)
+ elif method == "task":
+ for websocket in CLIENTS:
+ asyncio.create_task(send(websocket, message))
+ elif method == "wait":
+ if CLIENTS: # asyncio.wait doesn't accept an empty list
+ await asyncio.wait(
+ [
+ asyncio.create_task(send(websocket, message))
+ for websocket in CLIENTS
+ ]
+ )
+ elif method == "queue":
+ for queue in CLIENTS:
+ queue.put_nowait(message)
+ elif method == "pubsub":
+ PUBSUB.publish(message)
+ else:
+ raise NotImplementedError(f"unsupported method: {method}")
+
+ pc2 = time.perf_counter_ns()
+ wait = delay + (pc1 - pc2) / 1e9
+ time_average = 0.9 * time_average + 0.1 * (pc2 - pc1)
+ print(
+ f"broadcast = {(pc2 - pc1) / 1e6:.1f}ms / "
+ f"average = {time_average / 1e6:.1f}ms, "
+ f"wait = {wait * 1e3:.1f}ms"
+ )
+ await asyncio.sleep(wait)
+ print()
+
+
+async def main(method, size, delay):
+ async with websockets.serve(
+ functools.partial(handler, method=method),
+ "localhost",
+ 8765,
+ compression=None,
+ ping_timeout=None,
+ ):
+ await broadcast(method, size, delay)
+
+
+if __name__ == "__main__":
+ try:
+ method = sys.argv[1]
+ assert method in ["default", "naive", "task", "wait", "queue", "pubsub"]
+ size = int(sys.argv[2])
+ delay = float(sys.argv[3])
+ except Exception as exc:
+ print(f"Usage: {sys.argv[0]} method size delay")
+ print(" Start a server broadcasting messages with <method> e.g. naive")
+ print(" Send a payload of <size> bytes every <delay> seconds")
+ print()
+ print(exc)
+ else:
+ asyncio.run(main(method, size, delay))