Coverage for src/debputy/manifest_parser/util.py: 89%

192 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import dataclasses 

2from typing import ( 

3 Iterator, 

4 Union, 

5 Self, 

6 Optional, 

7 List, 

8 Tuple, 

9 Mapping, 

10 get_origin, 

11 get_args, 

12 Any, 

13 Type, 

14 TypeVar, 

15 TYPE_CHECKING, 

16 Iterable, 

17) 

18 

19if TYPE_CHECKING: 

20 from debputy.manifest_parser.declarative_parser import DebputyParseHint 

21 

22 

23MP = TypeVar("MP", bound="DebputyParseHint") 

24StrOrInt = Union[str, int] 

25AttributePathAliasMapping = Mapping[ 

26 StrOrInt, Tuple[StrOrInt, Optional["AttributePathAliasMapping"]] 

27] 

28 

29 

30class AttributePath(object): 

31 __slots__ = ("parent", "name", "alias_mapping", "path_hint") 

32 

33 def __init__( 

34 self, 

35 parent: Optional["AttributePath"], 

36 key: Optional[Union[str, int]], 

37 *, 

38 alias_mapping: Optional[AttributePathAliasMapping] = None, 

39 ) -> None: 

40 self.parent = parent 

41 self.name = key 

42 self.path_hint: Optional[str] = None 

43 self.alias_mapping = alias_mapping 

44 

45 @classmethod 

46 def root_path(cls) -> "AttributePath": 

47 return AttributePath(None, None) 

48 

49 @classmethod 

50 def builtin_path(cls) -> "AttributePath": 

51 return AttributePath(None, "$builtin$") 

52 

53 @classmethod 

54 def test_path(cls) -> "AttributePath": 

55 return AttributePath(None, "$test$") 

56 

57 def __bool__(self) -> bool: 

58 return self.name is not None or self.parent is not None 

59 

60 def copy_with_path_hint(self, path_hint: str) -> "AttributePath": 

61 p = self.__class__(self.parent, self.name, alias_mapping=self.alias_mapping) 

62 p.path_hint = path_hint 

63 return p 

64 

65 def path_segments(self) -> Iterable[Union[str, int]]: 

66 segments = list(self._iter_path()) 

67 segments.reverse() 

68 yield from (s.name for s in segments) 

69 

70 @property 

71 def path(self) -> str: 

72 segments = list(self._iter_path()) 

73 segments.reverse() 

74 parts: List[str] = [] 

75 path_hint = None 

76 

77 for s in segments: 

78 k = s.name 

79 s_path_hint = s.path_hint 

80 if s_path_hint is not None: 

81 path_hint = s_path_hint 

82 if isinstance(k, int): 

83 parts.append(f"[{k}]") 

84 elif k is not None: 84 ↛ 77line 84 didn't jump to line 77, because the condition on line 84 was never false

85 if parts: 

86 parts.append(".") 

87 parts.append(k) 

88 if path_hint: 

89 parts.append(f" <Search for: {path_hint}>") 

90 if not parts: 90 ↛ 91line 90 didn't jump to line 91, because the condition on line 90 was never true

91 return "document root" 

92 return "".join(parts) 

93 

94 def __str__(self) -> str: 

95 return self.path 

96 

97 def __getitem__(self, item: Union[str, int]) -> "AttributePath": 

98 alias_mapping = None 

99 if self.alias_mapping: 

100 match = self.alias_mapping.get(item) 

101 if match: 

102 item, alias_mapping = match 

103 if item == "": 

104 # Support `sources[0]` mapping to `source` by `sources -> source` and `0 -> ""`. 

105 return AttributePath( 

106 self.parent, self.name, alias_mapping=alias_mapping 

107 ) 

108 return AttributePath(self, item, alias_mapping=alias_mapping) 

109 

110 def _iter_path(self) -> Iterator["AttributePath"]: 

111 current = self 

112 yield current 

113 while True: 

114 parent = current.parent 

115 if not parent: 

116 break 

117 current = parent 

118 yield current 

119 

120 

121@dataclasses.dataclass(slots=True, frozen=True) 

122class _SymbolicModeSegment: 

123 base_mode: int 

124 base_mask: int 

125 cap_x_mode: int 

126 cap_x_mask: int 

