diff options
Diffstat (limited to 'third_party/python/aiohttp/examples/legacy')
3 files changed, 458 insertions, 0 deletions
diff --git a/third_party/python/aiohttp/examples/legacy/crawl.py b/third_party/python/aiohttp/examples/legacy/crawl.py new file mode 100755 index 0000000000..c8029b4854 --- /dev/null +++ b/third_party/python/aiohttp/examples/legacy/crawl.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 + +import asyncio +import logging +import re +import signal +import sys +import urllib.parse + +import aiohttp + + +class Crawler: + def __init__(self, rooturl, loop, maxtasks=100): + self.rooturl = rooturl + self.loop = loop + self.todo = set() + self.busy = set() + self.done = {} + self.tasks = set() + self.sem = asyncio.Semaphore(maxtasks, loop=loop) + + # connector stores cookies between requests and uses connection pool + self.session = aiohttp.ClientSession(loop=loop) + + async def run(self): + t = asyncio.ensure_future(self.addurls([(self.rooturl, "")]), loop=self.loop) + await asyncio.sleep(1, loop=self.loop) + while self.busy: + await asyncio.sleep(1, loop=self.loop) + + await t + await self.session.close() + self.loop.stop() + + async def addurls(self, urls): + for url, parenturl in urls: + url = urllib.parse.urljoin(parenturl, url) + url, frag = urllib.parse.urldefrag(url) + if ( + url.startswith(self.rooturl) + and url not in self.busy + and url not in self.done + and url not in self.todo + ): + self.todo.add(url) + await self.sem.acquire() + task = asyncio.ensure_future(self.process(url), loop=self.loop) + task.add_done_callback(lambda t: self.sem.release()) + task.add_done_callback(self.tasks.remove) + self.tasks.add(task) + + async def process(self, url): + print("processing:", url) + + self.todo.remove(url) + self.busy.add(url) + try: + resp = await self.session.get(url) + except Exception as exc: + print("...", url, "has error", repr(str(exc))) + self.done[url] = False + else: + if resp.status == 200 and ("text/html" in resp.headers.get("content-type")): + data = (await resp.read()).decode("utf-8", "replace") + urls = re.findall(r'(?i)href=["\']?([^\s"\'<>]+)', data) + asyncio.Task(self.addurls([(u, url) for u in urls])) + + resp.close() + self.done[url] = True + + self.busy.remove(url) + print( + len(self.done), + "completed tasks,", + len(self.tasks), + "still pending, todo", + len(self.todo), + ) + + +def main(): + loop = asyncio.get_event_loop() + + c = Crawler(sys.argv[1], loop) + asyncio.ensure_future(c.run(), loop=loop) + + try: + loop.add_signal_handler(signal.SIGINT, loop.stop) + except RuntimeError: + pass + loop.run_forever() + print("todo:", len(c.todo)) + print("busy:", len(c.busy)) + print("done:", len(c.done), "; ok:", sum(c.done.values())) + print("tasks:", len(c.tasks)) + + +if __name__ == "__main__": + if "--iocp" in sys.argv: + from asyncio import events, windows_events + + sys.argv.remove("--iocp") + logging.info("using iocp") + el = windows_events.ProactorEventLoop() + events.set_event_loop(el) + + main() diff --git a/third_party/python/aiohttp/examples/legacy/srv.py b/third_party/python/aiohttp/examples/legacy/srv.py new file mode 100755 index 0000000000..628b6f332f --- /dev/null +++ b/third_party/python/aiohttp/examples/legacy/srv.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +"""Simple server written using an event loop.""" + +import argparse +import asyncio +import logging +import os +import sys + +import aiohttp +import aiohttp.server + +try: + import ssl +except ImportError: # pragma: no cover + ssl = None + + +class HttpRequestHandler(aiohttp.server.ServerHttpProtocol): + async def handle_request(self, message, payload): + print( + "method = {!r}; path = {!r}; version = {!r}".format( + message.method, message.path, message.version + ) + ) + + path = message.path + + if not (path.isprintable() and path.startswith("/")) or "/." in path: + print("bad path", repr(path)) + path = None + else: + path = "." + path + if not os.path.exists(path): + print("no file", repr(path)) + path = None + else: + isdir = os.path.isdir(path) + + if not path: + raise aiohttp.HttpProcessingError(code=404) + + for hdr, val in message.headers.items(): + print(hdr, val) + + if isdir and not path.endswith("/"): + path = path + "/" + raise aiohttp.HttpProcessingError( + code=302, headers=(("URI", path), ("Location", path)) + ) + + response = aiohttp.Response(self.writer, 200, http_version=message.version) + response.add_header("Transfer-Encoding", "chunked") + + # content encoding + accept_encoding = message.headers.get("accept-encoding", "").lower() + if "deflate" in accept_encoding: + response.add_header("Content-Encoding", "deflate") + response.add_compression_filter("deflate") + elif "gzip" in accept_encoding: + response.add_header("Content-Encoding", "gzip") + response.add_compression_filter("gzip") + + response.add_chunking_filter(1025) + + if isdir: + response.add_header("Content-type", "text/html") + response.send_headers() + + response.write(b"<ul>\r\n") + for name in sorted(os.listdir(path)): + if name.isprintable() and not name.startswith("."): + try: + bname = name.encode("ascii") + except UnicodeError: + pass + else: + if os.path.isdir(os.path.join(path, name)): + response.write( + b'<li><a href="' + + bname + + b'/">' + + bname + + b"/</a></li>\r\n" + ) + else: + response.write( + b'<li><a href="' + + bname + + b'">' + + bname + + b"</a></li>\r\n" + ) + response.write(b"</ul>") + else: + response.add_header("Content-type", "text/plain") + response.send_headers() + + try: + with open(path, "rb") as fp: + chunk = fp.read(8192) + while chunk: + response.write(chunk) + chunk = fp.read(8192) + except OSError: + response.write(b"Cannot open") + + await response.write_eof() + if response.keep_alive(): + self.keep_alive(True) + + +ARGS = argparse.ArgumentParser(description="Run simple HTTP server.") +ARGS.add_argument( + "--host", action="store", dest="host", default="127.0.0.1", help="Host name" +) +ARGS.add_argument( + "--port", action="store", dest="port", default=8080, type=int, help="Port number" +) +# make iocp and ssl mutually exclusive because ProactorEventLoop is +# incompatible with SSL +group = ARGS.add_mutually_exclusive_group() +group.add_argument( + "--iocp", action="store_true", dest="iocp", help="Windows IOCP event loop" +) +group.add_argument("--ssl", action="store_true", dest="ssl", help="Run ssl mode.") +ARGS.add_argument("--sslcert", action="store", dest="certfile", help="SSL cert file.") +ARGS.add_argument("--sslkey", action="store", dest="keyfile", help="SSL key file.") + + +def main(): + args = ARGS.parse_args() + + if ":" in args.host: + args.host, port = args.host.split(":", 1) + args.port = int(port) + + if args.iocp: + from asyncio import windows_events + + sys.argv.remove("--iocp") + logging.info("using iocp") + el = windows_events.ProactorEventLoop() + asyncio.set_event_loop(el) + + if args.ssl: + here = os.path.join(os.path.dirname(__file__), "tests") + + if args.certfile: + certfile = args.certfile or os.path.join(here, "sample.crt") + keyfile = args.keyfile or os.path.join(here, "sample.key") + else: + certfile = os.path.join(here, "sample.crt") + keyfile = os.path.join(here, "sample.key") + + sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + sslcontext.load_cert_chain(certfile, keyfile) + else: + sslcontext = None + + loop = asyncio.get_event_loop() + f = loop.create_server( + lambda: HttpRequestHandler(debug=True, keep_alive=75), + args.host, + args.port, + ssl=sslcontext, + ) + svr = loop.run_until_complete(f) + socks = svr.sockets + print("serving on", socks[0].getsockname()) + try: + loop.run_forever() + except KeyboardInterrupt: + pass + + +if __name__ == "__main__": + main() diff --git a/third_party/python/aiohttp/examples/legacy/tcp_protocol_parser.py b/third_party/python/aiohttp/examples/legacy/tcp_protocol_parser.py new file mode 100755 index 0000000000..ca49db7d8f --- /dev/null +++ b/third_party/python/aiohttp/examples/legacy/tcp_protocol_parser.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +"""Protocol parser example.""" +import argparse +import asyncio +import collections + +import aiohttp + +try: + import signal +except ImportError: + signal = None + + +MSG_TEXT = b"text:" +MSG_PING = b"ping:" +MSG_PONG = b"pong:" +MSG_STOP = b"stop:" + +Message = collections.namedtuple("Message", ("tp", "data")) + + +def my_protocol_parser(out, buf): + """Parser is used with StreamParser for incremental protocol parsing. + Parser is a generator function, but it is not a coroutine. Usually + parsers are implemented as a state machine. + + more details in asyncio/parsers.py + existing parsers: + * HTTP protocol parsers asyncio/http/protocol.py + * websocket parser asyncio/http/websocket.py + """ + while True: + tp = yield from buf.read(5) + if tp in (MSG_PING, MSG_PONG): + # skip line + yield from buf.skipuntil(b"\r\n") + out.feed_data(Message(tp, None)) + elif tp == MSG_STOP: + out.feed_data(Message(tp, None)) + elif tp == MSG_TEXT: + # read text + text = yield from buf.readuntil(b"\r\n") + out.feed_data(Message(tp, text.strip().decode("utf-8"))) + else: + raise ValueError("Unknown protocol prefix.") + + +class MyProtocolWriter: + def __init__(self, transport): + self.transport = transport + + def ping(self): + self.transport.write(b"ping:\r\n") + + def pong(self): + self.transport.write(b"pong:\r\n") + + def stop(self): + self.transport.write(b"stop:\r\n") + + def send_text(self, text): + self.transport.write(f"text:{text.strip()}\r\n".encode("utf-8")) + + +class EchoServer(asyncio.Protocol): + def connection_made(self, transport): + print("Connection made") + self.transport = transport + self.stream = aiohttp.StreamParser() + asyncio.Task(self.dispatch()) + + def data_received(self, data): + self.stream.feed_data(data) + + def eof_received(self): + self.stream.feed_eof() + + def connection_lost(self, exc): + print("Connection lost") + + async def dispatch(self): + reader = self.stream.set_parser(my_protocol_parser) + writer = MyProtocolWriter(self.transport) + + while True: + try: + msg = await reader.read() + except aiohttp.ConnectionError: + # client has been disconnected + break + + print(f"Message received: {msg}") + + if msg.type == MSG_PING: + writer.pong() + elif msg.type == MSG_TEXT: + writer.send_text("Re: " + msg.data) + elif msg.type == MSG_STOP: + self.transport.close() + break + + +async def start_client(loop, host, port): + transport, stream = await loop.create_connection(aiohttp.StreamProtocol, host, port) + reader = stream.reader.set_parser(my_protocol_parser) + writer = MyProtocolWriter(transport) + writer.ping() + + message = "This is the message. It will be echoed." + + while True: + try: + msg = await reader.read() + except aiohttp.ConnectionError: + print("Server has been disconnected.") + break + + print(f"Message received: {msg}") + if msg.type == MSG_PONG: + writer.send_text(message) + print("data sent:", message) + elif msg.type == MSG_TEXT: + writer.stop() + print("stop sent") + break + + transport.close() + + +def start_server(loop, host, port): + f = loop.create_server(EchoServer, host, port) + srv = loop.run_until_complete(f) + x = srv.sockets[0] + print("serving on", x.getsockname()) + loop.run_forever() + + +ARGS = argparse.ArgumentParser(description="Protocol parser example.") +ARGS.add_argument( + "--server", action="store_true", dest="server", default=False, help="Run tcp server" +) +ARGS.add_argument( + "--client", action="store_true", dest="client", default=False, help="Run tcp client" +) +ARGS.add_argument( + "--host", action="store", dest="host", default="127.0.0.1", help="Host name" +) +ARGS.add_argument( + "--port", action="store", dest="port", default=9999, type=int, help="Port number" +) + + +if __name__ == "__main__": + args = ARGS.parse_args() + + if ":" in args.host: + args.host, port = args.host.split(":", 1) + args.port = int(port) + + if (not (args.server or args.client)) or (args.server and args.client): + print("Please specify --server or --client\n") + ARGS.print_help() + else: + loop = asyncio.get_event_loop() + if signal is not None: + loop.add_signal_handler(signal.SIGINT, loop.stop) + + if args.server: + start_server(loop, args.host, args.port) + else: + loop.run_until_complete(start_client(loop, args.host, args.port)) |