1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
#!/usr/bin/env python
import asyncio
import functools
import os
import sys
import time
import websockets
# Registry of broadcast targets shared by handler() and broadcast().
# handler() adds the websocket itself for the "default", "naive", "task" and
# "wait" methods, and an asyncio.Queue for the "queue" method.
CLIENTS = set()
async def send(websocket, message):
    """Send ``message`` on ``websocket``, ignoring an already-closed connection.

    Args:
        websocket: Object exposing an awaitable ``send(message)`` method.
        message: Payload handed to ``websocket.send`` unchanged.
    """
    try:
        await websocket.send(message)
    except websockets.ConnectionClosed:
        # The peer is gone; dropping the message is the intended outcome.
        return
async def relay(queue, websocket):
    """Forward every item taken from ``queue`` to ``websocket``, forever.

    The coroutine never returns on its own; it ends when it is cancelled or
    when ``websocket.send`` raises.

    Args:
        queue: Object with an awaitable ``get()`` (an ``asyncio.Queue`` here).
        websocket: Object exposing an awaitable ``send(message)`` method.
    """
    while True:
        # Wait for the next queued payload, then push it to the peer.
        await websocket.send(await queue.get())
class PubSub:
    """Minimal single-topic publish/subscribe channel built on chained futures.

    Each published value resolves the current "waiter" future with a
    ``(value, next_waiter)`` pair.  Subscribers follow that chain, so every
    subscriber receives every value published after it started listening.

    The first waiter is created lazily on the running event loop.  Creating
    it in ``__init__`` would bind it to whatever loop (if any) exists when
    the instance is constructed -- e.g. at import time, before
    ``asyncio.run()`` starts its own loop -- and awaiting it later would fail
    with "attached to a different loop".
    """

    def __init__(self):
        # No future yet: the instance may be built outside any event loop.
        self._waiter = None

    @property
    def waiter(self):
        """Return the future that the next published value will resolve.

        Must be accessed from code running inside an event loop.
        """
        if self._waiter is None:
            self._waiter = asyncio.get_running_loop().create_future()
        return self._waiter

    def publish(self, value):
        """Deliver ``value`` to all current subscribers.

        Must be called from code running inside the event loop.
        """
        loop = asyncio.get_running_loop()
        waiter = self.waiter
        # Install the next link before resolving the current one so that
        # woken subscribers immediately have something to wait on.
        self._waiter = loop.create_future()
        waiter.set_result((value, self._waiter))

    async def subscribe(self):
        """Yield each value published from now on (async generator)."""
        waiter = self.waiter
        while True:
            value, waiter = await waiter
            yield value

    # Allow ``async for value in pubsub``.
    __aiter__ = subscribe
# Shared channel for the "pubsub" method: broadcast() publishes each message
# to it and handler() iterates over it to forward messages to its websocket.
PUBSUB = PubSub()
async def handler(websocket, method=None):
    """Serve one client connection according to the broadcast ``method``.

    * "default", "naive", "task", "wait": register the websocket in
      ``CLIENTS`` until the connection is closed.
    * "queue": register a per-connection ``asyncio.Queue`` in ``CLIENTS`` and
      run a ``relay`` task that drains it into the websocket.
    * "pubsub": forward every message published on ``PUBSUB``.

    Raises:
        NotImplementedError: If ``method`` is none of the above.
    """
    if method in ("default", "naive", "task", "wait"):
        # The broadcaster writes to the socket directly; just stay registered.
        CLIENTS.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            CLIENTS.remove(websocket)
        return

    if method == "queue":
        # The broadcaster only enqueues; a dedicated task does the sending.
        outbox = asyncio.Queue()
        pump = asyncio.create_task(relay(outbox, websocket))
        CLIENTS.add(outbox)
        try:
            await websocket.wait_closed()
        finally:
            CLIENTS.remove(outbox)
            pump.cancel()
        return

    if method == "pubsub":
        async for payload in PUBSUB:
            await websocket.send(payload)
        return

    raise NotImplementedError(f"unsupported method: {method}")
