path: root/src/debputy/plugin/debputy/package_processors.py
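"""Built-in debputy package processors: man page normalization, compression, and .la file cleanup."""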
import contextlib
import functools
import gzip
import os
import re
import subprocess
from contextlib import ExitStack
from typing import Optional, Iterator, IO, Any, List, Dict, Callable, Union

from debputy.plugin.api import VirtualPath
from debputy.util import _error, xargs, escape_shell, _info, assume_not_none


@contextlib.contextmanager
def _open_maybe_gzip(path: VirtualPath) -> Iterator[Union[IO[bytes], gzip.GzipFile]]:
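    """Open *path* for reading, transparently decompressing it if it is gzip-compressed."""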
    if path.name.endswith(".gz"):
        with gzip.GzipFile(path.fs_path, "rb") as fd:
            yield fd
    else:
        with path.open(byte_io=True) as fd:
            yield fd


# Matches a roff ".so" include directive (e.g., ".so man3/foo.3") and captures its target.
# The lazy group plus the anchored "\s*$" keeps trailing whitespace out of the capture.
_SO_LINK_RE = re.compile(rb"[.]so\s+(.*?)\s*$")
_LA_DEP_LIB_RE = re.compile(rb"'.+'")


def _detect_so_link(path: VirtualPath) -> Optional[str]:
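    """Return the target of a man page's ".so" directive, or None if the page has none."""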
    so_link_re = _SO_LINK_RE
    with _open_maybe_gzip(path) as fd:
        for line in fd:
            m = so_link_re.search(line)
            if m:
                return m.group(1).decode("utf-8")
    return None


def _replace_with_symlink(path: VirtualPath, so_link_target: str) -> None:
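    """Replace the ".so" stub man page at *path* with a symlink to *so_link_target*."""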
    adjusted_target = so_link_target
    parent_dir = path.parent_dir
    assert parent_dir is not None  # For the type checking
    if parent_dir.name == os.path.dirname(adjusted_target):
        # Avoid man8/../man8/foo links
        adjusted_target = os.path.basename(adjusted_target)
    elif "/" in so_link_target:
        # A ".so" directive is resolved relative to the root of the man hierarchy, while
        # a symlink target is resolved relative to the directory containing the link.
        # Prepend an extra "../" to align the two when the target contains a "/".
        adjusted_target = "../" + adjusted_target

    path.unlink()
    parent_dir.add_symlink(path.name, adjusted_target)


@functools.lru_cache(1)
def _has_man_recode() -> bool:
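    """Return True if a working man-recode executable is available."""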
    # Ideally, we would just use shutil.which or something like that.
    # Unfortunately, in debhelper, we experienced problems with `which`
    # reporting a man tool as available that actually could not be run
    # on the salsa CI.
    #
    # Therefore, we adopt the logic of dh_installman and run the tool
    # with --help to confirm it is not broken, because no one could
    # figure out what happened in the salsa CI and my life is still
    # too short to figure it out.
    try:
        subprocess.check_call(
            ["man-recode", "--help"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            restore_signals=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        # The tool is either broken or not installed at all.
        return False
    return True


def process_manpages(fs_root: VirtualPath, _unused1: Any, _unused2: Any) -> None:
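    """Replace ".so" stub man pages with symlinks and re-encode the remaining pages to UTF-8."""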
    man_dir = fs_root.lookup("./usr/share/man")
    if not man_dir:
        return

    re_encode = []
    for path in (p for p in man_dir.all_paths() if p.is_file and p.has_fs_path):
        size = path.size
        if size == 0:
            continue
        so_link_target = None
        if size <= 1024:
            # debhelper has a 1024 byte guard on the basis that ".so files tend to be small".
            # That guard worked well for debhelper, so let's keep it for now on that basis alone.
            so_link_target = _detect_so_link(path)
        if so_link_target:
            _replace_with_symlink(path, so_link_target)
        else:
            re_encode.append(path)

    if not re_encode or not _has_man_recode():
        return

    with ExitStack() as manager:
        manpages = [
            manager.enter_context(p.replace_fs_path_content()) for p in re_encode
        ]
        static_cmd = ["man-recode", "--to-code", "UTF-8", "--suffix", ".encoded"]
        for cmd in xargs(static_cmd, manpages):
            _info(f"Ensuring manpages have utf-8 encoding via: {escape_shell(*cmd)}")
            try:
                subprocess.check_call(
                    cmd,
                    stdin=subprocess.DEVNULL,
                    restore_signals=True,
                )
            except subprocess.CalledProcessError:
                _error(
                    "The man-recode process failed. Please review the output of `man-recode` to understand"
                    " what went wrong."
                )
        for manpage in manpages:
            os.rename(f"{manpage}.encoded", manpage)


def _filter_compress_paths() -> Callable[[VirtualPath], Iterator[VirtualPath]]:
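    """Return a filter that walks a directory tree and yields the files eligible for compression.

    The ignore lists below mirror the exclusions traditionally applied by dh_compress.
    """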
    ignore_dir_basenames = {
        "_sources",
    }
    ignore_basenames = {
        ".htaccess",
        "index.sgml",
        "objects.inv",
        "search_index.json",
        "copyright",
    }
    ignore_extensions = {
        ".htm",
        ".html",
        ".xhtml",
        ".gif",
        ".png",
        ".jpg",
        ".jpeg",
        ".gz",
        ".taz",
        ".tgz",
        ".z",
        ".bz2",
        ".epub",
        ".jar",
        ".zip",
        ".odg",
        ".odp",
        ".odt",
        ".css",
        ".xz",
        ".lz",
        ".lzma",
        ".haddock",
        ".hs",
        ".woff",
        ".woff2",
        ".svg",
        ".svgz",
        ".js",
        ".devhelp2",
        ".map",  # Technically, dh_compress has this one case-sensitive
    }
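    # Name endings that dh_compress special-cases as already-compressed files.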
    ignore_special_cases = ("-gz", "-z", "_z")

    def _filtered_walk(path: VirtualPath) -> Iterator[VirtualPath]:
        for path, children in path.walk():
            if path.name in ignore_dir_basenames:
                children.clear()
                continue
            if path.is_dir and path.name == "examples":
                # Ignore anything beneath /usr/share/doc/*/examples
                parent = path.parent_dir
                grand_parent = parent.parent_dir if parent else None
                if grand_parent and grand_parent.absolute == "/usr/share/doc":
                    children.clear()
                    continue
            name = path.name
            if (
                path.is_symlink
                or not path.is_file
                or name in ignore_basenames
                or not path.has_fs_path
            ):
                continue

            name_lc = name.lower()
            _, ext = os.path.splitext(name_lc)

            if ext in ignore_extensions or name_lc.endswith(ignore_special_cases):
                continue
            yield path

    return _filtered_walk


def _find_compressable_paths(fs_root: VirtualPath) -> Iterator[VirtualPath]:
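    """Yield the paths beneath *fs_root* that should be gzip-compressed."""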
    path_filter = _filter_compress_paths()

    for p, compress_size_threshold in (
        ("./usr/share/info", 0),
        ("./usr/share/man", 0),
        ("./usr/share/doc", 4096),
    ):
        path = fs_root.lookup(p)
        if path is None:
            continue
        paths = path_filter(path)
        if compress_size_threshold:
            # The special case for changelog and NEWS files comes from dh_compress;
            # historically, they have been compressed regardless of their size.
            paths = (
                p
                for p in paths
                if p.size > compress_size_threshold
                or p.name.startswith(("changelog", "NEWS"))
            )
        yield from paths
    x11_path = fs_root.lookup("./usr/share/fonts/X11")
    if x11_path:
        yield from (
            p for p in x11_path.all_paths() if p.is_file and p.name.endswith(".pcf")
        )


def apply_compression(fs_root: VirtualPath, _unused1: Any, _unused2: Any) -> None:
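    """Compress eligible files with `gzip -9n` and re-point symlinks at the compressed names."""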
    # TODO: Support hardlinks
    compressed_files: Dict[str, str] = {}
    for path in _find_compressable_paths(fs_root):
        parent_dir = assume_not_none(path.parent_dir)
        with parent_dir.add_file(f"{path.name}.gz", mtime=path.mtime) as new_file, open(
            new_file.fs_path, "wb"
        ) as fd:
            try:
                subprocess.check_call(["gzip", "-9nc", path.fs_path], stdout=fd)
            except subprocess.CalledProcessError:
                full_command = f"gzip -9nc {escape_shell(path.fs_path)} > {escape_shell(new_file.fs_path)}"
                _error(
                    f"The compression of {path.path} failed. Please review the error message from gzip to"
                    f" understand what went wrong.  Full command was: {full_command}"
                )
            compressed_files[path.path] = new_file.path
        del parent_dir[path.name]

    # Re-point symlinks whose targets were just compressed. A symlink may point at
    # another symlink, so keep iterating until a full pass makes no further changes.
    all_remaining_symlinks = {p.path: p for p in fs_root.all_paths() if p.is_symlink}
    changed = True
    while changed:
        changed = False
        remaining: List[VirtualPath] = list(all_remaining_symlinks.values())
        for symlink in remaining:
            target = symlink.readlink()
            dir_target, basename_target = os.path.split(target)
            new_basename_target = f"{basename_target}.gz"
            symlink_parent_dir = assume_not_none(symlink.parent_dir)
            dir_path = symlink_parent_dir
            if dir_target != "":
                dir_path = dir_path.lookup(dir_target)
            if (
                not dir_path
                or basename_target in dir_path
                or new_basename_target not in dir_path
            ):
                continue
            del all_remaining_symlinks[symlink.path]
            changed = True

            new_link_name = (
                f"{symlink.name}.gz"
                if not symlink.name.endswith(".gz")
                else symlink.name
            )
            symlink_parent_dir.add_symlink(
                new_link_name, os.path.join(dir_target, new_basename_target)
            )
            symlink.unlink()


def _la_files(fs_root: VirtualPath) -> Iterator[VirtualPath]:
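    """Yield the libtool .la files located directly in /usr/lib."""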
    lib_dir = fs_root.lookup("/usr/lib")
    if not lib_dir:
        return
    # The original code only iterates over the entries directly in /usr/lib. To be a
    # faithful conversion, we do the same here.
    # Eagerly resolve the list, as replacing the files while iterating could otherwise
    # trigger a runtime error.
    paths = list(lib_dir.iterdir)
    yield from (p for p in paths if p.is_file and p.name.endswith(".la"))


# Conceptually, this is the same feature that dh_gnome provides.
# The clean_la_files function is based on the dh_gnome version written by Luca Falavigna
# in 2010, who in turn references a Makefile version of the feature.
# https://salsa.debian.org/gnome-team/gnome-pkg-tools/-/commit/2868e1e41ea45443b0fb340bf4c71c4de87d4a5b
def clean_la_files(
    fs_root: VirtualPath,
    _unused1: Any,
    _unused2: Any,
) -> None:
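    """Empty the dependency_libs line in libtool .la files (the same cleanup dh_gnome performs)."""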
    for path in _la_files(fs_root):
        buffer = []
        with path.open(byte_io=True) as fd:
            replace_file = False
            for line in fd:
                if line.startswith(b"dependency_libs"):
                    replacement = _LA_DEP_LIB_RE.sub(b"''", line)
                    if replacement != line:
                        replace_file = True
                        line = replacement
                buffer.append(line)

            if not replace_file:
                continue
            _info(f"Clearing the dependency_libs line in {path.path}")
            with path.replace_fs_path_content() as fs_path, open(fs_path, "wb") as wfd:
                wfd.writelines(buffer)