summaryrefslogtreecommitdiffstats
path: root/src/debputy/intermediate_manifest.py
blob: 7d8dd632630e94f438ae811ab0208a0b27795db6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import dataclasses
import json
import os
import stat
import sys
import tarfile
from enum import Enum


from typing import Optional, List, Dict, Any, Iterable, Union, Self, Mapping, IO

# Type alias: an intermediate manifest is a list of TarMember entries.
# "TarMember" is a string forward reference because the class is defined
# further down in this module.
IntermediateManifest = List["TarMember"]


class PathType(Enum):
    """Kinds of paths that an intermediate manifest entry can describe.

    The value of each member is a pair of ``(manifest key, tarfile type flag)``.
    """

    FILE = ("file", tarfile.REGTYPE)
    DIRECTORY = ("directory", tarfile.DIRTYPE)
    SYMLINK = ("symlink", tarfile.SYMTYPE)
    # TODO: Add hardlink, FIFO, Char device, BLK device, etc.

    @property
    def manifest_key(self) -> str:
        """Return the string naming this path type in the serialized manifest."""
        key, _type_flag = self.value
        return key

    @property
    def tarinfo_type(self) -> bytes:
        """Return the ``tarfile`` type flag to store in ``TarInfo.type``."""
        _key, type_flag = self.value
        return type_flag

    @property
    def can_be_virtual(self) -> bool:
        """Return True for directories and symlinks, False for regular files."""
        return self is PathType.DIRECTORY or self is PathType.SYMLINK


# Reverse lookup table from a manifest key (such as "file") to its PathType member.
KEY2PATH_TYPE: Dict[str, PathType] = {
    path_type.manifest_key: path_type for path_type in PathType
}


def _dirname(path: str) -> str:
    """Return the parent directory of *path*, ignoring any trailing slashes.

    The root marker "." (also when written as "./") is treated as its own
    parent; every other path is delegated to ``os.path.dirname`` after the
    trailing slashes have been stripped.
    """
    trimmed = path.rstrip("/")
    return trimmed if trimmed == "." else os.path.dirname(trimmed)


def _fs_type_from_st_mode(fs_path: str, st_mode: int) -> PathType:
    """Translate an ``st_mode`` value into the matching :class:`PathType`.

    :param fs_path: Path the mode belongs to; only used in the error message.
    :param st_mode: The ``st_mode`` field of a stat result.
    :return: ``PathType.FILE`` for regular files, ``PathType.DIRECTORY`` for
      directories.
    :raises ValueError: For symlinks (which are rejected here) and for every
      other file type.
    """
    if stat.S_ISREG(st_mode):
        return PathType.FILE
    if stat.S_ISDIR(st_mode):
        return PathType.DIRECTORY
    if stat.S_ISLNK(st_mode):
        raise ValueError(
            "Symlinks should have been rewritten to use the virtual rule."
            " Otherwise, the link would not be normalized according to Debian Policy."
        )
    # FIFOs, character devices and block devices have no PathType yet, so they
    # fall through to the generic error below.
    raise ValueError(
        f"The path {fs_path} had an unsupported/unknown file type."
        f" Probably a bug in the tool"
    )