async def broadcast(method, size, delay):
    """Broadcast messages at regular intervals.

    On every cycle, print load and timing statistics, build a payload made of
    the ASCII ``time.time_ns()`` value, a space and random padding, and
    deliver it to the registered clients using the strategy named by
    ``method``.  Runs until cancelled.

    Args:
        method: One of "default", "naive", "task", "wait", "queue", "pubsub".
        size: Requested payload size in bytes.  20 bytes are reserved for the
            timestamp and separator, so it must be greater than 20.
        delay: Target interval between two broadcasts, in seconds.

    Raises:
        ValueError: If ``size`` is not greater than 20.
        NotImplementedError: If ``method`` is not supported (raised on the
            first cycle).
    """
    # Validate once, up front, with a real exception: an ``assert`` would be
    # stripped under ``python -O``.
    if size <= 20:
        raise ValueError(f"size must be greater than 20, got {size}")
    padding_size = size - 20

    # Strong references to the fire-and-forget tasks of the "task" method.
    # The event loop only keeps weak references to tasks, so an unreferenced
    # task may be garbage collected before it completes.
    background_tasks = set()

    load_average = 0
    time_average = 0
    pc1, pt1 = time.perf_counter_ns(), time.process_time_ns()
    await asyncio.sleep(delay)
    while True:
        print(f"clients = {len(CLIENTS)}")
        pc0, pt0 = time.perf_counter_ns(), time.process_time_ns()
        # CPU time spent by the process over wall-clock time since last cycle.
        load = (pt0 - pt1) / (pc0 - pc1)
        load_average = 0.9 * load_average + 0.1 * load
        print(
            f"load = {load * 100:.1f}% / "
            f"average = {load_average * 100:.1f}%, "
            f"late = {(pc0 - pc1 - delay * 1e9) / 1e6:.1f} ms"
        )
        pc1, pt1 = pc0, pt0

        message = str(time.time_ns()).encode() + b" " + os.urandom(padding_size)

        if method == "default":
            websockets.broadcast(CLIENTS, message)
        elif method == "naive":
            # Since the loop can yield control, make a copy of CLIENTS
            # to avoid: RuntimeError: Set changed size during iteration
            for websocket in CLIENTS.copy():
                await send(websocket, message)
        elif method == "task":
            for websocket in CLIENTS:
                task = asyncio.create_task(send(websocket, message))
                background_tasks.add(task)
                # Drop the reference as soon as the task is done.
                task.add_done_callback(background_tasks.discard)
        elif method == "wait":
            if CLIENTS:  # asyncio.wait doesn't accept an empty list
                await asyncio.wait(
                    [
                        asyncio.create_task(send(websocket, message))
                        for websocket in CLIENTS
                    ]
                )
        elif method == "queue":
            for queue in CLIENTS:
                queue.put_nowait(message)
        elif method == "pubsub":
            PUBSUB.publish(message)
        else:
            raise NotImplementedError(f"unsupported method: {method}")

        pc2 = time.perf_counter_ns()
        # Sleep for what is left of the interval after the broadcast itself.
        wait = delay + (pc1 - pc2) / 1e9
        time_average = 0.9 * time_average + 0.1 * (pc2 - pc1)
        print(
            f"broadcast = {(pc2 - pc1) / 1e6:.1f}ms / "
            f"average = {time_average / 1e6:.1f}ms, "
            f"wait = {wait * 1e3:.1f}ms"
        )
        await asyncio.sleep(wait)
        print()
async def main(method, size, delay):
    """Start the WebSocket server on localhost:8765 and broadcast until stopped.

    Args:
        method: Broadcast strategy, passed to both ``handler`` and
            ``broadcast``.
        size: Payload size in bytes, passed to ``broadcast``.
        delay: Interval between broadcasts in seconds, passed to ``broadcast``.
    """
    # Every connection is served by ``handler`` with the chosen method bound.
    connection_handler = functools.partial(handler, method=method)
    server = websockets.serve(
        connection_handler,
        "localhost",
        8765,
        compression=None,
        ping_timeout=None,
    )
    async with server:
        await broadcast(method, size, delay)
# Command-line entry point: ``<script> method size delay``.
if __name__ == "__main__":
    try:
        method = sys.argv[1]
        # Explicit check rather than ``assert``: assertions are stripped under
        # ``python -O`` and carry no message for the user.
        if method not in ("default", "naive", "task", "wait", "queue", "pubsub"):
            raise ValueError(f"unsupported method: {method}")
        size = int(sys.argv[2])
        delay = float(sys.argv[3])
    except (IndexError, ValueError) as exc:
        # IndexError: missing argument; ValueError: bad method or number.
        print(f"Usage: {sys.argv[0]} method size delay")
        print("    Start a server broadcasting messages with <method> e.g. naive")
        print("    Send a payload of <size> bytes every <delay> seconds")
        print()
        print(exc)
    else:
        asyncio.run(main(method, size, delay))
|