summaryrefslogtreecommitdiffstats
path: root/third_party/python/aiohttp/examples/legacy
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/aiohttp/examples/legacy')
-rwxr-xr-xthird_party/python/aiohttp/examples/legacy/crawl.py108
-rwxr-xr-xthird_party/python/aiohttp/examples/legacy/srv.py178
-rwxr-xr-xthird_party/python/aiohttp/examples/legacy/tcp_protocol_parser.py172
3 files changed, 458 insertions, 0 deletions
diff --git a/third_party/python/aiohttp/examples/legacy/crawl.py b/third_party/python/aiohttp/examples/legacy/crawl.py
new file mode 100755
index 0000000000..c8029b4854
--- /dev/null
+++ b/third_party/python/aiohttp/examples/legacy/crawl.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python3
+
+import asyncio
+import logging
+import re
+import signal
+import sys
+import urllib.parse
+
+import aiohttp
+
+
+class Crawler:
+ def __init__(self, rooturl, loop, maxtasks=100):
+ self.rooturl = rooturl
+ self.loop = loop
+ self.todo = set()
+ self.busy = set()
+ self.done = {}
+ self.tasks = set()
+ self.sem = asyncio.Semaphore(maxtasks, loop=loop)
+
+ # connector stores cookies between requests and uses connection pool
+ self.session = aiohttp.ClientSession(loop=loop)
+
+ async def run(self):
+ t = asyncio.ensure_future(self.addurls([(self.rooturl, "")]), loop=self.loop)
+ await asyncio.sleep(1, loop=self.loop)
+ while self.busy:
+ await asyncio.sleep(1, loop=self.loop)
+
+ await t
+ await self.session.close()
+ self.loop.stop()
+
+ async def addurls(self, urls):
+ for url, parenturl in urls:
+ url = urllib.parse.urljoin(parenturl, url)
+ url, frag = urllib.parse.urldefrag(url)
+ if (
+ url.startswith(self.rooturl)
+ and url not in self.busy
+ and url not in self.done
+ and url not in self.todo
+ ):
+ self.todo.add(url)
+ await self.sem.acquire()
+ task = asyncio.ensure_future(self.process(url), loop=self.loop)
+ task.add_done_callback(lambda t: self.sem.release())
+ task.add_done_callback(self.tasks.remove)
+ self.tasks.add(task)
+
+ async def process(self, url):
+ print("processing:", url)
+
+ self.todo.remove(url)
+ self.busy.add(url)
+ try:
+ resp = await self.session.get(url)
+ except Exception as exc:
+ print("...", url, "has error", repr(str(exc)))
+ self.done[url] = False
+ else:
+ if resp.status == 200 and ("text/html" in resp.headers.get("content-type")):
+ data = (await resp.read()).decode("utf-8", "replace")
+ urls = re.findall(r'(?i)href=["\']?([^\s"\'<>]+)', data)
+ asyncio.Task(self.addurls([(u, url) for u in urls]))
+
+ resp.close()
+ self.done[url] = True
+
+ self.busy.remove(url)
+ print(
+ len(self.done),
+ "completed tasks,",
+ len(self.tasks),
+ "still pending, todo",
+ len(self.todo),
+ )
+
+
+def main():
+ loop = asyncio.get_event_loop()
+
+ c = Crawler(sys.argv[1], loop)
+ asyncio.ensure_future(c.run(), loop=loop)
+
+ try:
+ loop.add_signal_handler(signal.SIGINT, loop.stop)
+ except RuntimeError:
+ pass
+ loop.run_forever()
+ print("todo:", len(c.todo))
+ print("busy:", len(c.busy))
+ print("done:", len(c.done), "; ok:", sum(c.done.values()))
+ print("tasks:", len(c.tasks))
+
+
+if __name__ == "__main__":
+ if "--iocp" in sys.argv:
+ from asyncio import events, windows_events
+
+ sys.argv.remove("--iocp")
+ logging.info("using iocp")
+ el = windows_events.ProactorEventLoop()
+ events.set_event_loop(el)
+
+ main()
diff --git a/third_party/python/aiohttp/examples/legacy/srv.py b/third_party/python/aiohttp/examples/legacy/srv.py
new file mode 100755
index 0000000000..628b6f332f
--- /dev/null
+++ b/third_party/python/aiohttp/examples/legacy/srv.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python3
+"""Simple server written using an event loop."""
+
+import argparse
+import asyncio
+import logging
+import os
+import sys
+
+import aiohttp
+import aiohttp.server
+
+try:
+ import ssl
+except ImportError: # pragma: no cover
+ ssl = None
+
+
+class HttpRequestHandler(aiohttp.server.ServerHttpProtocol):
+ async def handle_request(self, message, payload):
+ print(
+ "method = {!r}; path = {!r}; version = {!r}".format(
+ message.method, message.path, message.version
+ )
+ )
+
+ path = message.path
+
+ if not (path.isprintable() and path.startswith("/")) or "/." in path:
+ print("bad path", repr(path))
+ path = None
+ else:
+ path = "." + path
+ if not os.path.exists(path):
+ print("no file", repr(path))
+ path = None
+ else:
+ isdir = os.path.isdir(path)
+
+ if not path:
+ raise aiohttp.HttpProcessingError(code=404)
+
+ for hdr, val in message.headers.items():
+ print(hdr, val)
+
+ if isdir and not path.endswith("/"):
+ path = path + "/"
+ raise aiohttp.HttpProcessingError(
+ code=302, headers=(("URI", path), ("Location", path))
+ )
+
+ response = aiohttp.Response(self.writer, 200, http_version=message.version)
+ response.add_header("Transfer-Encoding", "chunked")
+
+ # content encoding
+ accept_encoding = message.headers.get("accept-encoding", "").lower()
+ if "deflate" in accept_encoding:
+ response.add_header("Content-Encoding", "deflate")
+ response.add_compression_filter("deflate")
+ elif "gzip" in accept_encoding:
+ response.add_header("Content-Encoding", "gzip")
+ response.add_compression_filter("gzip")
+
+ response.add_chunking_filter(1025)
+
+ if isdir:
+ response.add_header("Content-type", "text/html")
+ response.send_headers()
+
+ response.write(b"<ul>\r\n")
+ for name in sorted(os.listdir(path)):
+ if name.isprintable() and not name.startswith("."):
+ try:
+ bname = name.encode("ascii")
+ except UnicodeError:
+ pass
+ else:
+ if os.path.isdir(os.path.join(path, name)):
+ response.write(
+ b'<li><a href="'
+ + bname
+ + b'/">'
+ + bname
+ + b"/</a></li>\r\n"
+ )
+ else:
+ response.write(
+ b'<li><a href="'
+ + bname
+ + b'">'
+ + bname
+ + b"</a></li>\r\n"
+ )
+ response.write(b"</ul>")
+ else:
+ response.add_header("Content-type", "text/plain")
+ response.send_headers()
+
+ try:
+ with open(path, "rb") as fp:
+ chunk = fp.read(8192)
+ while chunk:
+ response.write(chunk)
+ chunk = fp.read(8192)
+ except OSError:
+ response.write(b"Cannot open")
+
+ await response.write_eof()
+ if response.keep_alive():
+ self.keep_alive(True)
+
+
+ARGS = argparse.ArgumentParser(description="Run simple HTTP server.")
+ARGS.add_argument(
+ "--host", action="store", dest="host", default="127.0.0.1", help="Host name"
+)
+ARGS.add_argument(
+ "--port", action="store", dest="port", default=8080, type=int, help="Port number"
+)
+# make iocp and ssl mutually exclusive because ProactorEventLoop is
+# incompatible with SSL
+group = ARGS.add_mutually_exclusive_group()
+group.add_argument(
+ "--iocp", action="store_true", dest="iocp", help="Windows IOCP event loop"
+)
+group.add_argument("--ssl", action="store_true", dest="ssl", help="Run ssl mode.")
+ARGS.add_argument("--sslcert", action="store", dest="certfile", help="SSL cert file.")
+ARGS.add_argument("--sslkey", action="store", dest="keyfile", help="SSL key file.")
+
+
+def main():
+ args = ARGS.parse_args()
+
+ if ":" in args.host:
+ args.host, port = args.host.split(":", 1)
+ args.port = int(port)
+
+ if args.iocp:
+ from asyncio import windows_events
+
+ sys.argv.remove("--iocp")
+ logging.info("using iocp")
+ el = windows_events.ProactorEventLoop()
+ asyncio.set_event_loop(el)
+
+ if args.ssl:
+ here = os.path.join(os.path.dirname(__file__), "tests")
+
+ if args.certfile:
+ certfile = args.certfile or os.path.join(here, "sample.crt")
+ keyfile = args.keyfile or os.path.join(here, "sample.key")
+ else:
+ certfile = os.path.join(here, "sample.crt")
+ keyfile = os.path.join(here, "sample.key")
+
+ sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslcontext.load_cert_chain(certfile, keyfile)
+ else:
+ sslcontext = None
+
+ loop = asyncio.get_event_loop()
+ f = loop.create_server(
+ lambda: HttpRequestHandler(debug=True, keep_alive=75),
+ args.host,
+ args.port,
+ ssl=sslcontext,
+ )
+ svr = loop.run_until_complete(f)
+ socks = svr.sockets
+ print("serving on", socks[0].getsockname())
+ try:
+ loop.run_forever()
+ except KeyboardInterrupt:
+ pass
+
+
+if __name__ == "__main__":
+ main()
diff --git a/third_party/python/aiohttp/examples/legacy/tcp_protocol_parser.py b/third_party/python/aiohttp/examples/legacy/tcp_protocol_parser.py
new file mode 100755
index 0000000000..ca49db7d8f
--- /dev/null
+++ b/third_party/python/aiohttp/examples/legacy/tcp_protocol_parser.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python3
+"""Protocol parser example."""
+import argparse
+import asyncio
+import collections
+
+import aiohttp
+
+try:
+ import signal
+except ImportError:
+ signal = None
+
+
+MSG_TEXT = b"text:"
+MSG_PING = b"ping:"
+MSG_PONG = b"pong:"
+MSG_STOP = b"stop:"
+
+Message = collections.namedtuple("Message", ("tp", "data"))
+
+
+def my_protocol_parser(out, buf):
+ """Parser is used with StreamParser for incremental protocol parsing.
+ Parser is a generator function, but it is not a coroutine. Usually
+ parsers are implemented as a state machine.
+
+ more details in asyncio/parsers.py
+ existing parsers:
+ * HTTP protocol parsers asyncio/http/protocol.py
+ * websocket parser asyncio/http/websocket.py
+ """
+ while True:
+ tp = yield from buf.read(5)
+ if tp in (MSG_PING, MSG_PONG):
+ # skip line
+ yield from buf.skipuntil(b"\r\n")
+ out.feed_data(Message(tp, None))
+ elif tp == MSG_STOP:
+ out.feed_data(Message(tp, None))
+ elif tp == MSG_TEXT:
+ # read text
+ text = yield from buf.readuntil(b"\r\n")
+ out.feed_data(Message(tp, text.strip().decode("utf-8")))
+ else:
+ raise ValueError("Unknown protocol prefix.")
+
+
+class MyProtocolWriter:
+ def __init__(self, transport):
+ self.transport = transport
+
+ def ping(self):
+ self.transport.write(b"ping:\r\n")
+
+ def pong(self):
+ self.transport.write(b"pong:\r\n")
+
+ def stop(self):
+ self.transport.write(b"stop:\r\n")
+
+ def send_text(self, text):
+ self.transport.write(f"text:{text.strip()}\r\n".encode("utf-8"))
+
+
+class EchoServer(asyncio.Protocol):
+ def connection_made(self, transport):
+ print("Connection made")
+ self.transport = transport
+ self.stream = aiohttp.StreamParser()
+ asyncio.Task(self.dispatch())
+
+ def data_received(self, data):
+ self.stream.feed_data(data)
+
+ def eof_received(self):
+ self.stream.feed_eof()
+
+ def connection_lost(self, exc):
+ print("Connection lost")
+
+ async def dispatch(self):
+ reader = self.stream.set_parser(my_protocol_parser)
+ writer = MyProtocolWriter(self.transport)
+
+ while True:
+ try:
+ msg = await reader.read()
+ except aiohttp.ConnectionError:
+ # client has been disconnected
+ break
+
+ print(f"Message received: {msg}")
+
+ if msg.type == MSG_PING:
+ writer.pong()
+ elif msg.type == MSG_TEXT:
+ writer.send_text("Re: " + msg.data)
+ elif msg.type == MSG_STOP:
+ self.transport.close()
+ break
+
+
+async def start_client(loop, host, port):
+ transport, stream = await loop.create_connection(aiohttp.StreamProtocol, host, port)
+ reader = stream.reader.set_parser(my_protocol_parser)
+ writer = MyProtocolWriter(transport)
+ writer.ping()
+
+ message = "This is the message. It will be echoed."
+
+ while True:
+ try:
+ msg = await reader.read()
+ except aiohttp.ConnectionError:
+ print("Server has been disconnected.")
+ break
+
+ print(f"Message received: {msg}")
+ if msg.type == MSG_PONG:
+ writer.send_text(message)
+ print("data sent:", message)
+ elif msg.type == MSG_TEXT:
+ writer.stop()
+ print("stop sent")
+ break
+
+ transport.close()
+
+
+def start_server(loop, host, port):
+ f = loop.create_server(EchoServer, host, port)
+ srv = loop.run_until_complete(f)
+ x = srv.sockets[0]
+ print("serving on", x.getsockname())
+ loop.run_forever()
+
+
+ARGS = argparse.ArgumentParser(description="Protocol parser example.")
+ARGS.add_argument(
+ "--server", action="store_true", dest="server", default=False, help="Run tcp server"
+)
+ARGS.add_argument(
+ "--client", action="store_true", dest="client", default=False, help="Run tcp client"
+)
+ARGS.add_argument(
+ "--host", action="store", dest="host", default="127.0.0.1", help="Host name"
+)
+ARGS.add_argument(
+ "--port", action="store", dest="port", default=9999, type=int, help="Port number"
+)
+
+
+if __name__ == "__main__":
+ args = ARGS.parse_args()
+
+ if ":" in args.host:
+ args.host, port = args.host.split(":", 1)
+ args.port = int(port)
+
+ if (not (args.server or args.client)) or (args.server and args.client):
+ print("Please specify --server or --client\n")
+ ARGS.print_help()
+ else:
+ loop = asyncio.get_event_loop()
+ if signal is not None:
+ loop.add_signal_handler(signal.SIGINT, loop.stop)
+
+ if args.server:
+ start_server(loop, args.host, args.port)
+ else:
+ loop.run_until_complete(start_client(loop, args.host, args.port))