127 

128 def apply(self, current_mode: int, is_dir: bool) -> int: 

129 if current_mode & 0o111 or is_dir: 

130 chosen_mode = self.cap_x_mode 

131 mode_mask = self.cap_x_mask 

132 else: 

133 chosen_mode = self.base_mode 

134 mode_mask = self.base_mask 

135 # set ("="): mode mask clears relevant segment and current_mode are the desired bits 

136 # add ("+"): mode mask keeps everything and current_mode are the desired bits 

137 # remove ("-"): mode mask clears relevant bits and current_mode are 0 

138 return (current_mode & mode_mask) | chosen_mode 

139 

140 

141def _symbolic_mode_bit_inverse(v: int) -> int: 

142 # The & part is necessary because otherwise python narrows the inversion to the minimum number of bits 

143 # required, which is not what we want. 

144 return ~v & 0o7777 

145 

146 

147def parse_symbolic_mode( 

148 symbolic_mode: str, 

149 attribute_path: Optional[AttributePath], 

150) -> Iterator[_SymbolicModeSegment]: 

151 sticky_bit = 0o01000 

152 setuid_bit = 0o04000 

153 setgid_bit = 0o02000 

154 mode_group_flag = 0o7 

155 subject_mask_and_shift = { 

156 "u": (mode_group_flag << 6, 6), 

157 "g": (mode_group_flag << 3, 3), 

158 "o": (mode_group_flag << 0, 0), 

159 } 

160 bits = { 

161 "r": (0o4, 0o4), 

162 "w": (0o2, 0o2), 

163 "x": (0o1, 0o1), 

164 "X": (0o0, 0o1), 

165 "s": (0o0, 0o0), # Special-cased below (it depends on the subject) 

166 "t": (0o0, 0o0), # Special-cased below 

167 } 

168 modifiers = { 

169 "+", 

170 "-", 

171 "=", 

172 } 

173 in_path = f" in {attribute_path.path}" if attribute_path is not None else "" 

174 for orig_part in symbolic_mode.split(","): 

175 base_mode = 0 

176 cap_x_mode = 0 

177 part = orig_part 

178 subjects = set() 

179 while part and part[0] in ("u", "g", "o", "a"): 

180 subject = part[0] 

181 if subject == "a": 

182 subjects = {"u", "g", "o"} 

183 else: 

184 subjects.add(subject) 

185 part = part[1:] 

186 if not subjects: 

187 subjects = {"u", "g", "o"} 

188 

189 if part and part[0] in modifiers: 189 ↛ 191line 189 didn't jump to line 191, because the condition on line 189 was never false

190 modifier = part[0] 

191 elif not part: 

192 raise ValueError( 

193 f'Invalid symbolic mode{in_path}: expected [+-=] to be present (from "{orig_part}")' 

194 ) 

195 else: 

196 raise ValueError( 

197 f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of [+-=]' 

198 f' (from "{orig_part}")' 

199 ) 

200 part = part[1:] 

201 s_bit_seen = False 

202 t_bit_seen = False 

203 while part and part[0] in bits: 

204 if part == "s": 

205 s_bit_seen = True 

206 elif part == "t": 206 ↛ 207line 206 didn't jump to line 207, because the condition on line 206 was never true

207 t_bit_seen = True 

208 elif part in ("u", "g", "o"): 208 ↛ 209line 208 didn't jump to line 209, because the condition on line 208 was never true

209 raise NotImplementedError( 

210 f"Cannot parse symbolic mode{in_path}: Sorry, we do not support referencing an" 

211 " existing subject's permissions (a=u) in symbolic modes." 

212 ) 

213 else: 

214 matched_bits = bits.get(part[0]) 

215 if matched_bits is None: 215 ↛ 216line 215 didn't jump to line 216, because the condition on line 215 was never true

216 valid_bits = "".join(bits) 

217 raise ValueError( 

218 f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of the letters' 

219 f' in "{valid_bits}" (from "{orig_part}")' 

220 ) 

221 base_mode_bits, cap_x_mode_bits = bits[part[0]] 

222 base_mode |= base_mode_bits 

223 cap_x_mode |= cap_x_mode_bits 

224 part = part[1:] 

225 

