1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
#!/usr/bin/env python
import asyncio
import itertools
import json
import websockets
from connect4 import PLAYER1, PLAYER2, Connect4
async def handler(websocket):
# Initialize a Connect Four game.
game = Connect4()
# Players take alternate turns, using the same browser.
turns = itertools.cycle([PLAYER1, PLAYER2])
player = next(turns)
async for message in websocket:
# Parse a "play" event from the UI.
event = json.loads(message)
assert event["type"] == "play"
column = event["column"]
try:
# Play the move.
row = game.play(player, column)
except RuntimeError as exc:
# Send an "error" event if the move was illegal.
event = {
"type": "error",
"message": str(exc),
}
await websocket.send(json.dumps(event))
continue
# Send a "play" event to update the UI.
event = {
"type": "play",
"player": player,
"column": column,
"row": row,
}
await websocket.send(json.dumps(event))
# If move is winning, send a "win" event.
if game.winner is not None:
event = {
"type": "win",
"player": game.winner,
}
await websocket.send(json.dumps(event))
# Alternate turns.
player = next(turns)
async def main():
async with websockets.serve(handler, "", 8001):
await asyncio.Future() # run forever
if __name__ == "__main__":
asyncio.run(main())
|