Coverage for src/debputy/intermediate_manifest.py: 62%
172 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import dataclasses
2import json
3import os
4import stat
5import sys
6import tarfile
7from enum import Enum
10from typing import Optional, List, Dict, Any, Iterable, Union, Self, Mapping, IO
# An intermediate manifest is an ordered list of tar members. The name is
# quoted because ``TarMember`` is declared further down in this module.
IntermediateManifest = List["TarMember"]
class PathType(Enum):
    """Kinds of paths that can appear in the intermediate manifest.

    Each member's value is a ``(manifest key, tarfile type constant)`` pair.
    """

    FILE = ("file", tarfile.REGTYPE)
    DIRECTORY = ("directory", tarfile.DIRTYPE)
    SYMLINK = ("symlink", tarfile.SYMTYPE)
    # TODO: Add hardlink, FIFO, Char device, BLK device, etc.

    @property
    def manifest_key(self) -> str:
        """The string identifying this path type in the serialized manifest."""
        key, _tar_type = self.value
        return key

    @property
    def tarinfo_type(self) -> bytes:
        """The ``tarfile`` type constant matching this path type."""
        _key, tar_type = self.value
        return tar_type

    @property
    def can_be_virtual(self) -> bool:
        """Whether this path type may be a virtual entry (directories and symlinks only)."""
        return self is PathType.DIRECTORY or self is PathType.SYMLINK
# Reverse lookup table: manifest key (e.g. "file") -> PathType member.
KEY2PATH_TYPE = {path_type.manifest_key: path_type for path_type in PathType}
def _dirname(path: str) -> str:
    """Return the directory containing *path*, ignoring trailing slashes.

    The root entry (``"."`` or ``"./"``) is its own parent and is returned
    as ``"."``.
    """
    trimmed = path.rstrip("/")
    return trimmed if trimmed == "." else os.path.dirname(trimmed)
def _fs_type_from_st_mode(fs_path: str, st_mode: int) -> PathType:
    """Translate an ``st_mode`` value into the corresponding :class:`PathType`.

    :param fs_path: The path the mode belongs to; only used in error messages.
    :param st_mode: The mode bits to classify (e.g. ``os.lstat(...).st_mode``).
    :return: ``PathType.FILE`` for regular files, ``PathType.DIRECTORY`` for
      directories.
    :raises ValueError: For symlinks (which must be declared as virtual
      entries instead) and for any other file type.
    """
    if stat.S_ISLNK(st_mode):
        raise ValueError(
            "Symlinks should have been rewritten to use the virtual rule."
            " Otherwise, the link would not be normalized according to Debian Policy."
        )
    if stat.S_ISREG(st_mode):
        return PathType.FILE
    if stat.S_ISDIR(st_mode):
        return PathType.DIRECTORY
    # TODO: FIFOs, character devices and block devices are not supported yet.
    raise ValueError(
        f"The path {fs_path} had an unsupported/unknown file type."
        f" Probably a bug in the tool"
    )
