Coverage for src/debputy/intermediate_manifest.py: 62%

172 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import dataclasses 

2import json 

3import os 

4import stat 

5import sys 

6import tarfile 

7from enum import Enum 

8 

9 

10from typing import Optional, List, Dict, Any, Iterable, Union, Self, Mapping, IO 

11 

# An intermediate manifest is an ordered list of tar members.  The element
# type is a quoted forward reference because TarMember is defined later in
# this file.
IntermediateManifest = List["TarMember"]

13 

14 

class PathType(Enum):
    """Kinds of paths that can appear in an intermediate manifest.

    Every member's value is a ``(manifest key, tarfile type flag)`` pair; the
    properties below expose the two halves under descriptive names.
    """

    FILE = ("file", tarfile.REGTYPE)
    DIRECTORY = ("directory", tarfile.DIRTYPE)
    SYMLINK = ("symlink", tarfile.SYMTYPE)
    # TODO: Add hardlink, FIFO, Char device, BLK device, etc.

    @property
    def manifest_key(self) -> str:
        """String identifying this path type in the serialized manifest."""
        key, _ = self.value
        return key

    @property
    def tarinfo_type(self) -> bytes:
        """Type flag (a ``tarfile`` ``*TYPE`` constant) for this path type."""
        _, type_flag = self.value
        return type_flag

    @property
    def can_be_virtual(self) -> bool:
        """True for directories and symlinks, the only types allowed to be virtual."""
        return self is PathType.DIRECTORY or self is PathType.SYMLINK

32 

33 

# Reverse lookup table: maps each PathType's manifest key (e.g. "file") back
# to the enum member, for decoding the "path_type" field of a manifest entry.
KEY2PATH_TYPE = {pt.manifest_key: pt for pt in PathType}

35 

36 

def _dirname(path: str) -> str:
    """Return the parent directory of *path*, ignoring trailing slashes.

    The root member (``"."`` once trailing slashes are stripped) is treated as
    its own parent; every other path is delegated to ``os.path.dirname``.
    """
    trimmed = path.rstrip("/")
    return trimmed if trimmed == "." else os.path.dirname(trimmed)

42 

43 

def _fs_type_from_st_mode(fs_path: str, st_mode: int) -> PathType:
    """Translate an ``st_mode`` value into the corresponding ``PathType``.

    :param fs_path: File system path the mode belongs to; only used in the
      error message for unsupported types.
    :param st_mode: Mode bits as found in ``os.stat_result.st_mode``.
    :return: ``PathType.FILE`` for regular files, ``PathType.DIRECTORY`` for
      directories.
    :raises ValueError: For symlinks (which must be declared via the virtual
      rule instead) and for any other file type.
    """
    if stat.S_ISREG(st_mode):
        return PathType.FILE
    if stat.S_ISDIR(st_mode):
        return PathType.DIRECTORY
    if stat.S_ISLNK(st_mode):
        raise ValueError(
            "Symlinks should have been rewritten to use the virtual rule."
            " Otherwise, the link would not be normalized according to Debian Policy."
        )
    # FIFOs, character devices and block devices have no PathType yet and
    # therefore end up here as well.
    raise ValueError(
        f"The path {fs_path} had an unsupported/unknown file type."
        " Probably a bug in the tool"
    )

66 

67 

