summaryrefslogtreecommitdiffstats
path: root/debian/tests/arch-all-mitm.py
diff options
context:
space:
mode:
Diffstat (limited to 'debian/tests/arch-all-mitm.py')
-rw-r--r--debian/tests/arch-all-mitm.py120
1 files changed, 120 insertions, 0 deletions
diff --git a/debian/tests/arch-all-mitm.py b/debian/tests/arch-all-mitm.py
new file mode 100644
index 0000000..9180cd9
--- /dev/null
+++ b/debian/tests/arch-all-mitm.py
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2023 Debian Install System Team <debian-boot@lists.debian.org>
+# Copyright (C) 2023 Matthias Klumpp <matthias@tenstral.net>
+#
+# SPDX-License-Identifier: MIT
+
+"""
+Flask app which MITM's a legacy-compatibility archive to make it arch:all-only.
+"""
+import functools
+import gzip
+import hashlib
+import os
+
+import requests
+import tempfile
+from apt_pkg import TagFile, TagSection
+from flask import Flask, redirect
+
+app = Flask(__name__)
+
+ARCH = os.environ.get("FLASK_ARCH", "amd64")
+DIST = os.environ.get("FLASK_DIST", "trixie")
+DISTRO = os.environ.get("FLASK_DISTRO", "debian")
+MIRROR = os.environ.get("FLASK_MIRROR", "http://deb.debian.org")
+
+
+if DISTRO in ("debian", "pureos"):
+ hash_funcs = [hashlib.md5, hashlib.sha256]
+else:
+ # Ubuntu includes SHA1 still
+ hash_funcs = [hashlib.md5, hashlib.sha1, hashlib.sha256]
+
+
+def _munge_release_file(url: str) -> bytes:
+ """Given a Release file URL, rewrite it for our modified Packages content."""
+ original = requests.get(MIRROR + "/" + url).content.decode('utf-8')
+ packages_content = _packages_arch_content(
+ f"{DISTRO}/dists/{DIST}/main/binary-{ARCH}/Packages"
+ )
+ size = str(len(packages_content))
+ sums = [
+ hash_func(packages_content).hexdigest()
+ for hash_func in hash_funcs
+ ]
+ new_lines = []
+ filename = f"main/binary-{ARCH}/Packages"
+ for line in original.splitlines():
+ if line.startswith('No-Support-for-Architecture-all:'):
+ continue
+ if line.startswith('Architectures:'):
+ if ' all' not in line:
+ line += ' all'
+ new_lines.append(line)
+ continue
+ if line.startswith('Acquire-By-Hash:'):
+ new_lines.append('Acquire-By-Hash: no')
+ continue
+ if not line.endswith(filename):
+ new_lines.append(line)
+ continue
+ new_lines.append(" ".join(["", sums.pop(0), size, filename]))
+
+ result = "\n".join(new_lines)
+ return result.encode('utf-8')
+
+
+@functools.lru_cache
+def _packages_arch_content(url: str) -> bytes:
+ """Given an arch-specific Packages URL, fetch it and filter out arch:all."""
+ resp = requests.get(MIRROR + "/" + url + ".gz")
+ upstream_content = gzip.decompress(resp.content)
+
+ filtered_sections = []
+ with tempfile.NamedTemporaryFile() as tmp_f:
+ tmp_f.write(upstream_content)
+ tmp_f.flush()
+
+ with TagFile(tmp_f.name) as tf:
+ for section in tf:
+ if section.get('Architecture') == 'all':
+ continue
+ filtered_sections.append(section)
+
+ result = '\n'.join([str(s) for s in filtered_sections])
+ return result.encode('utf-8')
+
+
+@functools.lru_cache
+def _packages_indep_content(url: str) -> bytes:
+ """Given an arch:all Packages URL, just return its uncompressed contents."""
+ resp = requests.get(MIRROR + "/" + url + ".gz")
+ upstream_content = gzip.decompress(resp.content)
+
+ return upstream_content
+
+
+@app.route("/<path:url>", methods=["GET", "POST"])
+def root(url):
+ """Handler for all requests."""
+ if (
+ url == f"{DISTRO}/dists/{DIST}/InRelease"
+ or "by-hash" in url
+ or "Packages.xz" in url
+ or "Packages.gz" in url
+ ):
+ # 404 these URLs to force clients to fetch by path and without compression, to
+ # make MITM easier
+ return "", 404
+ if url == f"{DISTRO}/dists/{DIST}/Release":
+ # If Release is being fetched, return our modified version
+ return _munge_release_file(url)
+ if url == f"{DISTRO}/dists/{DIST}/main/binary-all/Packages":
+ return _packages_indep_content(url)
+ if url == f"{DISTRO}/dists/{DIST}/main/binary-{ARCH}/Packages":
+ # If Packages is being fetched, return our modified version
+ return _packages_arch_content(url)
+ # For anything we don't need to modify, redirect clients to upstream mirror
+ return redirect(f"{MIRROR}/{url}")