summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/tools/third_party/websockets/experiments/broadcast/server.py
blob: 9c9907b7f9e4213593cc2ebd758523cb25937ef7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python

import asyncio
import functools
import os
import sys
import time

import websockets


CLIENTS = set()


async def send(websocket, message):
    try:
        await websocket.send(message)
    except websockets.ConnectionClosed:
        pass


async def relay(queue, websocket):
    while True:
        message = await queue.get()
        await websocket.send(message)


class PubSub:
    def __init__(self):
        self.waiter = asyncio.Future()

    def publish(self, value):
        waiter, self.waiter = self.waiter, asyncio.Future()
        waiter.set_result((value, self.waiter))

    async def subscribe(self):
        waiter = self.waiter
        while True:
            value, waiter = await waiter
            yield value

    __aiter__ = subscribe


PUBSUB = PubSub()


async def handler(websocket, method=None):
    if method in ["default", "naive", "task", "wait"]:
        CLIENTS.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            CLIENTS.remove(websocket)
    elif method == "queue":
        queue = asyncio.Queue()
        relay_task = asyncio.create_task(relay(queue, websocket))
        CLIENTS.add(queue)
        try:
            await websocket.wait_closed()
        finally:
            CLIENTS.remove(queue)
            relay_task.cancel()
    elif method == "pubsub":
        async for message in PUBSUB:
            await websocket.send(message)
    else:
        raise NotImplementedError(f"unsupported method: {method}")


async def broadcast(method, size, delay):
    """Broadcast messages at regular intervals."""
    load_average = 0
    time_average = 0
    pc1, pt1 = time.perf_counter_ns(), time.process_time_ns()
    await asyncio.sleep(delay)
    while True:
        print(f"clients = {len(CLIENTS)}")
        pc0, pt0 = time.perf_counter_ns(), time.process_time_ns()
        load_average = 0.9 * load_average + 0.1 * (pt0 - pt1) / (pc0 - pc1)
        print(
            f"load = {(pt0 - pt1) / (pc0 - pc1) * 100:.1f}% / "
            f"average = {load_average * 100:.1f}%, "
            f"late = {(pc0 - pc1 - delay * 1e9) / 1e6:.1f} ms"
        )
        pc1, pt1 = pc0, pt0

        assert size > 20
        message = str(time.time_ns()).encode() + b" " + os.urandom(size - 20)

        if method == "default":
            websockets.broadcast(CLIENTS, message)
        elif method == "naive":
            # Since the loop can yield control, make a copy of CLIENTS
            # to avoid: RuntimeError: Set changed size during iteration
            for websocket in CLIENTS.copy():
                await send(websocket, message)
        elif method == "task":
            for websocket in CLIENTS:
                asyncio.create_task(send(websocket, message))
        elif method == "wait":
            if CLIENTS:  # asyncio.wait doesn't accept an empty list
                await asyncio.wait(
                    [
                        asyncio.create_task(send(websocket, message))
                        for websocket in CLIENTS
                    ]
                )
        elif method == "queue":
            for queue in CLIENTS:
                queue.put_nowait(message)
        elif method == "pubsub":
            PUBSUB.publish(message)
        else:
            raise NotImplementedError(f"unsupported method: {method}")

        pc2 = time.perf_counter_ns()
        wait = delay + (pc1 - pc2) / 1e9
        time_average = 0.9 * time_average + 0.1 * (pc2 - pc1)
        print(
            f"broadcast = {(pc2 - pc1) / 1e6:.1f}ms / "
            f"average = {time_average / 1e6:.1f}ms, "
            f"wait = {wait * 1e3:.1f}ms"
        )
        await asyncio.sleep(wait)
        print()


async def main(method, size, delay):
    async with websockets.serve(
        functools.partial(handler, method=method),
        "localhost",
        8765,
        compression=None,
        ping_timeout=None,
    ):
        await broadcast(method, size, delay)


if __name__ == "__main__":
    try:
        method = sys.argv[1]
        assert method in ["default", "naive", "task", "wait", "queue", "pubsub"]
        size = int(sys.argv[2])
        delay = float(sys.argv[3])
    except Exception as exc:
        print(f"Usage: {sys.argv[0]} method size delay")
        print("    Start a server broadcasting messages with <method> e.g. naive")
        print("    Send a payload of <size> bytes every <delay> seconds")
        print()
        print(exc)
    else:
        asyncio.run(main(method, size, delay))