@dataclasses.dataclass(slots=True)
class TarMember:
    """A single entry of the intermediate manifest (one member of the tar).

    A member is either backed by a path on the file system (``fs_path`` is
    set) or it is a *virtual entry* that is described entirely by the fields
    of this object (``fs_path`` is ``None`` and ``is_virtual_entry`` is true).
    """

    # Path of the member inside the tar; directories end with a slash and the
    # root directory is "./" (both enforced when parsing a manifest).
    member_path: str
    path_type: PathType
    # Backing path on the file system; None for virtual entries.
    fs_path: Optional[str]
    mode: int
    owner: str
    uid: int
    group: str
    gid: int
    mtime: float
    # Only symlinks carry a link target for now; "" means "no target".
    link_target: str = ""
    is_virtual_entry: bool = False
    may_steal_fs_path: bool = False

    def create_tar_info(self, tar_fd: tarfile.TarFile) -> tarfile.TarInfo:
        """Build the ``TarInfo`` describing this member for *tar_fd*.

        Virtual entries get a fresh ``TarInfo``; file system backed entries
        are derived from ``fs_path`` via ``TarFile.gettarinfo``.  In both
        cases ownership, mode and mtime are then overridden with the values
        stored on this object.

        :raises ValueError: If no tar info could be prepared for the member.
        """
        tar_info: Optional[tarfile.TarInfo]
        if self.is_virtual_entry:
            assert self.path_type.can_be_virtual
            tar_info = tar_fd.tarinfo(self.member_path)
            tar_info.size = 0
            tar_info.type = self.path_type.tarinfo_type
            tar_info.linkpath = self.link_target
        else:
            try:
                tar_info = tar_fd.gettarinfo(
                    name=self.fs_path, arcname=self.member_path
                )
            except (TypeError, ValueError) as e:
                raise ValueError(
                    f"Unable to prepare tar info for {self.member_path}"
                ) from e
            if tar_info is None:
                # gettarinfo returns None for file types tarfile cannot
                # represent; fail clearly instead of with an AttributeError.
                raise ValueError(f"Unable to prepare tar info for {self.member_path}")
            # TODO: Eventually, we should be able to unconditionally rely on link_target.  However,
            # until we got symlinks and hardlinks correctly done in the JSON generator, it will be
            # conditional for now.
            if self.link_target != "":
                tar_info.linkpath = self.link_target
        tar_info.mode = self.mode
        tar_info.uname = self.owner
        tar_info.uid = self.uid
        tar_info.gname = self.group
        tar_info.gid = self.gid
        tar_info.mtime = int(self.mtime)

        return tar_info

    @classmethod
    def from_file(
        cls,
        member_path: str,
        fs_path: str,
        mode: Optional[int] = None,
        owner: str = "root",
        uid: int = 0,
        group: str = "root",
        gid: int = 0,
        path_mtime: Optional[Union[float, int]] = None,
        clamp_mtime_to: Optional[int] = None,
        path_type: Optional[PathType] = None,
        may_steal_fs_path: bool = False,
    ) -> "TarMember":
        """Create a file system backed member for *fs_path*.

        Any of *mode*, *path_mtime* and *path_type* that is ``None`` is filled
        in from ``os.lstat(fs_path)``.  If *clamp_mtime_to* is given, the
        mtime is capped at that value.

        :raises ValueError: If the path type must be detected and the path is
          a symlink or has an unsupported file type.
        """
        # Avoid lstat'ing if we can as it makes it easier to do tests of the code
        # (as we do not need an existing physical fs path)
        if path_type is None or path_mtime is None or mode is None:
            st_result = os.lstat(fs_path)
            st_mode = st_result.st_mode
            if mode is None:
                mode = st_mode
            if path_mtime is None:
                path_mtime = st_result.st_mtime
            if path_type is None:
                path_type = _fs_type_from_st_mode(fs_path, st_mode)

        if clamp_mtime_to is not None and path_mtime > clamp_mtime_to:
            path_mtime = clamp_mtime_to

        if may_steal_fs_path:
            # Invariant on the caller's arguments (not on external data), so
            # it deliberately stays an assertion.
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not have been stealable"

        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=fs_path,
            mode=mode,
            owner=owner,
            uid=uid,
            group=group,
            gid=gid,
            mtime=float(path_mtime),
            is_virtual_entry=False,
            may_steal_fs_path=may_steal_fs_path,
        )

    @classmethod
    def virtual_path(
        cls,
        member_path: str,
        path_type: PathType,
        mtime: float,
        mode: int,
        link_target: str = "",
        owner: str = "root",
        uid: int = 0,
        group: str = "root",
        gid: int = 0,
    ) -> Self:
        """Create a virtual member, i.e. one without a backing ``fs_path``.

        :raises ValueError: If *path_type* cannot be virtual, if a symlink
          lacks a *link_target*, or if a non-symlink has one.
        """
        if not path_type.can_be_virtual:
            raise ValueError(f"The path type {path_type.name} cannot be virtual")
        # XOR: exactly one of "is a symlink" / "has a link target" holds,
        # which is the inconsistent combination.
        if (path_type == PathType.SYMLINK) ^ bool(link_target):
            if not link_target:
                raise ValueError("Symlinks must have a link target")
            # TODO: Dear future programmer. Hardlinks will appear here some day and you will have to fix this
            #  code then!
            raise ValueError("Non-symlinks must not have a link target")
        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=None,
            link_target=link_target,
            mode=mode,
            owner=owner,
            uid=uid,
            group=group,
            gid=gid,
            mtime=mtime,
            is_virtual_entry=True,
        )

    def clone_and_replace(self, /, **changes: Any) -> "TarMember":
        """Return a copy of this member with the given fields replaced."""
        return dataclasses.replace(self, **changes)

    def to_manifest(self) -> Dict[str, Any]:
        """Convert this member into its JSON-serializable manifest form.

        The mode is stored as an octal string (``"0o..."``) and the path type
        as its manifest key.  Redundant fields are dropped.

        :raises TypeError: If the mode cannot be rendered as an octal number.
        """
        d = dataclasses.asdict(self)
        try:
            d["mode"] = oct(self.mode)
        except (TypeError, ValueError) as e:
            raise TypeError(f"Bad mode in TarMember {self.member_path}") from e
        d["path_type"] = self.path_type.manifest_key
        # "compress" the output by removing redundant fields
        if not self.link_target:
            del d["link_target"]
        if self.is_virtual_entry:
            assert self.fs_path is None
            del d["fs_path"]
        else:
            del d["is_virtual_entry"]
        return d

    @classmethod
    def parse_intermediate_manifest(cls, manifest_path: str) -> IntermediateManifest:
        """Load and validate the manifest stored at *manifest_path*.

        :param manifest_path: Path of the JSON manifest, or ``"-"`` to read
          the manifest from standard input.
        :raises ValueError: If the manifest is empty, does not start with the
          root directory ``"./"``, lists a path before its parent directory,
          or contains an invalid member declaration.
        """
        if manifest_path == "-":
            with sys.stdin as fd:
                data = json.load(fd)
        else:
            with open(manifest_path) as fd:
                data = json.load(fd)
        contents = [cls.from_dict(m) for m in data]
        if not contents:
            raise ValueError(
                "Empty manifest (note that the root directory should always be present)"
            )
        if contents[0].member_path != "./":
            raise ValueError('The first member must always be the root directory "./"')
        # Directories declared so far; every member's parent must already be
        # in here when the member itself is reached.
        directories = {"."}
        for tar_member in contents:
            directory = _dirname(tar_member.member_path)
            if directory not in directories:
                raise ValueError(
                    f'The path "{tar_member.member_path}" came before the directory it is in (or the path'
                    f" is not a directory). Either way leads to a broken deb."
                )
            if tar_member.path_type == PathType.DIRECTORY:
                directories.add(tar_member.member_path.rstrip("/"))
        return contents

    @classmethod
    def from_dict(cls, d: Any) -> "TarMember":
        """Create a member from one decoded manifest entry (see ``to_manifest``).

        :raises ValueError: If the declaration is inconsistent or malformed
          (bad mode, virtual entry with an fs path, directory without a
          trailing slash, bad symlink declaration, unstealable fs path, ...).
        :raises KeyError: If a mandatory key is missing or the path type is
          unknown.
        """
        member_path = d["member_path"]
        raw_mode = d["mode"]
        # The isinstance check keeps a non-string mode (e.g. a JSON number)
        # from surfacing as an AttributeError.
        if not isinstance(raw_mode, str) or not raw_mode.startswith("0o"):
            raise ValueError(f"Bad mode for {member_path}")
        is_virtual_entry = d.get("is_virtual_entry") or False
        path_type = KEY2PATH_TYPE[d["path_type"]]
        fs_path = d.get("fs_path")
        try:
            mode = int(raw_mode[2:], 8)
        except ValueError as e:
            raise ValueError(f"Bad mode for {member_path}") from e
        if is_virtual_entry:
            if not path_type.can_be_virtual:
                raise ValueError(
                    f"Bad file type or is_virtual_entry for {d['member_path']}."
                    " The file type cannot be virtual"
                )
            if fs_path is not None:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " The path is listed as a virtual entry but has a file system path"
                )
        elif fs_path is None:
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " The path is neither a virtual path nor does it have a file system path!"
            )
        if path_type == PathType.DIRECTORY and not member_path.endswith("/"):
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " The path is listed as a directory but does not end with a slash"
            )

        link_target = d.get("link_target")
        if path_type == PathType.SYMLINK:
            if mode != 0o777:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    f" Symlinks must have mode 0o0777, got {oct(mode)[2:]}."
                )
            if not link_target:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " Symlinks must have a link_target"
                )
        elif link_target is not None and link_target != "":
            # TODO: Eventually hardlinks should have them too. But that is a problem for a future programmer
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " Only symlinks can have a link_target"
            )
        else:
            link_target = ""
        may_steal_fs_path = d.get("may_steal_fs_path") or False

        # The manifest is external input, so validate with a real exception
        # (asserts vanish under "python -O").  The None check also covers
        # virtual entries, which have no fs path that could be stolen.
        if may_steal_fs_path and (
            fs_path is None or "debputy/scratch-dir/" not in fs_path
        ):
            raise ValueError(f"{fs_path} should not have been stealable")
        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=fs_path,
            mode=mode,
            owner=d["owner"],
            uid=d["uid"],
            group=d["group"],
            gid=d["gid"],
            mtime=float(d["mtime"]),
            link_target=link_target,
            is_virtual_entry=is_virtual_entry,
            may_steal_fs_path=may_steal_fs_path,
        )

319 

320 

def output_intermediate_manifest(
    manifest_output_file: str,
    members: Iterable[TarMember],
) -> None:
    """Write *members* as a JSON manifest to the file *manifest_output_file*.

    The file is opened in text write mode (replacing any previous content);
    the serialization itself is done by output_intermediate_manifest_to_fd.
    """
    with open(manifest_output_file, "w") as manifest_fd:
        output_intermediate_manifest_to_fd(manifest_fd, members)

327 

328 

def output_intermediate_manifest_to_fd(
    fd: IO[str], members: Iterable[TarMember]
) -> None:
    """Dump *members* to the text stream *fd* as a single JSON array.

    Each member contributes the dict returned by its ``to_manifest`` method;
    all members are converted before anything is written to *fd*.
    """
    manifest_entries = [member.to_manifest() for member in members]
    json.dump(manifest_entries, fd)