summaryrefslogtreecommitdiffstats
path: root/third_party/python/aiohttp/examples/background_tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/aiohttp/examples/background_tasks.py')
-rwxr-xr-xthird_party/python/aiohttp/examples/background_tasks.py66
1 files changed, 66 insertions, 0 deletions
diff --git a/third_party/python/aiohttp/examples/background_tasks.py b/third_party/python/aiohttp/examples/background_tasks.py
new file mode 100755
index 0000000000..2a1ec12afa
--- /dev/null
+++ b/third_party/python/aiohttp/examples/background_tasks.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+"""Example of aiohttp.web.Application.on_startup signal handler"""
+import asyncio
+
+import aioredis
+
+from aiohttp import web
+
+
+async def websocket_handler(request):
+ ws = web.WebSocketResponse()
+ await ws.prepare(request)
+ request.app["websockets"].append(ws)
+ try:
+ async for msg in ws:
+ print(msg)
+ await asyncio.sleep(1)
+ finally:
+ request.app["websockets"].remove(ws)
+ return ws
+
+
+async def on_shutdown(app):
+ for ws in app["websockets"]:
+ await ws.close(code=999, message="Server shutdown")
+
+
+async def listen_to_redis(app):
+ try:
+ sub = await aioredis.create_redis(("localhost", 6379), loop=app.loop)
+ ch, *_ = await sub.subscribe("news")
+ async for msg in ch.iter(encoding="utf-8"):
+ # Forward message to all connected websockets:
+ for ws in app["websockets"]:
+ await ws.send_str(f"{ch.name}: {msg}")
+ print(f"message in {ch.name}: {msg}")
+ except asyncio.CancelledError:
+ pass
+ finally:
+ print("Cancel Redis listener: close connection...")
+ await sub.unsubscribe(ch.name)
+ await sub.quit()
+ print("Redis connection closed.")
+
+
+async def start_background_tasks(app):
+ app["redis_listener"] = app.loop.create_task(listen_to_redis(app))
+
+
+async def cleanup_background_tasks(app):
+ print("cleanup background tasks...")
+ app["redis_listener"].cancel()
+ await app["redis_listener"]
+
+
+def init():
+ app = web.Application()
+ app["websockets"] = []
+ app.router.add_get("/news", websocket_handler)
+ app.on_startup.append(start_background_tasks)
+ app.on_cleanup.append(cleanup_background_tasks)
+ app.on_shutdown.append(on_shutdown)
+ return app
+
+
+web.run_app(init())