@dataclasses.dataclass(slots=True)
class TarMember:
    """A single entry of the intermediate manifest.

    An entry is either backed by a path on the file system (``fs_path``) or
    it is a *virtual* entry (``is_virtual_entry``) whose tar header is
    synthesized purely from the fields stored on the instance.
    """

    # Name of the entry inside the archive. Directories end with "/"
    # (enforced by from_dict).
    member_path: str
    path_type: PathType
    # File system path handed to TarFile.gettarinfo; None for virtual entries.
    fs_path: Optional[str]
    # Mode as an integer; serialized as an "0o..." string in the manifest.
    mode: int
    owner: str
    uid: int
    group: str
    gid: int
    # Stored as float; truncated to int when the tar header is created.
    mtime: float
    # Symlink target; the empty string for anything that is not a symlink.
    link_target: str = ""
    is_virtual_entry: bool = False
    # Only accepted when fs_path contains "debputy/scratch-dir/" (asserted by
    # from_file and from_dict). NOTE(review): presumably signals that a
    # consumer may take over the file at fs_path - confirm against callers.
    may_steal_fs_path: bool = False

    def create_tar_info(self, tar_fd: tarfile.TarFile) -> tarfile.TarInfo:
        """Build the ``TarInfo`` header describing this member.

        :param tar_fd: The tar file the header is created for. Its ``tarinfo``
          factory is used for virtual entries and its ``gettarinfo`` method
          for entries backed by ``fs_path``.
        :return: A ``TarInfo`` with mode, ownership and mtime taken from this
          member.
        :raises ValueError: If the tar info cannot be prepared from
          ``fs_path``.
        """
        tar_info: tarfile.TarInfo
        if self.is_virtual_entry:
            assert self.path_type.can_be_virtual
            tar_info = tar_fd.tarinfo(self.member_path)
            tar_info.size = 0
            tar_info.type = self.path_type.tarinfo_type
            tar_info.linkpath = self.link_target
        else:
            try:
                fs_tar_info = tar_fd.gettarinfo(
                    name=self.fs_path, arcname=self.member_path
                )
            except (TypeError, ValueError) as e:
                raise ValueError(
                    f"Unable to prepare tar info for {self.member_path}"
                ) from e
            # gettarinfo returns None (rather than raising) for file types
            # that tarfile cannot represent, e.g. sockets.
            if fs_tar_info is None:
                raise ValueError(
                    f"Unable to prepare tar info for {self.member_path}:"
                    " unsupported file type"
                )
            tar_info = fs_tar_info
            # TODO: Eventually, we should be able to unconditionally rely on link_target.  However,
            # until we got symlinks and hardlinks correctly done in the JSON generator, it will be
            # conditional for now.
            if self.link_target != "":
                tar_info.linkpath = self.link_target
        tar_info.mode = self.mode
        tar_info.uname = self.owner
        tar_info.uid = self.uid
        tar_info.gname = self.group
        tar_info.gid = self.gid
        tar_info.mtime = int(self.mtime)

        return tar_info

    @classmethod
    def from_file(
        cls,
        member_path: str,
        fs_path: str,
        mode: Optional[int] = None,
        owner: str = "root",
        uid: int = 0,
        group: str = "root",
        gid: int = 0,
        path_mtime: Optional[Union[float, int]] = None,
        clamp_mtime_to: Optional[int] = None,
        path_type: Optional[PathType] = None,
        may_steal_fs_path: bool = False,
    ) -> "TarMember":
        """Create a member backed by the file system path *fs_path*.

        ``mode``, ``path_mtime`` and ``path_type`` are looked up with
        ``os.lstat`` only when at least one of them is omitted.

        :param clamp_mtime_to: When given, an mtime later than this value is
          replaced by it.
        :raises ValueError: If the path type has to be detected and the path
          is a symlink or another unsupported file type.
        """
        # Avoid lstat'ing if we can as it makes it easier to do tests of the code
        # (as we do not need an existing physical fs path)
        if path_type is None or path_mtime is None or mode is None:
            st_result = os.lstat(fs_path)
            st_mode = st_result.st_mode
            if mode is None:
                mode = st_mode
            if path_mtime is None:
                path_mtime = st_result.st_mtime
            if path_type is None:
                path_type = _fs_type_from_st_mode(fs_path, st_mode)

        if clamp_mtime_to is not None and path_mtime > clamp_mtime_to:
            path_mtime = clamp_mtime_to

        if may_steal_fs_path:
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not have been stealable"

        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=fs_path,
            mode=mode,
            owner=owner,
            uid=uid,
            group=group,
            gid=gid,
            mtime=float(path_mtime),
            is_virtual_entry=False,
            may_steal_fs_path=may_steal_fs_path,
        )

    @classmethod
    def virtual_path(
        cls,
        member_path: str,
        path_type: PathType,
        mtime: float,
        mode: int,
        link_target: str = "",
        owner: str = "root",
        uid: int = 0,
        group: str = "root",
        gid: int = 0,
    ) -> Self:
        """Create a virtual member, i.e. one without a file system path.

        :raises ValueError: If *path_type* cannot be virtual, if a symlink
          lacks a link target, or if a non-symlink has one.
        """
        if not path_type.can_be_virtual:
            raise ValueError(f"The path type {path_type.name} cannot be virtual")
        # Exactly the symlinks must carry a link target; the XOR is true for
        # both kinds of mismatch.
        if (path_type == PathType.SYMLINK) ^ bool(link_target):
            if not link_target:
                raise ValueError("Symlinks must have a link target")
            # TODO: Dear future programmer. Hardlinks will appear here some day and you will have to fix this
            #  code then!
            raise ValueError("Non-symlinks must not have a link target")
        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=None,
            link_target=link_target,
            mode=mode,
            owner=owner,
            uid=uid,
            group=group,
            gid=gid,
            mtime=mtime,
            is_virtual_entry=True,
        )

    def clone_and_replace(self, /, **changes: Any) -> "TarMember":
        """Return a copy of this member with the given fields replaced."""
        return dataclasses.replace(self, **changes)

    def to_manifest(self) -> Dict[str, Any]:
        """Serialize this member into its JSON-compatible manifest form.

        The mode is rendered with ``oct`` and the path type by its manifest
        key. Redundant fields are dropped: an empty ``link_target``,
        ``fs_path`` for virtual entries and ``is_virtual_entry`` for
        non-virtual ones.

        :raises TypeError: If the mode cannot be rendered as an octal string.
        """
        d = dataclasses.asdict(self)
        try:
            d["mode"] = oct(self.mode)
        except (TypeError, ValueError) as e:
            raise TypeError(f"Bad mode in TarMember {self.member_path}") from e
        d["path_type"] = self.path_type.manifest_key
        # "compress" the output by removing redundant fields
        if self.link_target is None or self.link_target == "":
            del d["link_target"]
        if self.is_virtual_entry:
            assert self.fs_path is None
            del d["fs_path"]
        else:
            del d["is_virtual_entry"]
        return d

    @classmethod
    def parse_intermediate_manifest(cls, manifest_path: str) -> IntermediateManifest:
        """Load and validate an intermediate manifest.

        :param manifest_path: Path of the JSON manifest, or ``"-"`` to read it
          from standard input.
        :return: The members in manifest order.
        :raises ValueError: If the manifest is empty, does not start with the
          root directory ``"./"``, or lists a path before the directory that
          contains it.
        """
        if manifest_path == "-":
            with sys.stdin as fd:
                data = json.load(fd)
        else:
            # JSON interchange text is UTF-8; do not depend on the locale.
            with open(manifest_path, encoding="utf-8") as fd:
                data = json.load(fd)
        contents = [cls.from_dict(m) for m in data]
        if not contents:
            raise ValueError(
                "Empty manifest (note that the root directory should always be present)"
            )
        if contents[0].member_path != "./":
            raise ValueError('The first member must always be the root directory "./"')
        # Directories seen so far (without trailing slash); every member must
        # live in one of them.
        directories = {"."}
        for tar_member in contents:
            directory = _dirname(tar_member.member_path)
            if directory not in directories:
                raise ValueError(
                    f'The path "{tar_member.member_path}" came before the directory it is in (or the path'
                    f" is not a directory). Either way leads to a broken deb."
                )
            if tar_member.path_type == PathType.DIRECTORY:
                directories.add(tar_member.member_path.rstrip("/"))
        return contents

    @classmethod
    def from_dict(cls, d: Any) -> "TarMember":
        """Create a member from one deserialized manifest entry.

        :param d: Mapping with the keys produced by ``to_manifest``.
        :raises KeyError: If a mandatory key is missing or the path type is
          unknown.
        :raises ValueError: If the entry is inconsistent (bad mode, virtual
          entry with a file system path, non-virtual entry without one,
          directory without trailing slash, bad symlink declaration, ...).
        """
        member_path = d["member_path"]
        raw_mode = d["mode"]
        # The mode must be an octal string as written by to_manifest; reject
        # other JSON types with the same error instead of an AttributeError.
        if not isinstance(raw_mode, str) or not raw_mode.startswith("0o"):
            raise ValueError(f"Bad mode for {member_path}")
        is_virtual_entry = d.get("is_virtual_entry") or False
        path_type = KEY2PATH_TYPE[d["path_type"]]
        fs_path = d.get("fs_path")
        try:
            mode = int(raw_mode[2:], 8)
        except ValueError as e:
            raise ValueError(f"Bad mode for {member_path}") from e
        if is_virtual_entry:
            if not path_type.can_be_virtual:
                raise ValueError(
                    f"Bad file type or is_virtual_entry for {d['member_path']}."
                    " The file type cannot be virtual"
                )
            if fs_path is not None:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " The path is listed as a virtual entry but has a file system path"
                )
        elif fs_path is None:
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " The path is neither a virtual path nor does it have a file system path!"
            )
        if path_type == PathType.DIRECTORY and not member_path.endswith("/"):
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " The path is listed as a directory but does not end with a slash"
            )

        link_target = d.get("link_target")
        if path_type == PathType.SYMLINK:
            if mode != 0o777:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    f" Symlinks must have mode 0o0777, got {oct(mode)[2:]}."
                )
            if not link_target:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " Symlinks must have a link_target"
                )
        elif link_target is not None and link_target != "":
            # TODO: Eventually hardlinks should have them too.  But that is a problem for a future programmer
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " Only symlinks can have a link_target"
            )
        else:
            link_target = ""
        may_steal_fs_path = d.get("may_steal_fs_path") or False

        if may_steal_fs_path:
            # Virtual entries have no fs_path; report that explicitly instead
            # of failing with a TypeError in the containment check below.
            if fs_path is None:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " The path has no file system path but is marked as stealable"
                )
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not have been stealable"
        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=fs_path,
            mode=mode,
            owner=d["owner"],
            uid=d["uid"],
            group=d["group"],
            gid=d["gid"],
            mtime=float(d["mtime"]),
            link_target=link_target,
            is_virtual_entry=is_virtual_entry,
            may_steal_fs_path=may_steal_fs_path,
        )
def output_intermediate_manifest(
    manifest_output_file: str,
    members: Iterable[TarMember],
) -> None:
    """Write *members* as an intermediate manifest to *manifest_output_file*.

    The file is opened for writing in text mode and the serialization itself
    is delegated to :func:`output_intermediate_manifest_to_fd`.
    """
    with open(manifest_output_file, "w") as manifest_fd:
        output_intermediate_manifest_to_fd(manifest_fd, members)
def output_intermediate_manifest_to_fd(
    fd: IO[str], members: Iterable[TarMember]
) -> None:
    """Dump *members* to the text stream *fd* as a JSON list.

    Each member contributes the dictionary returned by its ``to_manifest``
    method, in iteration order.
    """
    json.dump([member.to_manifest() for member in members], fd)