summaryrefslogtreecommitdiffstats
path: root/debian/tests/arch-all-mitm.py
blob: 9180cd9c0bfc0fccc67768edc056ec9a3103bc90 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# -*- coding: utf-8 -*-
#
# Copyright (C) 2023 Debian Install System Team <debian-boot@lists.debian.org>
# Copyright (C) 2023 Matthias Klumpp <matthias@tenstral.net>
#
# SPDX-License-Identifier: MIT

"""
Flask app which MITM's a legacy-compatibility archive to make it arch:all-only.
"""
import functools
import gzip
import hashlib
import os

import requests
import tempfile
from apt_pkg import TagFile, TagSection
from flask import Flask, redirect

app = Flask(__name__)

ARCH = os.environ.get("FLASK_ARCH", "amd64")
DIST = os.environ.get("FLASK_DIST", "trixie")
DISTRO = os.environ.get("FLASK_DISTRO", "debian")
MIRROR = os.environ.get("FLASK_MIRROR", "http://deb.debian.org")


if DISTRO in ("debian", "pureos"):
    hash_funcs = [hashlib.md5, hashlib.sha256]
else:
    # Ubuntu includes SHA1 still
    hash_funcs = [hashlib.md5, hashlib.sha1, hashlib.sha256]


def _munge_release_file(url: str) -> bytes:
    """Given a Release file URL, rewrite it for our modified Packages content."""
    original = requests.get(MIRROR + "/" + url).content.decode('utf-8')
    packages_content = _packages_arch_content(
        f"{DISTRO}/dists/{DIST}/main/binary-{ARCH}/Packages"
    )
    size = str(len(packages_content))
    sums = [
        hash_func(packages_content).hexdigest()
        for hash_func in hash_funcs
    ]
    new_lines = []
    filename = f"main/binary-{ARCH}/Packages"
    for line in original.splitlines():
        if line.startswith('No-Support-for-Architecture-all:'):
            continue
        if line.startswith('Architectures:'):
            if ' all' not in line:
                line += ' all'
            new_lines.append(line)
            continue
        if line.startswith('Acquire-By-Hash:'):
            new_lines.append('Acquire-By-Hash: no')
            continue
        if not line.endswith(filename):
            new_lines.append(line)
            continue
        new_lines.append(" ".join(["", sums.pop(0), size, filename]))

    result = "\n".join(new_lines)
    return result.encode('utf-8')


@functools.lru_cache
def _packages_arch_content(url: str) -> bytes:
    """Given an arch-specific Packages URL, fetch it and filter out arch:all."""
    resp = requests.get(MIRROR + "/" + url + ".gz")
    upstream_content = gzip.decompress(resp.content)

    filtered_sections = []
    with tempfile.NamedTemporaryFile() as tmp_f:
        tmp_f.write(upstream_content)
        tmp_f.flush()

        with TagFile(tmp_f.name) as tf:
            for section in tf:
                if section.get('Architecture') == 'all':
                    continue
                filtered_sections.append(section)

    result = '\n'.join([str(s) for s in filtered_sections])
    return result.encode('utf-8')


@functools.lru_cache
def _packages_indep_content(url: str) -> bytes:
    """Given an arch:all Packages URL, just return its uncompressed contents."""
    resp = requests.get(MIRROR + "/" + url + ".gz")
    upstream_content = gzip.decompress(resp.content)

    return upstream_content


@app.route("/<path:url>", methods=["GET", "POST"])
def root(url):
    """Handler for all requests."""
    if (
        url == f"{DISTRO}/dists/{DIST}/InRelease"
        or "by-hash" in url
        or "Packages.xz" in url
        or "Packages.gz" in url
    ):
        # 404 these URLs to force clients to fetch by path and without compression, to
        # make MITM easier
        return "", 404
    if url == f"{DISTRO}/dists/{DIST}/Release":
        # If Release is being fetched, return our modified version
        return _munge_release_file(url)
    if url == f"{DISTRO}/dists/{DIST}/main/binary-all/Packages":
        return _packages_indep_content(url)
    if url == f"{DISTRO}/dists/{DIST}/main/binary-{ARCH}/Packages":
        # If Packages is being fetched, return our modified version
        return _packages_arch_content(url)
    # For anything we don't need to modify, redirect clients to upstream mirror
    return redirect(f"{MIRROR}/{url}")