@dataclasses.dataclass(slots=True)
class TarMember:
    """One entry of the intermediate manifest, describing a single tar member.

    An entry is either backed by a path on the file system (``fs_path``) or it
    is "virtual" (``is_virtual_entry``), in which case it has no file system
    path and its tar header is built purely from the fields of this object.
    """

    # Path of the member inside the archive.
    member_path: str
    # Kind of path (file, directory or symlink).
    path_type: PathType
    # File system path backing the entry; None for virtual entries.
    fs_path: Optional[str]
    # Mode written to the tar header.
    mode: int
    # Owner name and numeric id written to the tar header.
    owner: str
    uid: int
    # Group name and numeric id written to the tar header.
    group: str
    gid: int
    # Modification time; truncated to an int when the tar header is built.
    mtime: float
    # Link target; the empty string means "no link target".
    link_target: str = ""
    # True when the entry has no backing file system path.
    is_virtual_entry: bool = False
    # When set, fs_path is required to contain "debputy/scratch-dir/".
    # NOTE(review): presumably signals that a consumer may take over (move)
    # fs_path instead of copying it - confirm with the consumer of this flag.
    may_steal_fs_path: bool = False

    def create_tar_info(self, tar_fd: tarfile.TarFile) -> tarfile.TarInfo:
        """Build the ``TarInfo`` header for this member.

        :param tar_fd: The tar file used as factory for the ``TarInfo``.
        :return: A ``TarInfo`` whose mode, ownership and mtime are taken from
          this object.
        :raises ValueError: If no tar info can be prepared from ``fs_path``.
        """
        tar_info: tarfile.TarInfo
        if self.is_virtual_entry:
            assert self.path_type.can_be_virtual
            tar_info = tar_fd.tarinfo(self.member_path)
            tar_info.size = 0
            tar_info.type = self.path_type.tarinfo_type
            tar_info.linkpath = self.link_target
        else:
            try:
                tar_info = tar_fd.gettarinfo(
                    name=self.fs_path, arcname=self.member_path
                )
            except (TypeError, ValueError) as e:
                raise ValueError(
                    f"Unable to prepare tar info for {self.member_path}"
                ) from e
            if tar_info is None:
                # gettarinfo() returns None for file types that tarfile cannot
                # represent; fail with a clear message instead of an
                # AttributeError on None further down.
                raise ValueError(
                    f"Unable to prepare tar info for {self.member_path}:"
                    f" unsupported file type at {self.fs_path}"
                )
            # TODO: Eventually, we should be able to unconditionally rely on link_target.  However,
            # until we got symlinks and hardlinks correctly done in the JSON generator, it will be
            # conditional for now.
            if self.link_target != "":
                tar_info.linkpath = self.link_target
        tar_info.mode = self.mode
        tar_info.uname = self.owner
        tar_info.uid = self.uid
        tar_info.gname = self.group
        tar_info.gid = self.gid
        tar_info.mtime = int(self.mtime)

        return tar_info

    @classmethod
    def from_file(
        cls,
        member_path: str,
        fs_path: str,
        mode: Optional[int] = None,
        owner: str = "root",
        uid: int = 0,
        group: str = "root",
        gid: int = 0,
        path_mtime: Optional[Union[float, int]] = None,
        clamp_mtime_to: Optional[int] = None,
        path_type: Optional[PathType] = None,
        may_steal_fs_path: bool = False,
    ) -> "TarMember":
        """Create a non-virtual member backed by ``fs_path``.

        ``mode``, ``path_mtime`` and ``path_type`` are read from the file
        system (via ``os.lstat``) only when at least one of them is omitted.

        :param clamp_mtime_to: When given, an mtime later than this value is
          replaced by this value.
        :raises ValueError: If the path type has to be detected and the path is
          a symlink or of an unsupported type.
        """
        # Avoid lstat'ing if we can as it makes it easier to do tests of the code
        # (as we do not need an existing physical fs path)
        if path_type is None or path_mtime is None or mode is None:
            st_result = os.lstat(fs_path)
            st_mode = st_result.st_mode
            if mode is None:
                mode = st_mode
            if path_mtime is None:
                path_mtime = st_result.st_mtime
            if path_type is None:
                path_type = _fs_type_from_st_mode(fs_path, st_mode)

        if clamp_mtime_to is not None and path_mtime > clamp_mtime_to:
            path_mtime = clamp_mtime_to

        if may_steal_fs_path:
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not have been stealable"

        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=fs_path,
            mode=mode,
            owner=owner,
            uid=uid,
            group=group,
            gid=gid,
            mtime=float(path_mtime),
            is_virtual_entry=False,
            may_steal_fs_path=may_steal_fs_path,
        )

    @classmethod
    def virtual_path(
        cls,
        member_path: str,
        path_type: PathType,
        mtime: float,
        mode: int,
        link_target: str = "",
        owner: str = "root",
        uid: int = 0,
        group: str = "root",
        gid: int = 0,
    ) -> Self:
        """Create a virtual member, i.e. one without a file system path.

        :raises ValueError: If ``path_type`` cannot be virtual, if a symlink
          lacks a link target, or if a non-symlink has one.
        """
        if not path_type.can_be_virtual:
            raise ValueError(f"The path type {path_type.name} cannot be virtual")
        # The XOR is true exactly when "is a symlink" and "has a link target" disagree.
        if (path_type == PathType.SYMLINK) ^ bool(link_target):
            if not link_target:
                raise ValueError("Symlinks must have a link target")
            # TODO: Dear future programmer. Hardlinks will appear here some day and you will have to fix this
            # code then!
            raise ValueError("Non-symlinks must not have a link target")
        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=None,
            link_target=link_target,
            mode=mode,
            owner=owner,
            uid=uid,
            group=group,
            gid=gid,
            mtime=mtime,
            is_virtual_entry=True,
        )

    def clone_and_replace(self, /, **changes: Any) -> "TarMember":
        """Return a copy of this member with the given fields replaced."""
        return dataclasses.replace(self, **changes)

    def to_manifest(self) -> Dict[str, Any]:
        """Serialize this member into its JSON-compatible manifest form.

        The mode is emitted as an octal string (``oct()`` format) and the path
        type as its manifest key.  An empty ``link_target`` is omitted;
        ``fs_path`` is omitted for virtual entries and ``is_virtual_entry`` is
        omitted for non-virtual ones.

        :raises TypeError: If ``mode`` cannot be converted with ``oct()``.
        """
        d = dataclasses.asdict(self)
        try:
            d["mode"] = oct(self.mode)
        except (TypeError, ValueError) as e:
            raise TypeError(f"Bad mode in TarMember {self.member_path}") from e
        d["path_type"] = self.path_type.manifest_key
        # "compress" the output by removing redundant fields
        if self.link_target is None or self.link_target == "":
            del d["link_target"]
        if self.is_virtual_entry:
            assert self.fs_path is None
            del d["fs_path"]
        else:
            del d["is_virtual_entry"]
        return d

    @classmethod
    def parse_intermediate_manifest(cls, manifest_path: str) -> IntermediateManifest:
        """Load and validate an intermediate manifest.

        :param manifest_path: Path of the JSON manifest, or "-" to read it from
          standard input.
        :return: The members in manifest order.
        :raises ValueError: If the manifest is empty, does not start with the
          root directory "./", or lists a path before the directory that
          contains it.
        """
        directories = {"."}
        if manifest_path == "-":
            with sys.stdin as fd:
                data = json.load(fd)
        else:
            # JSON is exchanged as UTF-8; do not depend on the locale encoding.
            with open(manifest_path, encoding="utf-8") as fd:
                data = json.load(fd)
        contents = [cls.from_dict(m) for m in data]
        if not contents:
            raise ValueError(
                "Empty manifest (note that the root directory should always be present)"
            )
        if contents[0].member_path != "./":
            raise ValueError('The first member must always be the root directory "./"')
        for tar_member in contents:
            directory = _dirname(tar_member.member_path)
            if directory not in directories:
                raise ValueError(
                    f'The path "{tar_member.member_path}" came before the directory it is in (or the path'
                    f" is not a directory). Either way leads to a broken deb."
                )
            if tar_member.path_type == PathType.DIRECTORY:
                directories.add(tar_member.member_path.rstrip("/"))
        return contents

    @classmethod
    def from_dict(cls, d: Any) -> "TarMember":
        """Create a member from one deserialized manifest entry.

        :param d: Mapping as produced by :meth:`to_manifest`.
        :raises ValueError: If the mode is not an octal string prefixed with
          "0o", or if the combination of fields is inconsistent (see the
          individual messages below).
        :raises KeyError: If a mandatory key is missing or the path type is
          unknown.
        """
        member_path = d["member_path"]
        raw_mode = d["mode"]
        # A non-string mode (e.g. a plain JSON number) is as invalid as a
        # string without the "0o" prefix; report both the same way.
        if not isinstance(raw_mode, str) or not raw_mode.startswith("0o"):
            raise ValueError(f"Bad mode for {member_path}")
        is_virtual_entry = d.get("is_virtual_entry") or False
        path_type = KEY2PATH_TYPE[d["path_type"]]
        fs_path = d.get("fs_path")
        try:
            mode = int(raw_mode[2:], 8)
        except ValueError as e:
            # Name the offending member rather than leaking the bare int() error.
            raise ValueError(f"Bad mode for {member_path}") from e
        if is_virtual_entry:
            if not path_type.can_be_virtual:
                raise ValueError(
                    f"Bad file type or is_virtual_entry for {d['member_path']}."
                    " The file type cannot be virtual"
                )
            if fs_path is not None:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " The path is listed as a virtual entry but has a file system path"
                )
        elif fs_path is None:
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " The path is neither a virtual path nor does it have a file system path!"
            )
        if path_type == PathType.DIRECTORY and not member_path.endswith("/"):
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " The path is listed as a directory but does not end with a slash"
            )

        link_target = d.get("link_target")
        if path_type == PathType.SYMLINK:
            if mode != 0o777:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    f" Symlinks must have mode 0o0777, got {oct(mode)[2:]}."
                )
            if not link_target:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " Symlinks must have a link_target"
                )
        elif link_target is not None and link_target != "":
            # TODO: Eventually hardlinks should have them too.  But that is a problem for a future programmer
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " Only symlinks can have a link_target"
            )
        else:
            # Normalize a missing/None link target to the "no link target" value.
            link_target = ""
        may_steal_fs_path = d.get("may_steal_fs_path") or False

        if may_steal_fs_path:
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not have been stealable"
        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=fs_path,
            mode=mode,
            owner=d["owner"],
            uid=d["uid"],
            group=d["group"],
            gid=d["gid"],
            mtime=float(d["mtime"]),
            link_target=link_target,
            is_virtual_entry=is_virtual_entry,
            may_steal_fs_path=may_steal_fs_path,
        )


def output_intermediate_manifest(
    manifest_output_file: str,
    members: Iterable[TarMember],
) -> None:
    """Write *members* as an intermediate manifest to *manifest_output_file*.

    The file is opened in text write mode and the serialization itself is
    delegated to :func:`output_intermediate_manifest_to_fd`.
    """
    with open(manifest_output_file, "w") as manifest_fd:
        output_intermediate_manifest_to_fd(manifest_fd, members)


def output_intermediate_manifest_to_fd(
    fd: IO[str], members: Iterable[TarMember]
) -> None:
    """Serialize *members* as a JSON list and write it to the text stream *fd*.

    Every member is converted with ``to_manifest()`` before anything is
    written to *fd*.
    """
    manifest_entries = [member.to_manifest() for member in members]
    json.dump(manifest_entries, fd)