From 26a029d407be480d791972afb5975cf62c9360a6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 02:47:55 +0200 Subject: Adding upstream version 124.0.1. Signed-off-by: Daniel Baumann --- .../python/aiohttp/examples/background_tasks.py | 66 ++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100755 third_party/python/aiohttp/examples/background_tasks.py (limited to 'third_party/python/aiohttp/examples/background_tasks.py') diff --git a/third_party/python/aiohttp/examples/background_tasks.py b/third_party/python/aiohttp/examples/background_tasks.py new file mode 100755 index 0000000000..8c2bb9ee69 --- /dev/null +++ b/third_party/python/aiohttp/examples/background_tasks.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +"""Example of aiohttp.web.Application.on_startup signal handler""" +import asyncio + +import aioredis + +from aiohttp import web + + +async def websocket_handler(request): + ws = web.WebSocketResponse() + await ws.prepare(request) + request.app["websockets"].append(ws) + try: + async for msg in ws: + print(msg) + await asyncio.sleep(1) + finally: + request.app["websockets"].remove(ws) + return ws + + +async def on_shutdown(app): + for ws in app["websockets"]: + await ws.close(code=999, message="Server shutdown") + + +async def listen_to_redis(app): + try: + sub = await aioredis.create_redis(("localhost", 6379), loop=app.loop) + ch, *_ = await sub.subscribe("news") + async for msg in ch.iter(encoding="utf-8"): + # Forward message to all connected websockets: + for ws in app["websockets"]: + await ws.send_str(f"{ch.name}: {msg}") + print(f"message in {ch.name}: {msg}") + except asyncio.CancelledError: + pass + finally: + print("Cancel Redis listener: close connection...") + await sub.unsubscribe(ch.name) + await sub.quit() + print("Redis connection closed.") + + +async def start_background_tasks(app: web.Application) -> None: + app["redis_listener"] = asyncio.create_task(listen_to_redis(app)) + + +async def cleanup_background_tasks(app): + print("cleanup background tasks...") + app["redis_listener"].cancel() + await app["redis_listener"] + + +def init(): + app = web.Application() + app["websockets"] = [] + app.router.add_get("/news", websocket_handler) + app.on_startup.append(start_background_tasks) + app.on_cleanup.append(cleanup_background_tasks) + app.on_shutdown.append(on_shutdown) + return app + + +web.run_app(init()) -- cgit v1.2.3