summaryrefslogtreecommitdiffstats
path: root/third_party/python/urllib3/dummyserver/handlers.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/python/urllib3/dummyserver/handlers.py328
1 files changed, 328 insertions, 0 deletions
diff --git a/third_party/python/urllib3/dummyserver/handlers.py b/third_party/python/urllib3/dummyserver/handlers.py
new file mode 100644
index 0000000000..696dbab076
--- /dev/null
+++ b/third_party/python/urllib3/dummyserver/handlers.py
@@ -0,0 +1,328 @@
+from __future__ import print_function
+
+import collections
+import contextlib
+import gzip
+import json
+import logging
+import sys
+import time
+import zlib
+
+from io import BytesIO
+from tornado.web import RequestHandler
+from tornado import httputil
+from datetime import datetime
+from datetime import timedelta
+
+from urllib3.packages.six.moves.http_client import responses
+from urllib3.packages.six.moves.urllib.parse import urlsplit
+from urllib3.packages.six import binary_type, ensure_str
+
+log = logging.getLogger(__name__)
+
+
+class Response(object):
+ def __init__(self, body="", status="200 OK", headers=None):
+ self.body = body
+ self.status = status
+ self.headers = headers or [("Content-type", "text/plain")]
+
+ def __call__(self, request_handler):
+ status, reason = self.status.split(" ", 1)
+ request_handler.set_status(int(status), reason)
+ for header, value in self.headers:
+ request_handler.add_header(header, value)
+
+ # chunked
+ if isinstance(self.body, list):
+ for item in self.body:
+ if not isinstance(item, bytes):
+ item = item.encode("utf8")
+ request_handler.write(item)
+ request_handler.flush()
+ else:
+ body = self.body
+ if not isinstance(body, bytes):
+ body = body.encode("utf8")
+
+ request_handler.write(body)
+
+
+RETRY_TEST_NAMES = collections.defaultdict(int)
+
+
+class TestingApp(RequestHandler):
+ """
+ Simple app that performs various operations, useful for testing an HTTP
+ library.
+
+ Given any path, it will attempt to load a corresponding local method if
+ it exists. Status code 200 indicates success, 400 indicates failure. Each
+ method has its own conditions for success/failure.
+ """
+
+ def get(self):
+ """ Handle GET requests """
+ self._call_method()
+
+ def post(self):
+ """ Handle POST requests """
+ self._call_method()
+
+ def put(self):
+ """ Handle PUT requests """
+ self._call_method()
+
+ def options(self):
+ """ Handle OPTIONS requests """
+ self._call_method()
+
+ def head(self):
+ """ Handle HEAD requests """
+ self._call_method()
+
+ def _call_method(self):
+ """ Call the correct method in this class based on the incoming URI """
+ req = self.request
+ req.params = {}
+ for k, v in req.arguments.items():
+ req.params[k] = next(iter(v))
+
+ path = req.path[:]
+ if not path.startswith("/"):
+ path = urlsplit(path).path
+
+ target = path[1:].split("/", 1)[0]
+ method = getattr(self, target, self.index)
+
+ resp = method(req)
+
+ if dict(resp.headers).get("Connection") == "close":
+ # FIXME: Can we kill the connection somehow?
+ pass
+
+ resp(self)
+
+ def index(self, _request):
+ "Render simple message"
+ return Response("Dummy server!")
+
+ def certificate(self, request):
+ """Return the requester's certificate."""
+ cert = request.get_ssl_certificate()
+ subject = dict()
+ if cert is not None:
+ subject = dict((k, v) for (k, v) in [y for z in cert["subject"] for y in z])
+ return Response(json.dumps(subject))
+
+ def source_address(self, request):
+ """Return the requester's IP address."""
+ return Response(request.remote_ip)
+
+ def set_up(self, request):
+ test_type = request.params.get("test_type")
+ test_id = request.params.get("test_id")
+ if test_id:
+ print("\nNew test %s: %s" % (test_type, test_id))
+ else:
+ print("\nNew test %s" % test_type)
+ return Response("Dummy server is ready!")
+
+ def specific_method(self, request):
+ "Confirm that the request matches the desired method type"
+ method = request.params.get("method")
+ if method and not isinstance(method, str):
+ method = method.decode("utf8")
+
+ if request.method != method:
+ return Response(
+ "Wrong method: %s != %s" % (method, request.method),
+ status="400 Bad Request",
+ )
+ return Response()
+
+ def upload(self, request):
+ "Confirm that the uploaded file conforms to specification"
+ # FIXME: This is a huge broken mess
+ param = request.params.get("upload_param", b"myfile").decode("ascii")
+ filename = request.params.get("upload_filename", b"").decode("utf-8")
+ size = int(request.params.get("upload_size", "0"))
+ files_ = request.files.get(param)
+
+ if len(files_) != 1:
+ return Response(
+ "Expected 1 file for '%s', not %d" % (param, len(files_)),
+ status="400 Bad Request",
+ )
+ file_ = files_[0]
+
+ data = file_["body"]
+ if int(size) != len(data):
+ return Response(
+ "Wrong size: %d != %d" % (size, len(data)), status="400 Bad Request"
+ )
+
+ got_filename = file_["filename"]
+ if isinstance(got_filename, binary_type):
+ got_filename = got_filename.decode("utf-8")
+
+ # Tornado can leave the trailing \n in place on the filename.
+ if filename != got_filename:
+ return Response(
+ u"Wrong filename: %s != %s" % (filename, file_.filename),
+ status="400 Bad Request",
+ )
+
+ return Response()
+
+ def redirect(self, request):
+ "Perform a redirect to ``target``"
+ target = request.params.get("target", "/")
+ status = request.params.get("status", "303 See Other")
+ if len(status) == 3:
+ status = "%s Redirect" % status.decode("latin-1")
+
+ headers = [("Location", target)]
+ return Response(status=status, headers=headers)
+
+ def not_found(self, request):
+ return Response("Not found", status="404 Not Found")
+
+ def multi_redirect(self, request):
+ "Performs a redirect chain based on ``redirect_codes``"
+ codes = request.params.get("redirect_codes", b"200").decode("utf-8")
+ head, tail = codes.split(",", 1) if "," in codes else (codes, None)
+ status = "{0} {1}".format(head, responses[int(head)])
+ if not tail:
+ return Response("Done redirecting", status=status)
+
+ headers = [("Location", "/multi_redirect?redirect_codes=%s" % tail)]
+ return Response(status=status, headers=headers)
+
+ def keepalive(self, request):
+ if request.params.get("close", b"0") == b"1":
+ headers = [("Connection", "close")]
+ return Response("Closing", headers=headers)
+
+ headers = [("Connection", "keep-alive")]
+ return Response("Keeping alive", headers=headers)
+
+ def echo_params(self, request):
+ params = sorted(
+ [(ensure_str(k), ensure_str(v)) for k, v in request.params.items()]
+ )
+ return Response(repr(params))
+
+ def sleep(self, request):
+ "Sleep for a specified amount of ``seconds``"
+ # DO NOT USE THIS, IT'S DEPRECATED.
+ # FIXME: Delete this once appengine tests are fixed to not use this handler.
+ seconds = float(request.params.get("seconds", "1"))
+ time.sleep(seconds)
+ return Response()
+
+ def echo(self, request):
+ "Echo back the params"
+ if request.method == "GET":
+ return Response(request.query)
+
+ return Response(request.body)
+
+ def echo_uri(self, request):
+ "Echo back the requested URI"
+ return Response(request.uri)
+
+ def encodingrequest(self, request):
+ "Check for UA accepting gzip/deflate encoding"
+ data = b"hello, world!"
+ encoding = request.headers.get("Accept-Encoding", "")
+ headers = None
+ if encoding == "gzip":
+ headers = [("Content-Encoding", "gzip")]
+ file_ = BytesIO()
+ with contextlib.closing(
+ gzip.GzipFile("", mode="w", fileobj=file_)
+ ) as zipfile:
+ zipfile.write(data)
+ data = file_.getvalue()
+ elif encoding == "deflate":
+ headers = [("Content-Encoding", "deflate")]
+ data = zlib.compress(data)
+ elif encoding == "garbage-gzip":
+ headers = [("Content-Encoding", "gzip")]
+ data = "garbage"
+ elif encoding == "garbage-deflate":
+ headers = [("Content-Encoding", "deflate")]
+ data = "garbage"
+ return Response(data, headers=headers)
+
+ def headers(self, request):
+ return Response(json.dumps(dict(request.headers)))
+
+ def successful_retry(self, request):
+ """ Handler which will return an error and then success
+
+ It's not currently very flexible as the number of retries is hard-coded.
+ """
+ test_name = request.headers.get("test-name", None)
+ if not test_name:
+ return Response("test-name header not set", status="400 Bad Request")
+
+ RETRY_TEST_NAMES[test_name] += 1
+
+ if RETRY_TEST_NAMES[test_name] >= 2:
+ return Response("Retry successful!")
+ else:
+ return Response("need to keep retrying!", status="418 I'm A Teapot")
+
+ def chunked(self, request):
+ return Response(["123"] * 4)
+
+ def chunked_gzip(self, request):
+ chunks = []
+ compressor = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+
+ for uncompressed in [b"123"] * 4:
+ chunks.append(compressor.compress(uncompressed))
+
+ chunks.append(compressor.flush())
+
+ return Response(chunks, headers=[("Content-Encoding", "gzip")])
+
+ def nbytes(self, request):
+ length = int(request.params.get("length"))
+ data = b"1" * length
+ return Response(data, headers=[("Content-Type", "application/octet-stream")])
+
+ def status(self, request):
+ status = request.params.get("status", "200 OK")
+
+ return Response(status=status)
+
+ def retry_after(self, request):
+ if datetime.now() - self.application.last_req < timedelta(seconds=1):
+ status = request.params.get("status", b"429 Too Many Requests")
+ return Response(
+ status=status.decode("utf-8"), headers=[("Retry-After", "1")]
+ )
+
+ self.application.last_req = datetime.now()
+
+ return Response(status="200 OK")
+
+ def redirect_after(self, request):
+ "Perform a redirect to ``target``"
+ date = request.params.get("date")
+ if date:
+ retry_after = str(
+ httputil.format_timestamp(datetime.fromtimestamp(float(date)))
+ )
+ else:
+ retry_after = "1"
+ target = request.params.get("target", "/")
+ headers = [("Location", target), ("Retry-After", retry_after)]
+ return Response(status="303 See Other", headers=headers)
+
+ def shutdown(self, request):
+ sys.exit()