summaryrefslogtreecommitdiffstats
path: root/test/test_http_proxy.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:10:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:10:23 +0000
commit96011c9a0b8a4f10e6e06f76acc380d6a802bbee (patch)
tree30ded50e296e5d936800d19ada594982f10111d6 /test/test_http_proxy.py
parentAdding debian version 2024.04.09-1. (diff)
downloadyt-dlp-96011c9a0b8a4f10e6e06f76acc380d6a802bbee.tar.xz
yt-dlp-96011c9a0b8a4f10e6e06f76acc380d6a802bbee.zip
Merging upstream version 2024.05.26.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/test_http_proxy.py')
-rw-r--r--test/test_http_proxy.py380
1 files changed, 380 insertions, 0 deletions
diff --git a/test/test_http_proxy.py b/test/test_http_proxy.py
new file mode 100644
index 0000000..1b21fe7
--- /dev/null
+++ b/test/test_http_proxy.py
@@ -0,0 +1,380 @@
+import abc
+import base64
+import contextlib
+import functools
+import json
+import os
+import random
+import ssl
+import threading
+from http.server import BaseHTTPRequestHandler
+from socketserver import ThreadingTCPServer
+
+import pytest
+
+from test.helper import http_server_port, verify_address_availability
+from test.test_networking import TEST_DIR
+from test.test_socks import IPv6ThreadingTCPServer
+from yt_dlp.dependencies import urllib3
+from yt_dlp.networking import Request
+from yt_dlp.networking.exceptions import HTTPError, ProxyError, SSLError
+
+
+class HTTPProxyAuthMixin:
+
+ def proxy_auth_error(self):
+ self.send_response(407)
+ self.send_header('Proxy-Authenticate', 'Basic realm="test http proxy"')
+ self.end_headers()
+ return False
+
+ def do_proxy_auth(self, username, password):
+ if username is None and password is None:
+ return True
+
+ proxy_auth_header = self.headers.get('Proxy-Authorization', None)
+ if proxy_auth_header is None:
+ return self.proxy_auth_error()
+
+ if not proxy_auth_header.startswith('Basic '):
+ return self.proxy_auth_error()
+
+ auth = proxy_auth_header[6:]
+
+ try:
+ auth_username, auth_password = base64.b64decode(auth).decode().split(':', 1)
+ except Exception:
+ return self.proxy_auth_error()
+
+ if auth_username != (username or '') or auth_password != (password or ''):
+ return self.proxy_auth_error()
+ return True
+
+
+class HTTPProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
+ def __init__(self, *args, proxy_info=None, username=None, password=None, request_handler=None, **kwargs):
+ self.username = username
+ self.password = password
+ self.proxy_info = proxy_info
+ super().__init__(*args, **kwargs)
+
+ def do_GET(self):
+ if not self.do_proxy_auth(self.username, self.password):
+ self.server.close_request(self.request)
+ return
+ if self.path.endswith('/proxy_info'):
+ payload = json.dumps(self.proxy_info or {
+ 'client_address': self.client_address,
+ 'connect': False,
+ 'connect_host': None,
+ 'connect_port': None,
+ 'headers': dict(self.headers),
+ 'path': self.path,
+ 'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
+ })
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/json; charset=utf-8')
+ self.send_header('Content-Length', str(len(payload)))
+ self.end_headers()
+ self.wfile.write(payload.encode())
+ else:
+ self.send_response(404)
+ self.end_headers()
+
+ self.server.close_request(self.request)
+
+
+if urllib3:
+ import urllib3.util.ssltransport
+
+ class SSLTransport(urllib3.util.ssltransport.SSLTransport):
+ """
+ Modified version of urllib3 SSLTransport to support server side SSL
+
+ This allows us to chain multiple TLS connections.
+ """
+
+ def __init__(self, socket, ssl_context, server_hostname=None, suppress_ragged_eofs=True, server_side=False):
+ self.incoming = ssl.MemoryBIO()
+ self.outgoing = ssl.MemoryBIO()
+
+ self.suppress_ragged_eofs = suppress_ragged_eofs
+ self.socket = socket
+
+ self.sslobj = ssl_context.wrap_bio(
+ self.incoming,
+ self.outgoing,
+ server_hostname=server_hostname,
+ server_side=server_side
+ )
+ self._ssl_io_loop(self.sslobj.do_handshake)
+
+ @property
+ def _io_refs(self):
+ return self.socket._io_refs
+
+ @_io_refs.setter
+ def _io_refs(self, value):
+ self.socket._io_refs = value
+
+ def shutdown(self, *args, **kwargs):
+ self.socket.shutdown(*args, **kwargs)
+else:
+ SSLTransport = None
+
+
+class HTTPSProxyHandler(HTTPProxyHandler):
+ def __init__(self, request, *args, **kwargs):
+ certfn = os.path.join(TEST_DIR, 'testcert.pem')
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ sslctx.load_cert_chain(certfn, None)
+ if isinstance(request, ssl.SSLSocket):
+ request = SSLTransport(request, ssl_context=sslctx, server_side=True)
+ else:
+ request = sslctx.wrap_socket(request, server_side=True)
+ super().__init__(request, *args, **kwargs)
+
+
+class HTTPConnectProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
+ protocol_version = 'HTTP/1.1'
+ default_request_version = 'HTTP/1.1'
+
+ def __init__(self, *args, username=None, password=None, request_handler=None, **kwargs):
+ self.username = username
+ self.password = password
+ self.request_handler = request_handler
+ super().__init__(*args, **kwargs)
+
+ def do_CONNECT(self):
+ if not self.do_proxy_auth(self.username, self.password):
+ self.server.close_request(self.request)
+ return
+ self.send_response(200)
+ self.end_headers()
+ proxy_info = {
+ 'client_address': self.client_address,
+ 'connect': True,
+ 'connect_host': self.path.split(':')[0],
+ 'connect_port': int(self.path.split(':')[1]),
+ 'headers': dict(self.headers),
+ 'path': self.path,
+ 'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
+ }
+ self.request_handler(self.request, self.client_address, self.server, proxy_info=proxy_info)
+ self.server.close_request(self.request)
+
+
+class HTTPSConnectProxyHandler(HTTPConnectProxyHandler):
+ def __init__(self, request, *args, **kwargs):
+ certfn = os.path.join(TEST_DIR, 'testcert.pem')
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ sslctx.load_cert_chain(certfn, None)
+ request = sslctx.wrap_socket(request, server_side=True)
+ self._original_request = request
+ super().__init__(request, *args, **kwargs)
+
+ def do_CONNECT(self):
+ super().do_CONNECT()
+ self.server.close_request(self._original_request)
+
+
+@contextlib.contextmanager
+def proxy_server(proxy_server_class, request_handler, bind_ip=None, **proxy_server_kwargs):
+ server = server_thread = None
+ try:
+ bind_address = bind_ip or '127.0.0.1'
+ server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
+ server = server_type(
+ (bind_address, 0), functools.partial(proxy_server_class, request_handler=request_handler, **proxy_server_kwargs))
+ server_port = http_server_port(server)
+ server_thread = threading.Thread(target=server.serve_forever)
+ server_thread.daemon = True
+ server_thread.start()
+ if '.' not in bind_address:
+ yield f'[{bind_address}]:{server_port}'
+ else:
+ yield f'{bind_address}:{server_port}'
+ finally:
+ server.shutdown()
+ server.server_close()
+ server_thread.join(2.0)
+
+
+class HTTPProxyTestContext(abc.ABC):
+ REQUEST_HANDLER_CLASS = None
+ REQUEST_PROTO = None
+
+ def http_server(self, server_class, *args, **kwargs):
+ return proxy_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)
+
+ @abc.abstractmethod
+ def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
+ """return a dict of proxy_info"""
+
+
+class HTTPProxyHTTPTestContext(HTTPProxyTestContext):
+ # Standard HTTP Proxy for http requests
+ REQUEST_HANDLER_CLASS = HTTPProxyHandler
+ REQUEST_PROTO = 'http'
+
+ def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
+ request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/proxy_info', **req_kwargs)
+ handler.validate(request)
+ return json.loads(handler.send(request).read().decode())
+
+
+class HTTPProxyHTTPSTestContext(HTTPProxyTestContext):
+ # HTTP Connect proxy, for https requests
+ REQUEST_HANDLER_CLASS = HTTPSProxyHandler
+ REQUEST_PROTO = 'https'
+
+ def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
+ request = Request(f'https://{target_domain or "127.0.0.1"}:{target_port or "40000"}/proxy_info', **req_kwargs)
+ handler.validate(request)
+ return json.loads(handler.send(request).read().decode())
+
+
+CTX_MAP = {
+ 'http': HTTPProxyHTTPTestContext,
+ 'https': HTTPProxyHTTPSTestContext,
+}
+
+
+@pytest.fixture(scope='module')
+def ctx(request):
+ return CTX_MAP[request.param]()
+
+
+@pytest.mark.parametrize(
+ 'handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
+@pytest.mark.parametrize('ctx', ['http'], indirect=True) # pure http proxy can only support http
+class TestHTTPProxy:
+ def test_http_no_auth(self, handler, ctx):
+ with ctx.http_server(HTTPProxyHandler) as server_address:
+ with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
+ proxy_info = ctx.proxy_info_request(rh)
+ assert proxy_info['proxy'] == server_address
+ assert proxy_info['connect'] is False
+ assert 'Proxy-Authorization' not in proxy_info['headers']
+
+ def test_http_auth(self, handler, ctx):
+ with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
+ with handler(proxies={ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}) as rh:
+ proxy_info = ctx.proxy_info_request(rh)
+ assert proxy_info['proxy'] == server_address
+ assert 'Proxy-Authorization' in proxy_info['headers']
+
+ def test_http_bad_auth(self, handler, ctx):
+ with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
+ with handler(proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh:
+ with pytest.raises(HTTPError) as exc_info:
+ ctx.proxy_info_request(rh)
+ assert exc_info.value.response.status == 407
+ exc_info.value.response.close()
+
+ def test_http_source_address(self, handler, ctx):
+ with ctx.http_server(HTTPProxyHandler) as server_address:
+ source_address = f'127.0.0.{random.randint(5, 255)}'
+ verify_address_availability(source_address)
+ with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'},
+ source_address=source_address) as rh:
+ proxy_info = ctx.proxy_info_request(rh)
+ assert proxy_info['proxy'] == server_address
+ assert proxy_info['client_address'][0] == source_address
+
+ @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
+ def test_https(self, handler, ctx):
+ with ctx.http_server(HTTPSProxyHandler) as server_address:
+ with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
+ proxy_info = ctx.proxy_info_request(rh)
+ assert proxy_info['proxy'] == server_address
+ assert proxy_info['connect'] is False
+ assert 'Proxy-Authorization' not in proxy_info['headers']
+
+ @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
+ def test_https_verify_failed(self, handler, ctx):
+ with ctx.http_server(HTTPSProxyHandler) as server_address:
+ with handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
+ # Accept SSLError as may not be feasible to tell if it is proxy or request error.
+ # note: if request proto also does ssl verification, this may also be the error of the request.
+ # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
+ with pytest.raises((ProxyError, SSLError)):
+ ctx.proxy_info_request(rh)
+
+ def test_http_with_idn(self, handler, ctx):
+ with ctx.http_server(HTTPProxyHandler) as server_address:
+ with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
+ proxy_info = ctx.proxy_info_request(rh, target_domain='中文.tw')
+ assert proxy_info['proxy'] == server_address
+ assert proxy_info['path'].startswith('http://xn--fiq228c.tw')
+ assert proxy_info['headers']['Host'].split(':', 1)[0] == 'xn--fiq228c.tw'
+
+
+@pytest.mark.parametrize(
+ 'handler,ctx', [
+ ('Requests', 'https'),
+ ('CurlCFFI', 'https'),
+ ], indirect=True)
+class TestHTTPConnectProxy:
+ def test_http_connect_no_auth(self, handler, ctx):
+ with ctx.http_server(HTTPConnectProxyHandler) as server_address:
+ with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
+ proxy_info = ctx.proxy_info_request(rh)
+ assert proxy_info['proxy'] == server_address
+ assert proxy_info['connect'] is True
+ assert 'Proxy-Authorization' not in proxy_info['headers']
+
+ def test_http_connect_auth(self, handler, ctx):
+ with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
+ with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}) as rh:
+ proxy_info = ctx.proxy_info_request(rh)
+ assert proxy_info['proxy'] == server_address
+ assert 'Proxy-Authorization' in proxy_info['headers']
+
+ @pytest.mark.skip_handler(
+ 'Requests',
+ 'bug in urllib3 causes unclosed socket: https://github.com/urllib3/urllib3/issues/3374'
+ )
+ def test_http_connect_bad_auth(self, handler, ctx):
+ with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
+ with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh:
+ with pytest.raises(ProxyError):
+ ctx.proxy_info_request(rh)
+
+ def test_http_connect_source_address(self, handler, ctx):
+ with ctx.http_server(HTTPConnectProxyHandler) as server_address:
+ source_address = f'127.0.0.{random.randint(5, 255)}'
+ verify_address_availability(source_address)
+ with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'},
+ source_address=source_address,
+ verify=False) as rh:
+ proxy_info = ctx.proxy_info_request(rh)
+ assert proxy_info['proxy'] == server_address
+ assert proxy_info['client_address'][0] == source_address
+
+ @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
+ def test_https_connect_proxy(self, handler, ctx):
+ with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
+ with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
+ proxy_info = ctx.proxy_info_request(rh)
+ assert proxy_info['proxy'] == server_address
+ assert proxy_info['connect'] is True
+ assert 'Proxy-Authorization' not in proxy_info['headers']
+
+ @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
+ def test_https_connect_verify_failed(self, handler, ctx):
+ with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
+ with handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
+ # Accept SSLError as may not be feasible to tell if it is proxy or request error.
+ # note: if request proto also does ssl verification, this may also be the error of the request.
+ # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
+ with pytest.raises((ProxyError, SSLError)):
+ ctx.proxy_info_request(rh)
+
+ @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
+ def test_https_connect_proxy_auth(self, handler, ctx):
+ with ctx.http_server(HTTPSConnectProxyHandler, username='test', password='test') as server_address:
+ with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://test:test@{server_address}'}) as rh:
+ proxy_info = ctx.proxy_info_request(rh)
+ assert proxy_info['proxy'] == server_address
+ assert 'Proxy-Authorization' in proxy_info['headers']