Diffstat (limited to 'tests/test_builders/test_build_linkcheck.py')
-rw-r--r-- | tests/test_builders/test_build_linkcheck.py | 1040 |
1 file changed, 1040 insertions, 0 deletions
diff --git a/tests/test_builders/test_build_linkcheck.py b/tests/test_builders/test_build_linkcheck.py
new file mode 100644
index 0000000..c8d8515
--- /dev/null
+++ b/tests/test_builders/test_build_linkcheck.py
@@ -0,0 +1,1040 @@
+"""Test the build process with the linkcheck builder with the test root."""
+
+from __future__ import annotations
+
+import json
+import re
+import sys
+import textwrap
+import time
+import wsgiref.handlers
+from base64 import b64encode
+from http.server import BaseHTTPRequestHandler
+from queue import Queue
+from unittest import mock
+
+import docutils
+import pytest
+from urllib3.poolmanager import PoolManager
+
+import sphinx.util.http_date
+from sphinx.builders.linkcheck import (
+    CheckRequest,
+    Hyperlink,
+    HyperlinkAvailabilityCheckWorker,
+    RateLimit,
+    compile_linkcheck_allowed_redirects,
+)
+from sphinx.deprecation import RemovedInSphinx80Warning
+from sphinx.util import requests
+from sphinx.util.console import strip_colors
+
+from tests.utils import CERT_FILE, serve_application
+
+ts_re = re.compile(r".*\[(?P<ts>.*)\].*")
+
+
+class DefaultsHandler(BaseHTTPRequestHandler):
+    protocol_version = "HTTP/1.1"
+
+    def do_HEAD(self):
+        if self.path[1:].rstrip() in {"", "anchor.html"}:
+            self.send_response(200, "OK")
+            self.send_header("Content-Length", "0")
+            self.end_headers()
+        else:
+            self.send_response(404, "Not Found")
+            self.send_header("Content-Length", "0")
+            self.end_headers()
+
+    def do_GET(self):
+        if self.path[1:].rstrip() == "":
+            content = b"ok\n\n"
+        elif self.path[1:].rstrip() == "anchor.html":
+            doc = '<!DOCTYPE html><html><body><a id="found"></a></body></html>'
+            content = doc.encode("utf-8")
+        else:
+            content = b""
+
+        if content:
+            self.send_response(200, "OK")
+            self.send_header("Content-Length", str(len(content)))
+            self.end_headers()
+            self.wfile.write(content)
+        else:
+            self.send_response(404, "Not Found")
+            self.send_header("Content-Length", "0")
+            self.end_headers()
+
+
+class ConnectionMeasurement:
+    """Measure the number of distinct host connections created during linkchecking"""
+
+    def __init__(self):
+        self.connections = set()
+        self.urllib3_connection_from_url = PoolManager.connection_from_url
+        self.patcher = mock.patch.object(
+            target=PoolManager,
+            attribute='connection_from_url',
+            new=self._collect_connections(),
+        )
+
+    def _collect_connections(self):
+        def connection_collector(obj, url):
+            connection = self.urllib3_connection_from_url(obj, url)
+            self.connections.add(connection)
+            return connection
+        return connection_collector
+
+    def __enter__(self):
+        self.patcher.start()
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        for connection in self.connections:
+            connection.close()
+        self.patcher.stop()
+
+    @property
+    def connection_count(self):
+        return len(self.connections)
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck', freshenv=True)
+def test_defaults(app):
+    with serve_application(app, DefaultsHandler) as address:
+        with ConnectionMeasurement() as m:
+            app.build()
+        assert m.connection_count <= 5
+
+    # Text output
+    assert (app.outdir / 'output.txt').exists()
+    content = (app.outdir / 'output.txt').read_text(encoding='utf8')
+
+    # looking for '#top' and '#does-not-exist' not found should fail
+    assert "Anchor 'top' not found" in content
+    assert "Anchor 'does-not-exist' not found" in content
+    # images should fail
+    assert f"Not Found for url: http://{address}/image.png" in content
+    assert f"Not Found for url: http://{address}/image2.png" in content
+    # looking for missing local file should fail
assert "[broken] path/to/notfound" in content + assert len(content.splitlines()) == 5 + + # JSON output + assert (app.outdir / 'output.json').exists() + content = (app.outdir / 'output.json').read_text(encoding='utf8') + + rows = [json.loads(x) for x in content.splitlines()] + row = rows[0] + for attr in ("filename", "lineno", "status", "code", "uri", "info"): + assert attr in row + + assert len(content.splitlines()) == 10 + assert len(rows) == 10 + # the output order of the rows is not stable + # due to possible variance in network latency + rowsby = {row["uri"]: row for row in rows} + # looking for local file that exists should succeed + assert rowsby["conf.py"]["status"] == "working" + assert rowsby[f"http://{address}#!bar"] == { + 'filename': 'links.rst', + 'lineno': 5, + 'status': 'working', + 'code': 0, + 'uri': f'http://{address}#!bar', + 'info': '', + } + + def _missing_resource(filename: str, lineno: int): + return { + 'filename': 'links.rst', + 'lineno': lineno, + 'status': 'broken', + 'code': 0, + 'uri': f'http://{address}/{filename}', + 'info': f'404 Client Error: Not Found for url: http://{address}/{filename}', + } + accurate_linenumbers = docutils.__version_info__[:2] >= (0, 21) + image2_lineno = 12 if accurate_linenumbers else 13 + assert rowsby[f'http://{address}/image2.png'] == _missing_resource("image2.png", image2_lineno) + # looking for '#top' and '#does-not-exist' not found should fail + assert rowsby[f"http://{address}/#top"]["info"] == "Anchor 'top' not found" + assert rowsby[f"http://{address}/#top"]["status"] == "broken" + assert rowsby[f"http://{address}#does-not-exist"]["info"] == "Anchor 'does-not-exist' not found" + # images should fail + assert f"Not Found for url: http://{address}/image.png" in rowsby[f"http://{address}/image.png"]["info"] + # anchor should be found + assert rowsby[f'http://{address}/anchor.html#found'] == { + 'filename': 'links.rst', + 'lineno': 14, + 'status': 'working', + 'code': 0, + 'uri': f'http://{address}/anchor.html#found', + 'info': '', + } + + +@pytest.mark.sphinx( + 'linkcheck', testroot='linkcheck', freshenv=True, + confoverrides={'linkcheck_anchors': False}) +def test_check_link_response_only(app): + with serve_application(app, DefaultsHandler) as address: + app.build() + + # JSON output + assert (app.outdir / 'output.json').exists() + content = (app.outdir / 'output.json').read_text(encoding='utf8') + + rows = [json.loads(x) for x in content.splitlines()] + rowsby = {row["uri"]: row for row in rows} + assert rowsby[f"http://{address}/#top"]["status"] == "working" + + +@pytest.mark.sphinx('linkcheck', testroot='linkcheck-too-many-retries', freshenv=True) +def test_too_many_retries(app): + with serve_application(app, DefaultsHandler) as address: + app.build() + + # Text output + assert (app.outdir / 'output.txt').exists() + content = (app.outdir / 'output.txt').read_text(encoding='utf8') + + # looking for non-existent URL should fail + assert " Max retries exceeded with url: /doesnotexist" in content + + # JSON output + assert (app.outdir / 'output.json').exists() + content = (app.outdir / 'output.json').read_text(encoding='utf8') + + assert len(content.splitlines()) == 1 + row = json.loads(content) + # the output order of the rows is not stable + # due to possible variance in network latency + + # looking for non-existent URL should fail + assert row['filename'] == 'index.rst' + assert row['lineno'] == 1 + assert row['status'] == 'broken' + assert row['code'] == 0 + assert row['uri'] == f'https://{address}/doesnotexist' + + 
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-raw-node', freshenv=True)
+def test_raw_node(app):
+    with serve_application(app, OKHandler) as address:
+        # write an index file that contains a link back to this webserver's root
+        # URL.  docutils will replace the raw node with the contents retrieved..
+        # ..and then the linkchecker will check that the root URL is available.
+        index = (app.srcdir / "index.rst")
+        index.write_text(
+            ".. raw:: 'html'\n"
+            "   :url: http://{address}/".format(address=address),
+        )
+        app.build()
+
+    # JSON output
+    assert (app.outdir / 'output.json').exists()
+    content = (app.outdir / 'output.json').read_text(encoding='utf8')
+
+    assert len(content.splitlines()) == 1
+    row = json.loads(content)
+
+    # raw nodes' url should be checked too
+    assert row == {
+        'filename': 'index.rst',
+        'lineno': 1,
+        'status': 'working',
+        'code': 0,
+        'uri': f'http://{address}/',  # the received rST contains a link to its own URL
+        'info': '',
+    }
+
+
+@pytest.mark.sphinx(
+    'linkcheck', testroot='linkcheck-anchors-ignore', freshenv=True,
+    confoverrides={'linkcheck_anchors_ignore': ["^!", "^top$"]})
+def test_anchors_ignored(app):
+    with serve_application(app, OKHandler):
+        app.build()
+
+    assert (app.outdir / 'output.txt').exists()
+    content = (app.outdir / 'output.txt').read_text(encoding='utf8')
+
+    # expect all ok when excluding #top
+    assert not content
+
+
+class AnchorsIgnoreForUrlHandler(BaseHTTPRequestHandler):
+    def do_HEAD(self):
+        if self.path in {'/valid', '/ignored'}:
+            self.send_response(200, "OK")
+        else:
+            self.send_response(404, "Not Found")
+        self.end_headers()
+
+    def do_GET(self):
+        self.do_HEAD()
+        if self.path == '/valid':
+            self.wfile.write(b"<h1 id='valid-anchor'>valid anchor</h1>\n")
+        elif self.path == '/ignored':
+            self.wfile.write(b"no anchor but page exists\n")
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-anchors-ignore-for-url', freshenv=True)
+def test_anchors_ignored_for_url(app):
+    with serve_application(app, AnchorsIgnoreForUrlHandler) as address:
+        app.config.linkcheck_anchors_ignore_for_url = [  # type: ignore[attr-defined]
+            f'http://{address}/ignored',  # existing page
+            f'http://{address}/invalid',  # unknown page
+        ]
+        app.build()
+
+    assert (app.outdir / 'output.txt').exists()
+    content = (app.outdir / 'output.json').read_text(encoding='utf8')
+
+    attrs = ('filename', 'lineno', 'status', 'code', 'uri', 'info')
+    data = [json.loads(x) for x in content.splitlines()]
+    assert len(data) == 7
+    assert all(all(attr in row for attr in attrs) for row in data)
+
+    # rows may be unsorted due to network latency or
+    # the order the threads are processing the links
+    rows = {r['uri']: {'status': r['status'], 'info': r['info']} for r in data}
+
+    assert rows[f'http://{address}/valid']['status'] == 'working'
+    assert rows[f'http://{address}/valid#valid-anchor']['status'] == 'working'
+    assert rows[f'http://{address}/valid#invalid-anchor'] == {
+        'status': 'broken',
+        'info': "Anchor 'invalid-anchor' not found",
+    }
+
+    assert rows[f'http://{address}/ignored']['status'] == 'working'
+    assert rows[f'http://{address}/ignored#invalid-anchor']['status'] == 'working'
+
+    assert rows[f'http://{address}/invalid'] == {
+        'status': 'broken',
+        'info': f'404 Client Error: Not Found for url: http://{address}/invalid',
+    }
+    assert rows[f'http://{address}/invalid#anchor'] == {
+        'status': 'broken',
+        'info': f'404 Client Error: Not Found for url: http://{address}/invalid',
+    }
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver-anchor', freshenv=True)
+def test_raises_for_invalid_status(app):
+    class InternalServerErrorHandler(BaseHTTPRequestHandler):
+        protocol_version = "HTTP/1.1"
+
+        def do_GET(self):
+            self.send_error(500, "Internal Server Error")
+
+    with serve_application(app, InternalServerErrorHandler) as address:
+        app.build()
+    content = (app.outdir / 'output.txt').read_text(encoding='utf8')
+    assert content == (
+        f"index.rst:1: [broken] http://{address}/#anchor: "
+        "500 Server Error: Internal Server Error "
+        f"for url: http://{address}/\n"
+    )
+
+
+def custom_handler(valid_credentials=(), success_criteria=lambda _: True):
+    """
+    Returns an HTTP request handler that authenticates the client and then determines
+    an appropriate HTTP response code, based on caller-provided credentials and optional
+    success criteria, respectively.
+    """
+    expected_token = None
+    if valid_credentials:
+        assert len(valid_credentials) == 2, "expected a pair of strings as credentials"
+        expected_token = b64encode(":".join(valid_credentials).encode()).decode("utf-8")
+        del valid_credentials
+
+    class CustomHandler(BaseHTTPRequestHandler):
+        protocol_version = "HTTP/1.1"
+
+        def authenticated(method):
+            def method_if_authenticated(self):
+                if expected_token is None:
+                    return method(self)
+                elif not self.headers["Authorization"]:
+                    self.send_response(401, "Unauthorized")
+                    self.end_headers()
+                elif self.headers["Authorization"] == f"Basic {expected_token}":
+                    return method(self)
+                else:
+                    self.send_response(403, "Forbidden")
+                    self.send_header("Content-Length", "0")
+                    self.end_headers()
+
+            return method_if_authenticated
+
+        @authenticated
+        def do_HEAD(self):
+            self.do_GET()
+
+        @authenticated
+        def do_GET(self):
+            if success_criteria(self):
+                self.send_response(200, "OK")
+                self.send_header("Content-Length", "0")
+            else:
+                self.send_response(400, "Bad Request")
+                self.send_header("Content-Length", "0")
+            self.end_headers()
+
+    return CustomHandler
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True)
+def test_auth_header_uses_first_match(app):
+    with serve_application(app, custom_handler(valid_credentials=("user1", "password"))) as address:
+        app.config.linkcheck_auth = [  # type: ignore[attr-defined]
+            (r'^$', ('no', 'match')),
+            (fr'^http://{re.escape(address)}/$', ('user1', 'password')),
+            (r'.*local.*', ('user2', 'hunter2')),
+        ]
+        app.build()
+
+    with open(app.outdir / "output.json", encoding="utf-8") as fp:
+        content = json.load(fp)
+
+    assert content["status"] == "working"
+
+
+@pytest.mark.filterwarnings('ignore::sphinx.deprecation.RemovedInSphinx80Warning')
+@pytest.mark.sphinx(
+    'linkcheck', testroot='linkcheck-localserver', freshenv=True,
+    confoverrides={'linkcheck_allow_unauthorized': False})
+def test_unauthorized_broken(app):
+    with serve_application(app, custom_handler(valid_credentials=("user1", "password"))):
+        app.build()
+
+    with open(app.outdir / "output.json", encoding="utf-8") as fp:
+        content = json.load(fp)
+
+    assert content["info"] == "unauthorized"
+    assert content["status"] == "broken"
+
+
+@pytest.mark.sphinx(
+    'linkcheck', testroot='linkcheck-localserver', freshenv=True,
+    confoverrides={'linkcheck_auth': [(r'^$', ('user1', 'password'))]})
+def test_auth_header_no_match(app):
+    with (
+        serve_application(app, custom_handler(valid_credentials=("user1", "password"))),
+        pytest.warns(RemovedInSphinx80Warning, match='linkcheck builder encountered an HTTP 401'),
+    ):
+        app.build()
+
encoding="utf-8") as fp: + content = json.load(fp) + + # This link is considered working based on the default linkcheck_allow_unauthorized=true + assert content["info"] == "unauthorized" + assert content["status"] == "working" + + +@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True) +def test_linkcheck_request_headers(app): + def check_headers(self): + if "X-Secret" in self.headers: + return False + return self.headers["Accept"] == "text/html" + + with serve_application(app, custom_handler(success_criteria=check_headers)) as address: + app.config.linkcheck_request_headers = { # type: ignore[attr-defined] + f"http://{address}/": {"Accept": "text/html"}, + "*": {"X-Secret": "open sesami"}, + } + app.build() + + with open(app.outdir / "output.json", encoding="utf-8") as fp: + content = json.load(fp) + + assert content["status"] == "working" + + +@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True) +def test_linkcheck_request_headers_no_slash(app): + def check_headers(self): + if "X-Secret" in self.headers: + return False + return self.headers["Accept"] == "application/json" + + with serve_application(app, custom_handler(success_criteria=check_headers)) as address: + app.config.linkcheck_request_headers = { # type: ignore[attr-defined] + f"http://{address}": {"Accept": "application/json"}, + "*": {"X-Secret": "open sesami"}, + } + app.build() + + with open(app.outdir / "output.json", encoding="utf-8") as fp: + content = json.load(fp) + + assert content["status"] == "working" + + +@pytest.mark.sphinx( + 'linkcheck', testroot='linkcheck-localserver', freshenv=True, + confoverrides={'linkcheck_request_headers': { + "http://do.not.match.org": {"Accept": "application/json"}, + "*": {"X-Secret": "open sesami"}, + }}) +def test_linkcheck_request_headers_default(app): + def check_headers(self): + if self.headers["X-Secret"] != "open sesami": + return False + return self.headers["Accept"] != "application/json" + + with serve_application(app, custom_handler(success_criteria=check_headers)): + app.build() + + with open(app.outdir / "output.json", encoding="utf-8") as fp: + content = json.load(fp) + + assert content["status"] == "working" + + +def make_redirect_handler(*, support_head): + class RedirectOnceHandler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.1" + + def do_HEAD(self): + if support_head: + self.do_GET() + else: + self.send_response(405, "Method Not Allowed") + self.send_header("Content-Length", "0") + self.end_headers() + + def do_GET(self): + if self.path == "/?redirected=1": + self.send_response(204, "No content") + else: + self.send_response(302, "Found") + self.send_header("Location", "/?redirected=1") + self.send_header("Content-Length", "0") + self.end_headers() + + def log_date_time_string(self): + """Strip date and time from logged messages for assertions.""" + return "" + + return RedirectOnceHandler + + +@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True) +def test_follows_redirects_on_HEAD(app, capsys, warning): + with serve_application(app, make_redirect_handler(support_head=True)) as address: + app.build() + stdout, stderr = capsys.readouterr() + content = (app.outdir / 'output.txt').read_text(encoding='utf8') + assert content == ( + "index.rst:1: [redirected with Found] " + f"http://{address}/ to http://{address}/?redirected=1\n" + ) + assert stderr == textwrap.dedent( + """\ + 127.0.0.1 - - [] "HEAD / HTTP/1.1" 302 - + 127.0.0.1 - - [] "HEAD /?redirected=1 HTTP/1.1" 204 - + """, 
+    )
+    assert warning.getvalue() == ''
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True)
+def test_follows_redirects_on_GET(app, capsys, warning):
+    with serve_application(app, make_redirect_handler(support_head=False)) as address:
+        app.build()
+    stdout, stderr = capsys.readouterr()
+    content = (app.outdir / 'output.txt').read_text(encoding='utf8')
+    assert content == (
+        "index.rst:1: [redirected with Found] "
+        f"http://{address}/ to http://{address}/?redirected=1\n"
+    )
+    assert stderr == textwrap.dedent(
+        """\
+        127.0.0.1 - - [] "HEAD / HTTP/1.1" 405 -
+        127.0.0.1 - - [] "GET / HTTP/1.1" 302 -
+        127.0.0.1 - - [] "GET /?redirected=1 HTTP/1.1" 204 -
+        """,
+    )
+    assert warning.getvalue() == ''
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver-warn-redirects')
+def test_linkcheck_allowed_redirects(app, warning):
+    with serve_application(app, make_redirect_handler(support_head=False)) as address:
+        app.config.linkcheck_allowed_redirects = {f'http://{address}/.*1': '.*'}  # type: ignore[attr-defined]
+        compile_linkcheck_allowed_redirects(app, app.config)
+        app.build()
+
+    with open(app.outdir / 'output.json', encoding='utf-8') as fp:
+        rows = [json.loads(l) for l in fp]
+
+    assert len(rows) == 2
+    records = {row["uri"]: row for row in rows}
+    assert records[f"http://{address}/path1"]["status"] == "working"
+    assert records[f"http://{address}/path2"] == {
+        'filename': 'index.rst',
+        'lineno': 3,
+        'status': 'redirected',
+        'code': 302,
+        'uri': f'http://{address}/path2',
+        'info': f'http://{address}/?redirected=1',
+    }
+
+    assert (f"index.rst:3: WARNING: redirect http://{address}/path2 - with Found to "
+            f"http://{address}/?redirected=1\n" in strip_colors(warning.getvalue()))
+    assert len(warning.getvalue().splitlines()) == 1
+
+
+class OKHandler(BaseHTTPRequestHandler):
+    protocol_version = "HTTP/1.1"
+
+    def do_HEAD(self):
+        self.send_response(200, "OK")
+        self.send_header("Content-Length", "0")
+        self.end_headers()
+
+    def do_GET(self):
+        content = b"ok\n"
+        self.send_response(200, "OK")
+        self.send_header("Content-Length", str(len(content)))
+        self.end_headers()
+        self.wfile.write(content)
+
+
+@mock.patch("sphinx.builders.linkcheck.requests.get", wraps=requests.get)
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver-https', freshenv=True)
+def test_invalid_ssl(get_request, app):
+    # Link indicates SSL should be used (https) but the server does not handle it.
+    with serve_application(app, OKHandler) as address:
+        app.build()
+        assert not get_request.called
+
+    with open(app.outdir / 'output.json', encoding='utf-8') as fp:
+        content = json.load(fp)
+    assert content["status"] == "broken"
+    assert content["filename"] == "index.rst"
+    assert content["lineno"] == 1
+    assert content["uri"] == f"https://{address}/"
+    assert "SSLError" in content["info"]
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver-https', freshenv=True)
+def test_connect_to_selfsigned_fails(app):
+    with serve_application(app, OKHandler, tls_enabled=True) as address:
+        app.build()
+
+    with open(app.outdir / 'output.json', encoding='utf-8') as fp:
+        content = json.load(fp)
+    assert content["status"] == "broken"
+    assert content["filename"] == "index.rst"
+    assert content["lineno"] == 1
+    assert content["uri"] == f"https://{address}/"
+    assert "[SSL: CERTIFICATE_VERIFY_FAILED]" in content["info"]
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver-https', freshenv=True)
+def test_connect_to_selfsigned_with_tls_verify_false(app):
+    app.config.tls_verify = False
+    with serve_application(app, OKHandler, tls_enabled=True) as address:
+        app.build()
+
+    with open(app.outdir / 'output.json', encoding='utf-8') as fp:
+        content = json.load(fp)
+    assert content == {
+        "code": 0,
+        "status": "working",
+        "filename": "index.rst",
+        "lineno": 1,
+        "uri": f'https://{address}/',
+        "info": "",
+    }
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver-https', freshenv=True)
+def test_connect_to_selfsigned_with_tls_cacerts(app):
+    app.config.tls_cacerts = CERT_FILE
+    with serve_application(app, OKHandler, tls_enabled=True) as address:
+        app.build()
+
+    with open(app.outdir / 'output.json', encoding='utf-8') as fp:
+        content = json.load(fp)
+    assert content == {
+        "code": 0,
+        "status": "working",
+        "filename": "index.rst",
+        "lineno": 1,
+        "uri": f'https://{address}/',
+        "info": "",
+    }
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver-https', freshenv=True)
+def test_connect_to_selfsigned_with_requests_env_var(monkeypatch, app):
+    monkeypatch.setenv("REQUESTS_CA_BUNDLE", CERT_FILE)
+    with serve_application(app, OKHandler, tls_enabled=True) as address:
+        app.build()
+
+    with open(app.outdir / 'output.json', encoding='utf-8') as fp:
+        content = json.load(fp)
+    assert content == {
+        "code": 0,
+        "status": "working",
+        "filename": "index.rst",
+        "lineno": 1,
+        "uri": f'https://{address}/',
+        "info": "",
+    }
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver-https', freshenv=True)
+def test_connect_to_selfsigned_nonexistent_cert_file(app):
+    app.config.tls_cacerts = "does/not/exist"
+    with serve_application(app, OKHandler, tls_enabled=True) as address:
+        app.build()
+
+    with open(app.outdir / 'output.json', encoding='utf-8') as fp:
+        content = json.load(fp)
+    assert content == {
+        "code": 0,
+        "status": "broken",
+        "filename": "index.rst",
+        "lineno": 1,
+        "uri": f'https://{address}/',
+        "info": "Could not find a suitable TLS CA certificate bundle, invalid path: does/not/exist",
+    }
+
+
+class InfiniteRedirectOnHeadHandler(BaseHTTPRequestHandler):
+    protocol_version = "HTTP/1.1"
+
+    def do_HEAD(self):
+        self.send_response(302, "Found")
+        self.send_header("Location", "/")
+        self.send_header("Content-Length", "0")
+        self.end_headers()
+
+    def do_GET(self):
+        content = b"ok\n"
+        self.send_response(200, "OK")
+        self.send_header("Content-Length", str(len(content)))
+        self.end_headers()
+        self.wfile.write(content)
+        self.close_connection = True  # we don't expect the client to read this response body
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True)
+def test_TooManyRedirects_on_HEAD(app, monkeypatch):
+    import requests.sessions
+
+    monkeypatch.setattr(requests.sessions, "DEFAULT_REDIRECT_LIMIT", 5)
+
+    with serve_application(app, InfiniteRedirectOnHeadHandler) as address:
+        app.build()
+
+    with open(app.outdir / 'output.json', encoding='utf-8') as fp:
+        content = json.load(fp)
+    assert content == {
+        "code": 0,
+        "status": "working",
+        "filename": "index.rst",
+        "lineno": 1,
+        "uri": f'http://{address}/',
+        "info": "",
+    }
+
+
+def make_retry_after_handler(responses):
+    class RetryAfterHandler(BaseHTTPRequestHandler):
+        protocol_version = "HTTP/1.1"
+
+        def do_HEAD(self):
+            status, retry_after = responses.pop(0)
+            self.send_response(status)
+            if retry_after:
+                self.send_header('Retry-After', retry_after)
+            self.send_header("Content-Length", "0")
+            self.end_headers()
+
+        def log_date_time_string(self):
+            """Strip date and time from logged messages for assertions."""
+            return ""
+
+    return RetryAfterHandler
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True)
+def test_too_many_requests_retry_after_int_delay(app, capsys, status):
+    with (
+        serve_application(app, make_retry_after_handler([(429, "0"), (200, None)])) as address,
+        mock.patch("sphinx.builders.linkcheck.DEFAULT_DELAY", 0),
+        mock.patch("sphinx.builders.linkcheck.QUEUE_POLL_SECS", 0.01),
+    ):
+        app.build()
+    content = (app.outdir / 'output.json').read_text(encoding='utf8')
+    assert json.loads(content) == {
+        "filename": "index.rst",
+        "lineno": 1,
+        "status": "working",
+        "code": 0,
+        "uri": f'http://{address}/',
+        "info": "",
+    }
+    rate_limit_log = f"-rate limited- http://{address}/ | sleeping...\n"
+    assert rate_limit_log in strip_colors(status.getvalue())
+    _stdout, stderr = capsys.readouterr()
+    assert stderr == textwrap.dedent(
+        """\
+        127.0.0.1 - - [] "HEAD / HTTP/1.1" 429 -
+        127.0.0.1 - - [] "HEAD / HTTP/1.1" 200 -
+        """,
+    )
+
+
+@pytest.mark.parametrize('tz', [None, 'GMT', 'GMT+3', 'GMT-3'])
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True)
+def test_too_many_requests_retry_after_HTTP_date(tz, app, monkeypatch, capsys):
+    retry_after = wsgiref.handlers.format_date_time(time.time())
+
+    with monkeypatch.context() as m:
+        if tz is not None:
+            m.setenv('TZ', tz)
+            if sys.platform != "win32":
+                time.tzset()
+            m.setattr(sphinx.util.http_date, '_GMT_OFFSET',
+                      float(time.localtime().tm_gmtoff))
+
+        with serve_application(app, make_retry_after_handler([(429, retry_after), (200, None)])) as address:
+            app.build()
+
+    content = (app.outdir / 'output.json').read_text(encoding='utf8')
+    assert json.loads(content) == {
+        "filename": "index.rst",
+        "lineno": 1,
+        "status": "working",
+        "code": 0,
+        "uri": f'http://{address}/',
+        "info": "",
+    }
+    _stdout, stderr = capsys.readouterr()
+    assert stderr == textwrap.dedent(
+        """\
+        127.0.0.1 - - [] "HEAD / HTTP/1.1" 429 -
+        127.0.0.1 - - [] "HEAD / HTTP/1.1" 200 -
+        """,
+    )
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True)
+def test_too_many_requests_retry_after_without_header(app, capsys):
+    with (
+        serve_application(app, make_retry_after_handler([(429, None), (200, None)])) as address,
+        mock.patch("sphinx.builders.linkcheck.DEFAULT_DELAY", 0),
+    ):
+        app.build()
+    content = (app.outdir / 'output.json').read_text(encoding='utf8')
+    assert json.loads(content) == {
+        "filename": "index.rst",
+        "lineno": 1,
+        "status": "working",
+        "code": 0,
+        "uri": f'http://{address}/',
+        "info": "",
+    }
+    _stdout, stderr = capsys.readouterr()
+    assert stderr == textwrap.dedent(
+        """\
+        127.0.0.1 - - [] "HEAD / HTTP/1.1" 429 -
+        127.0.0.1 - - [] "HEAD / HTTP/1.1" 200 -
+        """,
+    )
+
+
+@pytest.mark.sphinx(
+    'linkcheck', testroot='linkcheck-localserver', freshenv=True,
+    confoverrides={
+        'linkcheck_report_timeouts_as_broken': False,
+        'linkcheck_timeout': 0.01,
+    }
+)
+def test_requests_timeout(app):
+    class DelayedResponseHandler(BaseHTTPRequestHandler):
+        protocol_version = "HTTP/1.1"
+
+        def do_GET(self):
+            time.sleep(0.2)  # wait before sending any response data
+            self.send_response(200, "OK")
+            self.send_header("Content-Length", "0")
+            self.end_headers()
+
+    with serve_application(app, DelayedResponseHandler):
+        app.build()
+
+    with open(app.outdir / "output.json", encoding="utf-8") as fp:
+        content = json.load(fp)
+
+    assert content["status"] == "timeout"
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True)
+def test_too_many_requests_user_timeout(app):
+    app.config.linkcheck_rate_limit_timeout = 0.0
+    with serve_application(app, make_retry_after_handler([(429, None)])) as address:
+        app.build()
+    content = (app.outdir / 'output.json').read_text(encoding='utf8')
+    assert json.loads(content) == {
+        "filename": "index.rst",
+        "lineno": 1,
+        "status": "broken",
+        "code": 0,
+        "uri": f'http://{address}/',
+        "info": f"429 Client Error: Too Many Requests for url: http://{address}/",
+    }
+
+
+class FakeResponse:
+    headers: dict[str, str] = {}
+    url = "http://localhost/"
+
+
+def test_limit_rate_default_sleep(app):
+    worker = HyperlinkAvailabilityCheckWorker(app.config, Queue(), Queue(), {})
+    with mock.patch('time.time', return_value=0.0):
+        next_check = worker.limit_rate(FakeResponse.url, FakeResponse.headers.get("Retry-After"))
+    assert next_check == 60.0
+
+
+def test_limit_rate_user_max_delay(app):
+    app.config.linkcheck_rate_limit_timeout = 0.0
+    worker = HyperlinkAvailabilityCheckWorker(app.config, Queue(), Queue(), {})
+    next_check = worker.limit_rate(FakeResponse.url, FakeResponse.headers.get("Retry-After"))
+    assert next_check is None
+
+
+def test_limit_rate_doubles_previous_wait_time(app):
+    rate_limits = {"localhost": RateLimit(60.0, 0.0)}
+    worker = HyperlinkAvailabilityCheckWorker(app.config, Queue(), Queue(), rate_limits)
+    with mock.patch('time.time', return_value=0.0):
+        next_check = worker.limit_rate(FakeResponse.url, FakeResponse.headers.get("Retry-After"))
+    assert next_check == 120.0
+
+
+def test_limit_rate_clips_wait_time_to_max_time(app):
+    app.config.linkcheck_rate_limit_timeout = 90.0
+    rate_limits = {"localhost": RateLimit(60.0, 0.0)}
+    worker = HyperlinkAvailabilityCheckWorker(app.config, Queue(), Queue(), rate_limits)
+    with mock.patch('time.time', return_value=0.0):
+        next_check = worker.limit_rate(FakeResponse.url, FakeResponse.headers.get("Retry-After"))
+    assert next_check == 90.0
+
+
+def test_limit_rate_bails_out_after_waiting_max_time(app):
+    app.config.linkcheck_rate_limit_timeout = 90.0
+    rate_limits = {"localhost": RateLimit(90.0, 0.0)}
+    worker = HyperlinkAvailabilityCheckWorker(app.config, Queue(), Queue(), rate_limits)
+    next_check = worker.limit_rate(FakeResponse.url, FakeResponse.headers.get("Retry-After"))
+    assert next_check is None
+
+
+@mock.patch('sphinx.util.requests.requests.Session.get_adapter')
+def test_connection_contention(get_adapter, app, capsys):
+    # Create a shared, but limited-size, connection pool
+    import requests
+    get_adapter.return_value = requests.adapters.HTTPAdapter(pool_maxsize=1)
+
+    # Set an upper-bound on socket timeouts globally
+    import socket
+    socket.setdefaulttimeout(5)
+
+    # Create parallel consumer threads
+    with serve_application(app, make_redirect_handler(support_head=True)) as address:
+
+        # Place a workload into the linkcheck queue
+        link_count = 10
+        rqueue, wqueue = Queue(), Queue()
+        for _ in range(link_count):
+            wqueue.put(CheckRequest(0, Hyperlink(f"http://{address}", "test", "test.rst", 1)))
+
+        begin, checked = time.time(), []
+        threads = [
+            HyperlinkAvailabilityCheckWorker(
+                config=app.config,
+                rqueue=rqueue,
+                wqueue=wqueue,
+                rate_limits={},
+            )
+            for _ in range(10)
+        ]
+        for thread in threads:
+            thread.start()
+        while time.time() < begin + 5 and len(checked) < link_count:
+            checked.append(rqueue.get(timeout=5))
+        for thread in threads:
+            thread.join(timeout=0)
+
+    # Ensure that all items were consumed within the time limit
+    _, stderr = capsys.readouterr()
+    assert len(checked) == link_count
+    assert "TimeoutError" not in stderr
+
+
+class ConnectionResetHandler(BaseHTTPRequestHandler):
+    protocol_version = "HTTP/1.1"
+
+    def do_HEAD(self):
+        self.close_connection = True
+
+    def do_GET(self):
+        self.send_response(200, "OK")
+        self.send_header("Content-Length", "0")
+        self.end_headers()
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-localserver', freshenv=True)
+def test_get_after_head_raises_connection_error(app):
+    with serve_application(app, ConnectionResetHandler) as address:
+        app.build()
+    content = (app.outdir / 'output.txt').read_text(encoding='utf8')
+    assert not content
+    content = (app.outdir / 'output.json').read_text(encoding='utf8')
+    assert json.loads(content) == {
+        "filename": "index.rst",
+        "lineno": 1,
+        "status": "working",
+        "code": 0,
+        "uri": f'http://{address}/',
+        "info": "",
+    }
+
+
+@pytest.mark.sphinx('linkcheck', testroot='linkcheck-documents_exclude', freshenv=True)
+def test_linkcheck_exclude_documents(app):
+    with serve_application(app, DefaultsHandler):
+        app.build()
+
+    with open(app.outdir / 'output.json', encoding='utf-8') as fp:
+        content = [json.loads(record) for record in fp]
+
+    assert len(content) == 2
+    assert {
+        'filename': 'broken_link.rst',
+        'lineno': 4,
+        'status': 'ignored',
+        'code': 0,
+        'uri': 'https://www.sphinx-doc.org/this-is-a-broken-link',
+        'info': 'broken_link matched ^broken_link$ from linkcheck_exclude_documents',
+    } in content
+    assert {
+        'filename': 'br0ken_link.rst',
+        'lineno': 4,
+        'status': 'ignored',
+        'code': 0,
+        'uri': 'https://www.sphinx-doc.org/this-is-another-broken-link',
+        'info': 'br0ken_link matched br[0-9]ken_link from linkcheck_exclude_documents',
+    } in content