226 if part: 226 ↛ 227line 226 didn't jump to line 227, because the condition on line 226 was never true

227 raise ValueError( 

228 f'Invalid symbolic mode{in_path}: Could not parse "{part[0]}" from "{orig_part}"' 

229 ) 

230 

231 final_base_mode = 0 

232 final_cap_x_mode = 0 

233 segment_mask = 0 

234 for subject in subjects: 

235 mask, shift = subject_mask_and_shift[subject] 

236 segment_mask |= mask 

237 final_base_mode |= base_mode << shift 

238 final_cap_x_mode |= cap_x_mode << shift 

239 if modifier == "=": 

240 segment_mask |= setuid_bit if "u" in subjects else 0 

241 segment_mask |= setgid_bit if "g" in subjects else 0 

242 segment_mask |= sticky_bit if "o" in subjects else 0 

243 if s_bit_seen: 

244 if "u" in subjects: 244 ↛ 247line 244 didn't jump to line 247, because the condition on line 244 was never false

245 final_base_mode |= setuid_bit 

246 final_cap_x_mode |= setuid_bit 

247 if "g" in subjects: 

248 final_base_mode |= setgid_bit 

249 final_cap_x_mode |= setgid_bit 

250 if t_bit_seen: 250 ↛ 251line 250 didn't jump to line 251, because the condition on line 250 was never true

251 final_base_mode |= sticky_bit 

252 final_cap_x_mode |= sticky_bit 

253 if modifier == "+": 

254 final_base_mask = ~0 

255 final_cap_x_mask = ~0 

256 elif modifier == "-": 

257 final_base_mask = _symbolic_mode_bit_inverse(final_base_mode) 

258 final_cap_x_mask = _symbolic_mode_bit_inverse(final_cap_x_mode) 

259 final_base_mode = 0 

260 final_cap_x_mode = 0 

261 elif modifier == "=": 

262 # FIXME: Handle "unmentioned directory's setgid/setuid bits" 

263 inverted_mask = _symbolic_mode_bit_inverse(segment_mask) 

264 final_base_mask = inverted_mask 

265 final_cap_x_mask = inverted_mask 

266 else: 

267 raise AssertionError( 

268 f"Unknown modifier in symbolic mode: {modifier} - should not have happened" 

269 ) 

270 yield _SymbolicModeSegment( 

271 base_mode=final_base_mode, 

272 base_mask=final_base_mask, 

273 cap_x_mode=final_cap_x_mode, 

274 cap_x_mask=final_cap_x_mask, 

275 ) 

276 

277 

278def unpack_type( 

279 orig_type: Any, 

280 parsing_typed_dict_attribute: bool, 

281) -> Tuple[Any, Optional[Any], Tuple[Any, ...]]: 

282 raw_type = orig_type 

283 origin = get_origin(raw_type) 

284 args = get_args(raw_type) 

285 if not parsing_typed_dict_attribute and repr(origin) in ( 285 ↛ 289line 285 didn't jump to line 289, because the condition on line 285 was never true

286 "typing.NotRequired", 

287 "typing.Required", 

288 ): 

289 raise ValueError( 

290 f"The Required/NotRequired attributes cannot be used outside typed dicts," 

291 f" the type that triggered the error: {orig_type}" 

292 ) 

293 

294 while repr(origin) in ("typing.NotRequired", "typing.Required"): 

295 if len(args) != 1: 295 ↛ 296line 295 didn't jump to line 296, because the condition on line 295 was never true

296 raise ValueError( 

297 f"The type {raw_type} should have exactly one type parameter" 

298 ) 

299 raw_type = args[0] 

300 origin = get_origin(raw_type) 

301 args = get_args(raw_type) 

302 

303 assert not isinstance(raw_type, tuple) 

304 

305 return raw_type, origin, args 

306 

307 

308def find_annotation( 

309 annotations: Tuple[Any, ...], 

310 anno_class: Type[MP], 

311) -> Optional[MP]: 

312 m = None 

313 for anno in annotations: 

314 if isinstance(anno, anno_class): 

315 if m is not None: 315 ↛ 316line 315 didn't jump to line 316, because the condition on line 315 was never true

316 raise ValueError( 

317 f"The annotation {anno_class.__name__} was used more than once" 

318 ) 

319 m = anno 

320 return m