From 1e5c28f36b0fd2d5ac1683c88d48e3d7c243e993 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 14 Apr 2024 22:18:22 +0200 Subject: Adding upstream version 0.1.28. Signed-off-by: Daniel Baumann --- .../d_4f754ff76d8638bb_declarative_parser_py.html | 2102 ++++++++++++++++++++ 1 file changed, 2102 insertions(+) create mode 100644 coverage-report/d_4f754ff76d8638bb_declarative_parser_py.html (limited to 'coverage-report/d_4f754ff76d8638bb_declarative_parser_py.html') diff --git a/coverage-report/d_4f754ff76d8638bb_declarative_parser_py.html b/coverage-report/d_4f754ff76d8638bb_declarative_parser_py.html new file mode 100644 index 0000000..79bc071 --- /dev/null +++ b/coverage-report/d_4f754ff76d8638bb_declarative_parser_py.html @@ -0,0 +1,2102 @@ + + + + + Coverage for src/debputy/manifest_parser/declarative_parser.py: 76% + + + + + +
+
+

+ Coverage for src/debputy/manifest_parser/declarative_parser.py: + 76% +

+ +

+ 781 statements   + + + + +

+

+ « prev     + ^ index     + » next +       + coverage.py v7.2.7, + created at 2024-04-07 12:14 +0200 +

+ +
+
+
+

1import collections 

+

2import dataclasses 

+

3from typing import ( 

+

4 Any, 

+

5 Callable, 

+

6 Tuple, 

+

7 TypedDict, 

+

8 Dict, 

+

9 get_type_hints, 

+

10 Annotated, 

+

11 get_args, 

+

12 get_origin, 

+

13 TypeVar, 

+

14 Generic, 

+

15 FrozenSet, 

+

16 Mapping, 

+

17 Optional, 

+

18 cast, 

+

19 is_typeddict, 

+

20 Type, 

+

21 Union, 

+

22 List, 

+

23 Collection, 

+

24 NotRequired, 

+

25 Iterable, 

+

26 Literal, 

+

27 Sequence, 

+

28 Container, 

+

29) 

+

30 

+

31from debputy.manifest_parser.base_types import ( 

+

32 DebputyParsedContent, 

+

33 FileSystemMatchRule, 

+

34 FileSystemExactMatchRule, 

+

35 DebputyDispatchableType, 

+

36 TypeMapping, 

+

37) 

+

38from debputy.manifest_parser.exceptions import ( 

+

39 ManifestParseException, 

+

40) 

+

41from debputy.manifest_parser.mapper_code import ( 

+

42 normalize_into_list, 

+

43 wrap_into_list, 

+

44 map_each_element, 

+

45) 

+

46from debputy.manifest_parser.parser_data import ParserContextData 

+

47from debputy.manifest_parser.util import AttributePath, unpack_type, find_annotation 

+

48from debputy.plugin.api.impl_types import ( 

+

49 DeclarativeInputParser, 

+

50 TD, 

+

51 _ALL_PACKAGE_TYPES, 

+

52 resolve_package_type_selectors, 

+

53 ListWrappedDeclarativeInputParser, 

+

54 DispatchingObjectParser, 

+

55 DispatchingTableParser, 

+

56 TTP, 

+

57 TP, 

+

58 InPackageContextParser, 

+

59) 

+

60from debputy.plugin.api.spec import ParserDocumentation, PackageTypeSelector 

+

61from debputy.util import _info, _warn, assume_not_none 

+

62 

+

63try: 

+

64 from Levenshtein import distance 

+

65except ImportError: 

+

66 _WARN_ONCE = False 

+

67 

+

68 def _detect_possible_typo( 

+

69 _key: str, 

+

70 _value: object, 

+

71 _manifest_attributes: Mapping[str, "AttributeDescription"], 

+

72 _path: "AttributePath", 

+

73 ) -> None: 

+

74 global _WARN_ONCE 

+

75 if not _WARN_ONCE: 

+

76 _WARN_ONCE = True 

+

77 _info( 

+

78 "Install python3-levenshtein to have debputy try to detect typos in the manifest." 

+

79 ) 

+

80 

+

81else: 

+

82 

+

83 def _detect_possible_typo( 

+

84 key: str, 

+

85 value: object, 

+

86 manifest_attributes: Mapping[str, "AttributeDescription"], 

+

87 path: "AttributePath", 

+

88 ) -> None: 

+

89 k_len = len(key) 

+

90 key_path = path[key] 

+

91 matches: List[str] = [] 

+

92 current_match_strength = 0 

+

93 for acceptable_key, attr in manifest_attributes.items(): 

+

94 if abs(k_len - len(acceptable_key)) > 2: 

+

95 continue 

+

96 d = distance(key, acceptable_key) 

+

97 if d > 2: 

+

98 continue 

+

99 try: 

+

100 attr.type_validator.ensure_type(value, key_path) 

+

101 except ManifestParseException: 

+

102 if attr.type_validator.base_type_match(value): 

+

103 match_strength = 1 

+

104 else: 

+

105 match_strength = 0 

+

106 else: 

+

107 match_strength = 2 

+

108 

+

109 if match_strength < current_match_strength: 

+

110 continue 

+

111 if match_strength > current_match_strength: 

+

112 current_match_strength = match_strength 

+

113 matches.clear() 

+

114 matches.append(acceptable_key) 

+

115 

+

116 if not matches: 

+

117 return 

+

118 ref = f'at "{path.path}"' if path else "at the manifest root level" 

+

119 if len(matches) == 1: 

+

120 possible_match = repr(matches[0]) 

+

121 _warn( 

+

122 f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}' 

+

123 ) 

+

124 else: 

+

125 matches.sort() 

+

126 possible_matches = ", ".join(repr(a) for a in matches) 

+

127 _warn( 

+

128 f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}' 

+

129 ) 

+

130 

+

131 

+

132SF = TypeVar("SF") 

+

133T = TypeVar("T") 

+

134S = TypeVar("S") 

+

135 

+

136 

+

137_NONE_TYPE = type(None) 

+

138 

+

139 

+

140# These must be able to appear in an "isinstance" check and must be builtin types. 

+

141BASIC_SIMPLE_TYPES = { 

+

142 str: "string", 

+

143 int: "integer", 

+

144 bool: "boolean", 

+

145} 

+

146 

+

147 

+

148class AttributeTypeHandler: 

+

149 __slots__ = ("_description", "_ensure_type", "base_type", "mapper") 

+

150 

+

151 def __init__( 

+

152 self, 

+

153 description: str, 

+

154 ensure_type: Callable[[Any, AttributePath], None], 

+

155 *, 

+

156 base_type: Optional[Type[Any]] = None, 

+

157 mapper: Optional[ 

+

158 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] 

+

159 ] = None, 

+

160 ) -> None: 

+

161 self._description = description 

+

162 self._ensure_type = ensure_type 

+

163 self.base_type = base_type 

+

164 self.mapper = mapper 

+

165 

+

166 def describe_type(self) -> str: 

+

167 return self._description 

+

168 

+

169 def ensure_type(self, obj: object, path: AttributePath) -> None: 

+

170 self._ensure_type(obj, path) 

+

171 

+

172 def base_type_match(self, obj: object) -> bool: 

+

173 base_type = self.base_type 

+

174 return base_type is not None and isinstance(obj, base_type) 

+

175 

+

176 def map_type( 

+

177 self, 

+

178 value: Any, 

+

179 path: AttributePath, 

+

180 parser_context: Optional["ParserContextData"], 

+

181 ) -> Any: 

+

182 mapper = self.mapper 

+

183 if mapper is not None: 

+

184 return mapper(value, path, parser_context) 

+

185 return value 

+

186 

+

187 def combine_mapper( 

+

188 self, 

+

189 mapper: Optional[ 

+

190 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] 

+

191 ], 

+

192 ) -> "AttributeTypeHandler": 

+

193 if mapper is None: 

+

194 return self 

+

195 if self.mapper is not None: 

+

196 m = self.mapper 

+

197 

+

198 def _combined_mapper( 

+

199 value: Any, 

+

200 path: AttributePath, 

+

201 parser_context: Optional["ParserContextData"], 

+

202 ) -> Any: 

+

203 return mapper(m(value, path, parser_context), path, parser_context) 

+

204 

+

205 else: 

+

206 _combined_mapper = mapper 

+

207 

+

208 return AttributeTypeHandler( 

+

209 self._description, 

+

210 self._ensure_type, 

+

211 base_type=self.base_type, 

+

212 mapper=_combined_mapper, 

+

213 ) 

+

214 

+

215 

+

216@dataclasses.dataclass(slots=True) 

+

217class AttributeDescription: 

+

218 source_attribute_name: str 

+

219 target_attribute: str 

+

220 attribute_type: Any 

+

221 type_validator: AttributeTypeHandler 

+

222 annotations: Tuple[Any, ...] 

+

223 conflicting_attributes: FrozenSet[str] 

+

224 conditional_required: Optional["ConditionalRequired"] 

+

225 parse_hints: Optional["DetectedDebputyParseHint"] = None 

+

226 is_optional: bool = False 

+

227 

+

228 

+

229def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool: 

+

230 if attribute_path.path_hint is not None: 230 ↛ 231line 230 didn't jump to line 231, because the condition on line 230 was never true

+

231 return True 

+

232 if isinstance(v, str): 

+

233 attribute_path.path_hint = v 

+

234 return True 

+

235 elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str): 

+

236 attribute_path.path_hint = v[0] 

+

237 return True 

+

238 return False 

+

239 

+

240 

+

241@dataclasses.dataclass(slots=True, frozen=True) 

+

242class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): 

+

243 alt_form_parser: AttributeDescription 

+

244 inline_reference_documentation: Optional[ParserDocumentation] = None 

+

245 

+

246 def parse_input( 

+

247 self, 

+

248 value: object, 

+

249 path: AttributePath, 

+

250 *, 

+

251 parser_context: Optional["ParserContextData"] = None, 

+

252 ) -> TD: 

+

253 if self.reference_documentation_url is not None: 

+

254 doc_ref = f" (Documentation: {self.reference_documentation_url})" 

+

255 else: 

+

256 doc_ref = "" 

+

257 

+

258 alt_form_parser = self.alt_form_parser 

+

259 if value is None: 259 ↛ 260line 259 didn't jump to line 260, because the condition on line 259 was never true

+

260 form_note = f" The value must have type: {alt_form_parser.type_validator.describe_type()}" 

+

261 if self.reference_documentation_url is not None: 

+

262 doc_ref = f" Please see {self.reference_documentation_url} for the documentation." 

+

263 raise ManifestParseException( 

+

264 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" 

+

265 ) 

+

266 _extract_path_hint(value, path) 

+

267 alt_form_parser.type_validator.ensure_type(value, path) 

+

268 attribute = alt_form_parser.target_attribute 

+

269 alias_mapping = { 

+

270 attribute: ("", None), 

+

271 } 

+

272 v = alt_form_parser.type_validator.map_type(value, path, parser_context) 

+

273 path.alias_mapping = alias_mapping 

+

274 return cast("TD", {attribute: v}) 

+

275 

+

276 

+

277@dataclasses.dataclass(slots=True) 

+

278class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): 

+

279 input_time_required_parameters: FrozenSet[str] 

+

280 all_parameters: FrozenSet[str] 

+

281 manifest_attributes: Mapping[str, "AttributeDescription"] 

+

282 source_attributes: Mapping[str, "AttributeDescription"] 

+

283 at_least_one_of: FrozenSet[FrozenSet[str]] 

+

284 alt_form_parser: Optional[AttributeDescription] 

+

285 mutually_exclusive_attributes: FrozenSet[FrozenSet[str]] = frozenset() 

+

286 _per_attribute_conflicts_cache: Optional[Mapping[str, FrozenSet[str]]] = None 

+

287 inline_reference_documentation: Optional[ParserDocumentation] = None 

+

288 path_hint_source_attributes: Sequence[str] = tuple() 

+

289 

+

290 def _parse_alt_form( 

+

291 self, 

+

292 value: object, 

+

293 path: AttributePath, 

+

294 *, 

+

295 parser_context: Optional["ParserContextData"] = None, 

+

296 ) -> TD: 

+

297 alt_form_parser = self.alt_form_parser 

+

298 if alt_form_parser is None: 298 ↛ 299line 298 didn't jump to line 299, because the condition on line 298 was never true

+

299 raise ManifestParseException( 

+

300 f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}" 

+

301 ) 

+

302 _extract_path_hint(value, path) 

+

303 alt_form_parser.type_validator.ensure_type(value, path) 

+

304 assert ( 

+

305 value is not None 

+

306 ), "The alternative form was None, but the parser should have rejected None earlier." 

+

307 attribute = alt_form_parser.target_attribute 

+

308 alias_mapping = { 

+

309 attribute: ("", None), 

+

310 } 

+

311 v = alt_form_parser.type_validator.map_type(value, path, parser_context) 

+

312 path.alias_mapping = alias_mapping 

+

313 return cast("TD", {attribute: v}) 

+

314 

+

315 def _validate_expected_keys( 

+

316 self, 

+

317 value: Dict[Any, Any], 

+

318 path: AttributePath, 

+

319 *, 

+

320 parser_context: Optional["ParserContextData"] = None, 

+

321 ) -> None: 

+

322 unknown_keys = value.keys() - self.all_parameters 

+

323 doc_ref = self._doc_url_error_suffix() 

+

324 if unknown_keys: 324 ↛ 325line 324 didn't jump to line 325, because the condition on line 324 was never true

+

325 for k in unknown_keys: 

+

326 if isinstance(k, str): 

+

327 _detect_possible_typo(k, value[k], self.manifest_attributes, path) 

+

328 unused_keys = self.all_parameters - value.keys() 

+

329 if unused_keys: 

+

330 k = ", ".join(unused_keys) 

+

331 raise ManifestParseException( 

+

332 f'Unknown keys "{unknown_keys}" at {path.path}". Keys that could be used here are: {k}.{doc_ref}' 

+

333 ) 

+

334 raise ManifestParseException( 

+

335 f'Unknown keys "{unknown_keys}" at {path.path}". Please remove them.{doc_ref}' 

+

336 ) 

+

337 missing_keys = self.input_time_required_parameters - value.keys() 

+

338 if missing_keys: 

+

339 required = ", ".join(repr(k) for k in sorted(missing_keys)) 

+

340 raise ManifestParseException( 

+

341 f"The following keys were required but not present at {path.path}: {required}{doc_ref}" 

+

342 ) 

+

343 for maybe_required in self.all_parameters - value.keys(): 

+

344 attr = self.manifest_attributes[maybe_required] 

+

345 assert attr.conditional_required is None or parser_context is not None 

+

346 if ( 346 ↛ 352line 346 didn't jump to line 352

+

347 attr.conditional_required is not None 

+

348 and attr.conditional_required.condition_applies( 

+

349 assume_not_none(parser_context) 

+

350 ) 

+

351 ): 

+

352 reason = attr.conditional_required.reason 

+

353 raise ManifestParseException( 

+

354 f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path}. {reason}{doc_ref}' 

+

355 ) 

+

356 for keyset in self.at_least_one_of: 

+

357 matched_keys = value.keys() & keyset 

+

358 if not matched_keys: 358 ↛ 359line 358 didn't jump to line 359, because the condition on line 358 was never true

+

359 conditionally_required = ", ".join(repr(k) for k in sorted(keyset)) 

+

360 raise ManifestParseException( 

+

361 f"At least one of the following keys must be present at {path.path}:" 

+

362 f" {conditionally_required}{doc_ref}" 

+

363 ) 

+

364 for group in self.mutually_exclusive_attributes: 

+

365 matched = value.keys() & group 

+

366 if len(matched) > 1: 366 ↛ 367line 366 didn't jump to line 367, because the condition on line 366 was never true

+

367 ck = ", ".join(repr(k) for k in sorted(matched)) 

+

368 raise ManifestParseException( 

+

369 f"Could not parse {path.path}: The following attributes are" 

+

370 f" mutually exclusive: {ck}{doc_ref}" 

+

371 ) 

+

372 

+

373 def _parse_typed_dict_form( 

+

374 self, 

+

375 value: Dict[Any, Any], 

+

376 path: AttributePath, 

+

377 *, 

+

378 parser_context: Optional["ParserContextData"] = None, 

+

379 ) -> TD: 

+

380 self._validate_expected_keys(value, path, parser_context=parser_context) 

+

381 result = {} 

+

382 per_attribute_conflicts = self._per_attribute_conflicts() 

+

383 alias_mapping = {} 

+

384 for path_hint_source_attributes in self.path_hint_source_attributes: 

+

385 v = value.get(path_hint_source_attributes) 

+

386 if v is not None and _extract_path_hint(v, path): 

+

387 break 

+

388 for k, v in value.items(): 

+

389 attr = self.manifest_attributes[k] 

+

390 matched = value.keys() & per_attribute_conflicts[k] 

+

391 if matched: 391 ↛ 392line 391 didn't jump to line 392, because the condition on line 391 was never true

+

392 ck = ", ".join(repr(k) for k in sorted(matched)) 

+

393 raise ManifestParseException( 

+

394 f'The attribute "{k}" at {path.path} cannot be used with the following' 

+

395 f" attributes: {ck}{self._doc_url_error_suffix()}" 

+

396 ) 

+

397 nk = attr.target_attribute 

+

398 key_path = path[k] 

+

399 attr.type_validator.ensure_type(v, key_path) 

+

400 if v is None: 400 ↛ 401line 400 didn't jump to line 401, because the condition on line 400 was never true

+

401 continue 

+

402 if k != nk: 

+

403 alias_mapping[nk] = k, None 

+

404 v = attr.type_validator.map_type(v, key_path, parser_context) 

+

405 result[nk] = v 

+

406 if alias_mapping: 

+

407 path.alias_mapping = alias_mapping 

+

408 return cast("TD", result) 

+

409 

+

410 def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str: 

+

411 doc_url = self.reference_documentation_url 

+

412 if doc_url is not None: 

+

413 if see_url_version: 413 ↛ 414line 413 didn't jump to line 414, because the condition on line 413 was never true

+

414 return f" Please see {doc_url} for the documentation." 

+

415 return f" (Documentation: {doc_url})" 

+

416 return "" 

+

417 

+

418 def parse_input( 

+

419 self, 

+

420 value: object, 

+

421 path: AttributePath, 

+

422 *, 

+

423 parser_context: Optional["ParserContextData"] = None, 

+

424 ) -> TD: 

+

425 if value is None: 425 ↛ 426line 425 didn't jump to line 426, because the condition on line 425 was never true

+

426 form_note = " The attribute must be a mapping." 

+

427 if self.alt_form_parser is not None: 

+

428 form_note = ( 

+

429 " The attribute can be a mapping or a non-mapping format" 

+

430 ' (usually, "non-mapping format" means a string or a list of strings).' 

+

431 ) 

+

432 doc_ref = self._doc_url_error_suffix(see_url_version=True) 

+

433 raise ManifestParseException( 

+

434 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" 

+

435 ) 

+

436 

+

437 if not isinstance(value, dict): 

+

438 return self._parse_alt_form(value, path, parser_context=parser_context) 

+

439 return self._parse_typed_dict_form(value, path, parser_context=parser_context) 

+

440 

+

441 def _per_attribute_conflicts(self) -> Mapping[str, FrozenSet[str]]: 

+

442 conflicts = self._per_attribute_conflicts_cache 

+

443 if conflicts is not None: 

+

444 return conflicts 

+

445 attrs = self.source_attributes 

+

446 conflicts = { 

+

447 a.source_attribute_name: frozenset( 

+

448 attrs[ca].source_attribute_name for ca in a.conflicting_attributes 

+

449 ) 

+

450 for a in attrs.values() 

+

451 } 

+

452 self._per_attribute_conflicts_cache = conflicts 

+

453 return self._per_attribute_conflicts_cache 

+

454 

+

455 

+

456class DebputyParseHint: 

+

457 @classmethod 

+

458 def target_attribute(cls, target_attribute: str) -> "DebputyParseHint": 

+

459 """Define this source attribute to have a different target attribute name 

+

460 

+

461 As an example: 

+

462 

+

463 >>> class SourceType(TypedDict): 

+

464 ... source: Annotated[NotRequired[str], DebputyParseHint.target_attribute("sources")] 

+

465 ... sources: NotRequired[List[str]] 

+

466 >>> class TargetType(TypedDict): 

+

467 ... sources: List[str] 

+

468 >>> pg = ParserGenerator() 

+

469 >>> parser = pg.generate_parser(TargetType, source_content=SourceType) 

+

470 

+

471 In this example, the user can provide either `source` or `sources` and the parser will 

+

472 map them to the `sources` attribute in the `TargetType`. Note this example relies on 

+

473 the builtin mapping of `str` to `List[str]` to align the types between `source` (from 

+

474 SourceType) and `sources` (from TargetType). 

+

475 

+

476 The following rules apply: 

+

477 

+

478 * All source attributes that map to the same target attribute will be mutually exclusive 

+

479 (that is, the user cannot give `source` *and* `sources` as input). 

+

480 * When the target attribute is required, the source attributes are conditionally 

+

481 mandatory requiring the user to provide exactly one of them. 

+

482 * When multiple source attributes point to a single target attribute, none of the source 

+

483 attributes can be Required. 

+

484 * The annotation can only be used for the source type specification and the source type 

+

485 specification must be different from the target type specification. 

+

486 

+

487 The `target_attribute` annotation can be used without having multiple source attributes. This 

+

488 can be useful if the source attribute name is not valid as a python variable identifier to 

+

489 rename it to a valid python identifier. 

+

490 

+

491 :param target_attribute: The attribute name in the target content 

+

492 :return: The annotation. 

+

493 """ 

+

494 return TargetAttribute(target_attribute) 

+

495 

+

496 @classmethod 

+

497 def conflicts_with_source_attributes( 

+

498 cls, 

+

499 *conflicting_source_attributes: str, 

+

500 ) -> "DebputyParseHint": 

+

501 """Declare a conflict with one or more source attributes 

+

502 

+

503 Example: 

+

504 

+

505 >>> class SourceType(TypedDict): 

+

506 ... source: Annotated[NotRequired[str], DebputyParseHint.target_attribute("sources")] 

+

507 ... sources: NotRequired[List[str]] 

+

508 ... into_dir: NotRequired[str] 

+

509 ... renamed_to: Annotated[ 

+

510 ... NotRequired[str], 

+

511 ... DebputyParseHint.conflicts_with_source_attributes("sources", "into_dir") 

+

512 ... ] 

+

513 >>> class TargetType(TypedDict): 

+

514 ... sources: List[str] 

+

515 ... into_dir: NotRequired[str] 

+

516 ... renamed_to: NotRequired[str] 

+

517 >>> pg = ParserGenerator() 

+

518 >>> parser = pg.generate_parser(TargetType, source_content=SourceType) 

+

519 

+

520 In this example, if the user was to provide `renamed_to` with `sources` or `into_dir` the parser would report 

+

521 an error. However, the parser will allow `renamed_to` with `source` as the conflict is considered only for 

+

522 the input source. That is, it is irrelevant that `sources` and `source´ happens to "map" to the same target 

+

523 attribute. 

+

524 

+

525 The following rules apply: 

+

526 * It is not possible for a target attribute to declare conflicts unless the target type spec is reused as 

+

527 source type spec. 

+

528 * All attributes involved in a conflict must be NotRequired. If any of the attributes are Required, then 

+

529 the parser generator will reject the input. 

+

530 * All attributes listed in the conflict must be valid attributes in the source type spec. 

+

531 

+

532 Note you do not have to specify conflicts between two attributes with the same target attribute name. The 

+

533 `target_attribute` annotation will handle that for you. 

+

534 

+

535 :param conflicting_source_attributes: All source attributes that cannot be used with this attribute. 

+

536 :return: The annotation. 

+

537 """ 

+

538 if len(conflicting_source_attributes) < 1: 538 ↛ 539line 538 didn't jump to line 539, because the condition on line 538 was never true

+

539 raise ValueError( 

+

540 "DebputyParseHint.conflicts_with_source_attributes requires at least one attribute as input" 

+

541 ) 

+

542 return ConflictWithSourceAttribute(frozenset(conflicting_source_attributes)) 

+

543 

+

544 @classmethod 

+

545 def required_when_single_binary( 

+

546 cls, 

+

547 *, 

+

548 package_type: PackageTypeSelector = _ALL_PACKAGE_TYPES, 

+

549 ) -> "DebputyParseHint": 

+

550 """Declare a source attribute as required when the source package produces exactly one binary package 

+

551 

+

552 The attribute in question must always be declared as `NotRequired` in the TypedDict and this condition 

+

553 can only be used for source attributes. 

+

554 """ 

+

555 resolved_package_types = resolve_package_type_selectors(package_type) 

+

556 reason = "The field is required for source packages producing exactly one binary package" 

+

557 if resolved_package_types != _ALL_PACKAGE_TYPES: 

+

558 types = ", ".join(sorted(resolved_package_types)) 

+

559 reason += f" of type {types}" 

+

560 return ConditionalRequired( 

+

561 reason, 

+

562 lambda c: len( 

+

563 [ 

+

564 p 

+

565 for p in c.binary_packages.values() 

+

566 if p.package_type in package_type 

+

567 ] 

+

568 ) 

+

569 == 1, 

+

570 ) 

+

571 return ConditionalRequired( 

+

572 reason, 

+

573 lambda c: c.is_single_binary_package, 

+

574 ) 

+

575 

+

576 @classmethod 

+

577 def required_when_multi_binary( 

+

578 cls, 

+

579 *, 

+

580 package_type: PackageTypeSelector = _ALL_PACKAGE_TYPES, 

+

581 ) -> "DebputyParseHint": 

+

582 """Declare a source attribute as required when the source package produces two or more binary package 

+

583 

+

584 The attribute in question must always be declared as `NotRequired` in the TypedDict and this condition 

+

585 can only be used for source attributes. 

+

586 """ 

+

587 resolved_package_types = resolve_package_type_selectors(package_type) 

+

588 reason = "The field is required for source packages producing two or more binary packages" 

+

589 if resolved_package_types != _ALL_PACKAGE_TYPES: 

+

590 types = ", ".join(sorted(resolved_package_types)) 

+

591 reason = ( 

+

592 "The field is required for source packages producing not producing exactly one binary packages" 

+

593 f" of type {types}" 

+

594 ) 

+

595 return ConditionalRequired( 

+

596 reason, 

+

597 lambda c: len( 

+

598 [ 

+

599 p 

+

600 for p in c.binary_packages.values() 

+

601 if p.package_type in package_type 

+

602 ] 

+

603 ) 

+

604 != 1, 

+

605 ) 

+

606 return ConditionalRequired( 

+

607 reason, 

+

608 lambda c: not c.is_single_binary_package, 

+

609 ) 

+

610 

+

611 @classmethod 

+

612 def manifest_attribute(cls, attribute: str) -> "DebputyParseHint": 

+

613 """Declare what the attribute name (as written in the manifest) should be 

+

614 

+

615 By default, debputy will do an attribute normalizing that will take valid python identifiers such 

+

616 as `dest_dir` and remap it to the manifest variant (such as `dest-dir`) automatically. If you have 

+

617 a special case, where this built-in normalization is insufficient or the python name is considerably 

+

618 different from what the user would write in the manifest, you can use this parse hint to set the 

+

619 name that the user would have to write in the manifest for this attribute. 

+

620 

+

621 >>> class SourceType(TypedDict): 

+

622 ... source: List[FileSystemMatchRule] 

+

623 ... # Use "as" in the manifest because "as_" was not pretty enough 

+

624 ... install_as: Annotated[NotRequired[FileSystemExactMatchRule], DebputyParseHint.manifest_attribute("as")] 

+

625 

+

626 In this example, we use the parse hint to use "as" as the name in the manifest, because we cannot 

+

627 use "as" a valid python identifier (it is a keyword). While debputy would map `as_` to `as` for us, 

+

628 we have chosen to use `install_as` as a python identifier. 

+

629 """ 

+

630 return ManifestAttribute(attribute) 

+

631 

+

632 @classmethod 

+

633 def not_path_error_hint(cls) -> "DebputyParseHint": 

+

634 """Mark this attribute as not a "path hint" when it comes to reporting errors 

+

635 

+

636 By default, `debputy` will pick up attributes that uses path names (FileSystemMatchRule) as 

+

637 candidates for parse error hints (the little "<Search for: VALUE>" in error messages). 

+

638 

+

639 Most rules only have one active path-based attribute and paths tends to be unique enough 

+

640 that it helps people spot the issue faster. However, in rare cases, you can have multiple 

+

641 attributes that fit the bill. In this case, this hint can be used to "hide" the suboptimal 

+

642 choice. As an example: 

+

643 

+

644 >>> class SourceType(TypedDict): 

+

645 ... source: List[FileSystemMatchRule] 

+

646 ... install_as: Annotated[NotRequired[FileSystemExactMatchRule], DebputyParseHint.not_path_error_hint()] 

+

647 

+

648 In this case, without the hint, `debputy` might pick up `install_as` as the attribute to 

+

649 use as hint for error reporting. However, here we have decided that we never want `install_as` 

+

650 leaving `source` as the only option. 

+

651 

+

652 Generally, this type hint must be placed on the **source** format. Any source attribute matching 

+

653 the parsed format will be ignored. 

+

654 

+

655 Mind the asymmetry: The annotation is placed in the **source** format while `debputy` looks at 

+

656 the type of the target attribute to determine if it counts as path. 

+

657 """ 

+

658 return NOT_PATH_HINT 

+

659 

+

660 

+

661@dataclasses.dataclass(frozen=True, slots=True) 

+

662class TargetAttribute(DebputyParseHint): 

+

663 attribute: str 

+

664 

+

665 

+

666@dataclasses.dataclass(frozen=True, slots=True) 

+

667class ConflictWithSourceAttribute(DebputyParseHint): 

+

668 conflicting_attributes: FrozenSet[str] 

+

669 

+

670 

+

671@dataclasses.dataclass(frozen=True, slots=True) 

+

672class ConditionalRequired(DebputyParseHint): 

+

673 reason: str 

+

674 condition: Callable[["ParserContextData"], bool] 

+

675 

+

676 def condition_applies(self, context: "ParserContextData") -> bool: 

+

677 return self.condition(context) 

+

678 

+

679 

+

680@dataclasses.dataclass(frozen=True, slots=True) 

+

681class ManifestAttribute(DebputyParseHint): 

+

682 attribute: str 

+

683 

+

684 

+

685class NotPathHint(DebputyParseHint): 

+

686 pass 

+

687 

+

688 

+

689NOT_PATH_HINT = NotPathHint() 

+

690 

+

691 

+

692def _is_path_attribute_candidate( 

+

693 source_attribute: AttributeDescription, target_attribute: AttributeDescription 

+

694) -> bool: 

+

695 if ( 

+

696 source_attribute.parse_hints 

+

697 and not source_attribute.parse_hints.applicable_as_path_hint 

+

698 ): 

+

699 return False 

+

700 target_type = target_attribute.attribute_type 

+

701 _, origin, args = unpack_type(target_type, False) 

+

702 match_type = target_type 

+

703 if origin == list: 

+

704 match_type = args[0] 

+

705 return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule) 

+

706 

+

707 

+

708class ParserGenerator: 

+

709 def __init__(self) -> None: 

+

710 self._registered_types: Dict[Any, TypeMapping[Any, Any]] = {} 

+

711 self._object_parsers: Dict[str, DispatchingObjectParser] = {} 

+

712 self._table_parsers: Dict[ 

+

713 Type[DebputyDispatchableType], DispatchingTableParser[Any] 

+

714 ] = {} 

+

715 self._in_package_context_parser: Dict[str, Any] = {} 

+

716 

+

717 def register_mapped_type(self, mapped_type: TypeMapping) -> None: 

+

718 existing = self._registered_types.get(mapped_type.target_type) 

+

719 if existing is not None: 719 ↛ 720line 719 didn't jump to line 720, because the condition on line 719 was never true

+

720 raise ValueError(f"The type {existing} is already registered") 

+

721 self._registered_types[mapped_type.target_type] = mapped_type 

+

722 

+

723 def discard_mapped_type(self, mapped_type: Type[T]) -> None: 

+

724 del self._registered_types[mapped_type] 

+

725 

+

726 def add_table_parser(self, rt: Type[DebputyDispatchableType], path: str) -> None: 

+

727 assert rt not in self._table_parsers 

+

728 self._table_parsers[rt] = DispatchingTableParser(rt, path) 

+

729 

+

730 def add_object_parser( 

+

731 self, 

+

732 path: str, 

+

733 *, 

+

734 parser_documentation: Optional[ParserDocumentation] = None, 

+

735 ) -> None: 

+

736 assert path not in self._in_package_context_parser 

+

737 assert path not in self._object_parsers 

+

738 self._object_parsers[path] = DispatchingObjectParser( 

+

739 path, parser_documentation=parser_documentation 

+

740 ) 

+

741 

+

742 def add_in_package_context_parser( 

+

743 self, 

+

744 path: str, 

+

745 delegate: DeclarativeInputParser[Any], 

+

746 ) -> None: 

+

747 assert path not in self._in_package_context_parser 

+

748 assert path not in self._object_parsers 

+

749 self._in_package_context_parser[path] = InPackageContextParser(path, delegate) 

+

750 

+

751 @property 

+

752 def dispatchable_table_parsers( 

+

753 self, 

+

754 ) -> Mapping[Type[DebputyDispatchableType], DispatchingTableParser[Any]]: 

+

755 return self._table_parsers 

+

756 

+

757 @property 

+

758 def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]: 

+

759 return self._object_parsers 

+

760 

+

761 def dispatch_parser_table_for( 

+

762 self, rule_type: TTP 

+

763 ) -> Optional[DispatchingTableParser[TP]]: 

+

764 return cast( 

+

765 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type) 

+

766 ) 

+

767 

+

768 def generate_parser( 

+

769 self, 

+

770 parsed_content: Type[TD], 

+

771 *, 

+

772 source_content: Optional[SF] = None, 

+

773 allow_optional: bool = False, 

+

774 inline_reference_documentation: Optional[ParserDocumentation] = None, 

+

775 ) -> DeclarativeInputParser[TD]: 

+

776 """Derive a parser from a TypedDict 

+

777 

+

778 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict 

+

779 or two that are used as a description. 

+

780 

+

781 In its most simple use-case, the caller provides a TypedDict of the expected attributed along with 

+

782 their types. As an example: 

+

783 

+

784 >>> class InstallDocsRule(DebputyParsedContent): 

+

785 ... sources: List[str] 

+

786 ... into: List[str] 

+

787 >>> pg = ParserGenerator() 

+

788 >>> simple_parser = pg.generate_parser(InstallDocsRule) 

+

789 

+

790 This will create a parser that would be able to interpret something like: 

+

791 

+

792 ```yaml 

+

793 install-docs: 

+

794 sources: ["docs/*"] 

+

795 into: ["my-pkg"] 

+

796 ``` 

+

797 

+

798 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore, 

+

799 you can also provide a TypedDict describing the input, enabling more flexibility: 

+

800 

+

801 >>> class InstallDocsRule(DebputyParsedContent): 

+

802 ... sources: List[str] 

+

803 ... into: List[str] 

+

804 >>> class InputDocsRuleInputFormat(TypedDict): 

+

805 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]] 

+

806 ... sources: NotRequired[List[str]] 

+

807 ... into: Union[str, List[str]] 

+

808 >>> pg = ParserGenerator() 

+

809 >>> flexible_parser = pg.generate_parser( 

+

810 ... InstallDocsRule, 

+

811 ... source_content=InputDocsRuleInputFormat, 

+

812 ... ) 

+

813 

+

814 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string) 

+

815 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources` 

+

816 is used to ensure the input is not ambiguous. For the `into` parameter, the parser will accept it being a str 

+

817 or a list of strings. Regardless of how the input was provided, the parser will normalize the input such that 

+

818 both `sources` and `into` in the result is a list of strings. As an example, this parser can accept 

+

819 both the previous input but also the following input: 

+

820 

+

821 ```yaml 

+

822 install-docs: 

+

823 source: "docs/*" 

+

824 into: "my-pkg" 

+

825 ``` 

+

826 

+

827 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists 

+

828 with a single string in them. As noted above, the name of the `source` attribute will also be normalized 

+

829 while parsing. 

+

830 

+

831 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict 

+

832 as part of the input. Example: 

+

833 

+

834 >>> class DiscardRule(DebputyParsedContent): 

+

835 ... paths: List[str] 

+

836 >>> class DiscardRuleInputDictFormat(TypedDict): 

+

837 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]] 

+

838 ... paths: NotRequired[List[str]] 

+

839 >>> # This format relies on DiscardRule having exactly one Required attribute 

+

840 >>> DiscardRuleInputWithAltFormat = Union[ 

+

841 ... DiscardRuleInputDictFormat, 

+

842 ... str, 

+

843 ... List[str], 

+

844 ... ] 

+

845 >>> pg = ParserGenerator() 

+

846 >>> flexible_parser = pg.generate_parser( 

+

847 ... DiscardRule, 

+

848 ... source_content=DiscardRuleInputWithAltFormat, 

+

849 ... ) 

+

850 

+

851 

+

852 Supported types: 

+

853 * `List` - must have a fixed type argument (such as `List[str]`) 

+

854 * `str` 

+

855 * `int` 

+

856 * `BinaryPackage` - When provided (or required), the user must provide a package name listed 

+

857 in the debian/control file. The code receives the BinaryPackage instance 

+

858 matching that input. 

+

859 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any 

+

860 format that `debputy' provides (such as `0644` or `a=rw,go=rw`). 

+

861 * `FileSystemOwner` - When provided (or required), the user must a file system owner that is 

+

862 available statically on all Debian systems (must be in `base-passwd`). 

+

863 The user has multiple options for how to specify it (either via name or id). 

+

864 * `FileSystemGroup` - When provided (or required), the user must a file system group that is 

+

865 available statically on all Debian systems (must be in `base-passwd`). 

+

866 The user has multiple options for how to specify it (either via name or id). 

+

867 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply. 

+

868 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which 

+

869 provides the `debputy' default `when` parameter for conditionals. 

+

870 

+

871 Supported special type-like parameters: 

+

872 

+

873 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the 

+

874 outermost level. Cannot vary between `parsed_content` and `source_content`. 

+

875 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment. 

+

876 * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present). 

+

877 Automapping (see below) is restricted to two members in the Union. 

+

878 

+

879 Notable non-supported types: 

+

880 * `Mapping` and all variants therefore (such as `dict`). In the future, nested `TypedDict`s may be allowed. 

+

881 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields. 

+

882 

+

883 Automatic mapping rules from `source_content` to `parsed_content`: 

+

884 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically: 

+

885 `lambda value: value if isinstance(value, list) else [value]` 

+

886 - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]` 

+

887 

+

888 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod 

+

889 for concrete features that may be useful to you. 

+

890 

+

891 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed. 

+

892 (DebputyParsedContent is a TypedDict subclass that work around some inadequate type checkers). 

+

893 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a 

+

894 `List[TypedDict[...]]`. 

+

895 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful 

+

896 to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted, 

+

897 the parsed_content is also considered the source_content (which affects what annotations are allowed in it). 

+

898 Note you should never pass the parsed_content as source_content directly. 

+

899 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you 

+

900 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and 

+

901 can keep this False). 

+

902 :param inline_reference_documentation: Optionally, programmatic documentation 

+

903 :return: An input parser capable of reading input matching the TypedDict(s) used as reference. 

+

904 """ 

+

905 orig_parsed_content = parsed_content 

+

906 if source_content is parsed_content: 906 ↛ 907line 906 didn't jump to line 907, because the condition on line 906 was never true

+

907 raise ValueError( 

+

908 "Do not provide source_content if it is the same as parsed_content" 

+

909 ) 

+

910 is_list_wrapped = False 

+

911 if get_origin(orig_parsed_content) == list: 

+

912 parsed_content = get_args(orig_parsed_content)[0] 

+

913 is_list_wrapped = True 

+

914 

+

915 if isinstance(parsed_content, type) and issubclass( 

+

916 parsed_content, DebputyDispatchableType 

+

917 ): 

+

918 parser = self.dispatch_parser_table_for(parsed_content) 

+

919 if parser is None: 919 ↛ 920line 919 didn't jump to line 920, because the condition on line 919 was never true

+

920 raise ValueError( 

+

921 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

+

922 f" The class {parsed_content.__qualname__} is not a pre-registered type." 

+

923 ) 

+

924 # FIXME: Only the list wrapped version has documentation. 

+

925 if is_list_wrapped: 925 ↛ 930line 925 didn't jump to line 930, because the condition on line 925 was never false

+

926 parser = ListWrappedDeclarativeInputParser( 

+

927 parser, 

+

928 inline_reference_documentation=inline_reference_documentation, 

+

929 ) 

+

930 return parser 

+

931 

+

932 if not is_typeddict(parsed_content): 932 ↛ 933line 932 didn't jump to line 933, because the condition on line 932 was never true

+

933 raise ValueError( 

+

934 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

+

935 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.' 

+

936 ) 

+

937 if is_list_wrapped: 

+

938 if get_origin(source_content) != list: 938 ↛ 939line 938 didn't jump to line 939, because the condition on line 938 was never true

+

939 raise ValueError( 

+

940 "If the parsed_content is a List type, then source_format must be a List type as well." 

+

941 ) 

+

942 source_content = get_args(source_content)[0] 

+

943 

+

944 target_attributes = self._parse_types( 

+

945 parsed_content, 

+

946 allow_source_attribute_annotations=source_content is None, 

+

947 forbid_optional=not allow_optional, 

+

948 ) 

+

949 required_target_parameters = frozenset(parsed_content.__required_keys__) 

+

950 parsed_alt_form = None 

+

951 non_mapping_source_only = False 

+

952 

+

953 if source_content is not None: 

+

954 default_target_attribute = None 

+

955 if len(required_target_parameters) == 1: 

+

956 default_target_attribute = next(iter(required_target_parameters)) 

+

957 

+

958 source_typed_dict, alt_source_forms = _extract_typed_dict( 

+

959 source_content, 

+

960 default_target_attribute, 

+

961 ) 

+

962 if alt_source_forms: 

+

963 parsed_alt_form = self._parse_alt_form( 

+

964 alt_source_forms, 

+

965 default_target_attribute, 

+

966 ) 

+

967 if source_typed_dict is not None: 

+

968 source_content_attributes = self._parse_types( 

+

969 source_typed_dict, 

+

970 allow_target_attribute_annotation=True, 

+

971 allow_source_attribute_annotations=True, 

+

972 forbid_optional=not allow_optional, 

+

973 ) 

+

974 source_content_parameter = "source_content" 

+

975 source_and_parsed_differs = True 

+

976 else: 

+

977 source_typed_dict = parsed_content 

+

978 source_content_attributes = target_attributes 

+

979 source_content_parameter = "parsed_content" 

+

980 source_and_parsed_differs = True 

+

981 non_mapping_source_only = True 

+

982 else: 

+

983 source_typed_dict = parsed_content 

+

984 source_content_attributes = target_attributes 

+

985 source_content_parameter = "parsed_content" 

+

986 source_and_parsed_differs = False 

+

987 

+

988 sources = collections.defaultdict(set) 

+

989 seen_targets = set() 

+

990 seen_source_names: Dict[str, str] = {} 

+

991 source_attributes: Dict[str, AttributeDescription] = {} 

+

992 path_hint_source_attributes = [] 

+

993 

+

994 for k in source_content_attributes: 

+

995 ia = source_content_attributes[k] 

+

996 

+

997 ta = ( 

+

998 target_attributes.get(ia.target_attribute) 

+

999 if source_and_parsed_differs 

+

1000 else ia 

+

1001 ) 

+

1002 if ta is None: 1002 ↛ 1004line 1002 didn't jump to line 1004, because the condition on line 1002 was never true

+

1003 # Error message would be wrong if this assertion is false. 

+

1004 assert source_and_parsed_differs 

+

1005 raise ValueError( 

+

1006 f'The attribute "{k}" from the "source_content" parameter should have mapped' 

+

1007 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"' 

+

1008 ) 

+

1009 if _is_path_attribute_candidate(ia, ta): 

+

1010 path_hint_source_attributes.append(ia.source_attribute_name) 

+

1011 existing_source_name = seen_source_names.get(ia.source_attribute_name) 

+

1012 if existing_source_name: 1012 ↛ 1013line 1012 didn't jump to line 1013, because the condition on line 1012 was never true

+

1013 raise ValueError( 

+

1014 f'The attribute "{k}" and "{existing_source_name}" both share the source name' 

+

1015 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,' 

+

1016 f' so only one attribute use "{ia.source_attribute_name}".' 

+

1017 ) 

+

1018 seen_source_names[ia.source_attribute_name] = k 

+

1019 seen_targets.add(ta.target_attribute) 

+

1020 sources[ia.target_attribute].add(k) 

+

1021 if source_and_parsed_differs: 

+

1022 bridge_mapper = self._type_normalize( 

+

1023 k, ia.attribute_type, ta.attribute_type, False 

+

1024 ) 

+

1025 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper) 

+

1026 source_attributes[k] = ia 

+

1027 

+

1028 def _as_attr_names(td_name: Iterable[str]) -> FrozenSet[str]: 

+

1029 return frozenset( 

+

1030 source_content_attributes[a].source_attribute_name for a in td_name 

+

1031 ) 

+

1032 

+

1033 _check_attributes( 

+

1034 parsed_content, 

+

1035 source_typed_dict, 

+

1036 source_content_attributes, 

+

1037 sources, 

+

1038 ) 

+

1039 

+

1040 at_least_one_of = frozenset( 

+

1041 _as_attr_names(g) 

+

1042 for k, g in sources.items() 

+

1043 if len(g) > 1 and k in required_target_parameters 

+

1044 ) 

+

1045 

+

1046 if source_and_parsed_differs and seen_targets != target_attributes.keys(): 1046 ↛ 1047line 1046 didn't jump to line 1047, because the condition on line 1046 was never true

+

1047 missing = ", ".join( 

+

1048 repr(k) for k in (target_attributes.keys() - seen_targets) 

+

1049 ) 

+

1050 raise ValueError( 

+

1051 'The following attributes in "parsed_content" did not have a source field in "source_content":' 

+

1052 f" {missing}" 

+

1053 ) 

+

1054 all_mutually_exclusive_fields = frozenset( 

+

1055 _as_attr_names(g) for g in sources.values() if len(g) > 1 

+

1056 ) 

+

1057 

+

1058 all_parameters = ( 

+

1059 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__ 

+

1060 ) 

+

1061 _check_conflicts( 

+

1062 source_content_attributes, 

+

1063 source_typed_dict.__required_keys__, 

+

1064 all_parameters, 

+

1065 ) 

+

1066 

+

1067 manifest_attributes = { 

+

1068 a.source_attribute_name: a for a in source_content_attributes.values() 

+

1069 } 

+

1070 

+

1071 if parsed_alt_form is not None: 

+

1072 target_attribute = parsed_alt_form.target_attribute 

+

1073 if ( 1073 ↛ 1078line 1073 didn't jump to line 1078

+

1074 target_attribute not in required_target_parameters 

+

1075 and required_target_parameters 

+

1076 or len(required_target_parameters) > 1 

+

1077 ): 

+

1078 raise NotImplementedError( 

+

1079 "When using alternative source formats (Union[TypedDict, ...]), then the" 

+

1080 " target must have at most one require parameter" 

+

1081 ) 

+

1082 bridge_mapper = self._type_normalize( 

+

1083 target_attribute, 

+

1084 parsed_alt_form.attribute_type, 

+

1085 target_attributes[target_attribute].attribute_type, 

+

1086 False, 

+

1087 ) 

+

1088 parsed_alt_form.type_validator = ( 

+

1089 parsed_alt_form.type_validator.combine_mapper(bridge_mapper) 

+

1090 ) 

+

1091 

+

1092 _verify_inline_reference_documentation( 

+

1093 source_content_attributes, 

+

1094 inline_reference_documentation, 

+

1095 parsed_alt_form is not None, 

+

1096 ) 

+

1097 if non_mapping_source_only: 

+

1098 parser = DeclarativeNonMappingInputParser( 

+

1099 assume_not_none(parsed_alt_form), 

+

1100 inline_reference_documentation=inline_reference_documentation, 

+

1101 ) 

+

1102 else: 

+

1103 parser = DeclarativeMappingInputParser( 

+

1104 _as_attr_names(source_typed_dict.__required_keys__), 

+

1105 _as_attr_names(all_parameters), 

+

1106 manifest_attributes, 

+

1107 source_attributes, 

+

1108 mutually_exclusive_attributes=all_mutually_exclusive_fields, 

+

1109 alt_form_parser=parsed_alt_form, 

+

1110 at_least_one_of=at_least_one_of, 

+

1111 inline_reference_documentation=inline_reference_documentation, 

+

1112 path_hint_source_attributes=tuple(path_hint_source_attributes), 

+

1113 ) 

+

1114 if is_list_wrapped: 

+

1115 parser = ListWrappedDeclarativeInputParser(parser) 

+

1116 return parser 

+

1117 

+

1118 def _as_type_validator( 

+

1119 self, 

+

1120 attribute: str, 

+

1121 provided_type: Any, 

+

1122 parsing_typed_dict_attribute: bool, 

+

1123 ) -> AttributeTypeHandler: 

+

1124 assert not isinstance(provided_type, tuple) 

+

1125 

+

1126 if isinstance(provided_type, type) and issubclass( 

+

1127 provided_type, DebputyDispatchableType 

+

1128 ): 

+

1129 return _dispatch_parser(provided_type) 

+

1130 

+

1131 unmapped_type = self._strip_mapped_types( 

+

1132 provided_type, 

+

1133 parsing_typed_dict_attribute, 

+

1134 ) 

+

1135 type_normalizer = self._type_normalize( 

+

1136 attribute, 

+

1137 unmapped_type, 

+

1138 provided_type, 

+

1139 parsing_typed_dict_attribute, 

+

1140 ) 

+

1141 t_unmapped, t_orig, t_args = unpack_type( 

+

1142 unmapped_type, 

+

1143 parsing_typed_dict_attribute, 

+

1144 ) 

+

1145 

+

1146 if ( 1146 ↛ 1152line 1146 didn't jump to line 1152

+

1147 t_orig == Union 

+

1148 and t_args 

+

1149 and len(t_args) == 2 

+

1150 and any(v is _NONE_TYPE for v in t_args) 

+

1151 ): 

+

1152 _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute) 

+

1153 actual_type = [a for a in args if a is not _NONE_TYPE][0] 

+

1154 validator = self._as_type_validator( 

+

1155 attribute, actual_type, parsing_typed_dict_attribute 

+

1156 ) 

+

1157 

+

1158 def _validator(v: Any, path: AttributePath) -> None: 

+

1159 if v is None: 

+

1160 return 

+

1161 validator.ensure_type(v, path) 

+

1162 

+

1163 return AttributeTypeHandler( 

+

1164 validator.describe_type(), 

+

1165 _validator, 

+

1166 base_type=validator.base_type, 

+

1167 mapper=type_normalizer, 

+

1168 ) 

+

1169 

+

1170 if unmapped_type in BASIC_SIMPLE_TYPES: 

+

1171 type_name = BASIC_SIMPLE_TYPES[unmapped_type] 

+

1172 

+

1173 type_mapping = self._registered_types.get(provided_type) 

+

1174 if type_mapping is not None: 

+

1175 simple_type = f" ({type_name})" 

+

1176 type_name = type_mapping.target_type.__name__ 

+

1177 else: 

+

1178 simple_type = "" 

+

1179 

+

1180 def _validator(v: Any, path: AttributePath) -> None: 

+

1181 if not isinstance(v, unmapped_type): 

+

1182 _validation_type_error( 

+

1183 path, f"The attribute must be a {type_name}{simple_type}" 

+

1184 ) 

+

1185 

+

1186 return AttributeTypeHandler( 

+

1187 type_name, 

+

1188 _validator, 

+

1189 base_type=unmapped_type, 

+

1190 mapper=type_normalizer, 

+

1191 ) 

+

1192 if t_orig == list: 

+

1193 if not t_args: 1193 ↛ 1194line 1193 didn't jump to line 1194, because the condition on line 1193 was never true

+

1194 raise ValueError( 

+

1195 f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])' 

+

1196 ) 

+

1197 _, t_provided_orig, t_provided_args = unpack_type( 

+

1198 provided_type, 

+

1199 parsing_typed_dict_attribute, 

+

1200 ) 

+

1201 genetic_type = t_args[0] 

+

1202 key_mapper = self._as_type_validator( 

+

1203 attribute, 

+

1204 genetic_type, 

+

1205 parsing_typed_dict_attribute, 

+

1206 ) 

+

1207 

+

1208 def _validator(v: Any, path: AttributePath) -> None: 

+

1209 if not isinstance(v, list): 1209 ↛ 1210line 1209 didn't jump to line 1210, because the condition on line 1209 was never true

+

1210 _validation_type_error(path, "The attribute must be a list") 

+

1211 for i, v in enumerate(v): 

+

1212 key_mapper.ensure_type(v, path[i]) 

+

1213 

+

1214 list_mapper = ( 

+

1215 map_each_element(key_mapper.mapper) 

+

1216 if key_mapper.mapper is not None 

+

1217 else None 

+

1218 ) 

+

1219 

+

1220 return AttributeTypeHandler( 

+

1221 f"List of {key_mapper.describe_type()}", 

+

1222 _validator, 

+

1223 base_type=list, 

+

1224 mapper=type_normalizer, 

+

1225 ).combine_mapper(list_mapper) 

+

1226 if is_typeddict(provided_type): 

+

1227 subparser = self.generate_parser(cast("Type[TD]", provided_type)) 

+

1228 return AttributeTypeHandler( 

+

1229 description=f"{provided_type.__name__} (Typed Mapping)", 

+

1230 ensure_type=lambda v, ap: None, 

+

1231 base_type=dict, 

+

1232 mapper=lambda v, ap, cv: subparser.parse_input( 

+

1233 v, ap, parser_context=cv 

+

1234 ), 

+

1235 ) 

+

1236 if t_orig == dict: 

+

1237 if not t_args or len(t_args) != 2: 1237 ↛ 1238line 1237 didn't jump to line 1238, because the condition on line 1237 was never true

+

1238 raise ValueError( 

+

1239 f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])' 

+

1240 ) 

+

1241 if t_args[0] != str: 1241 ↛ 1242line 1241 didn't jump to line 1242, because the condition on line 1241 was never true

+

1242 raise ValueError( 

+

1243 f'The attribute "{attribute}" is Dict and has a non-str type as key.' 

+

1244 " Currently, only `str` is supported (Dict[str, Y])" 

+

1245 ) 

+

1246 key_mapper = self._as_type_validator( 

+

1247 attribute, 

+

1248 t_args[0], 

+

1249 parsing_typed_dict_attribute, 

+

1250 ) 

+

1251 value_mapper = self._as_type_validator( 

+

1252 attribute, 

+

1253 t_args[1], 

+

1254 parsing_typed_dict_attribute, 

+

1255 ) 

+

1256 

+

1257 if key_mapper.base_type is None: 1257 ↛ 1258line 1257 didn't jump to line 1258, because the condition on line 1257 was never true

+

1258 raise ValueError( 

+

1259 f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types' 

+

1260 f" without trivial base types (such as `str`) are not supported at the moment." 

+

1261 ) 

+

1262 

+

1263 if value_mapper.mapper is not None: 1263 ↛ 1264line 1263 didn't jump to line 1264, because the condition on line 1263 was never true

+

1264 raise ValueError( 

+

1265 f'The attribute "{attribute}" is Dict and the value requires mapping.' 

+

1266 " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])." 

+

1267 " Better typing may come later" 

+

1268 ) 

+

1269 

+

1270 def _validator(uv: Any, path: AttributePath) -> None: 

+

1271 if not isinstance(uv, dict): 1271 ↛ 1272line 1271 didn't jump to line 1272, because the condition on line 1271 was never true

+

1272 _validation_type_error(path, "The attribute must be a mapping") 

+

1273 key_name = "the first key in the mapping" 

+

1274 for i, (k, v) in enumerate(uv.items()): 

+

1275 if not key_mapper.base_type_match(k): 1275 ↛ 1276line 1275 didn't jump to line 1276, because the condition on line 1275 was never true

+

1276 kp = path.copy_with_path_hint(key_name) 

+

1277 _validation_type_error( 

+

1278 kp, 

+

1279 f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}', 

+

1280 ) 

+

1281 key_name = f"the key after {k}" 

+

1282 value_mapper.ensure_type(v, path[k]) 

+

1283 

+

1284 return AttributeTypeHandler( 

+

1285 f"Mapping of {value_mapper.describe_type()}", 

+

1286 _validator, 

+

1287 base_type=dict, 

+

1288 mapper=type_normalizer, 

+

1289 ).combine_mapper(key_mapper.mapper) 

+

1290 if t_orig == Union: 

+

1291 if _is_two_arg_x_list_x(t_args): 

+

1292 # Force the order to be "X, List[X]" as it simplifies the code 

+

1293 x_list_x = ( 

+

1294 t_args if get_origin(t_args[1]) == list else (t_args[1], t_args[0]) 

+

1295 ) 

+

1296 

+

1297 # X, List[X] could match if X was List[Y]. However, our code below assumes 

+

1298 # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this 

+

1299 # case to avoid this assert and fall into the "generic case". 

+

1300 assert get_origin(x_list_x[0]) != list 

+

1301 x_subtype_checker = self._as_type_validator( 

+

1302 attribute, 

+

1303 x_list_x[0], 

+

1304 parsing_typed_dict_attribute, 

+

1305 ) 

+

1306 list_x_subtype_checker = self._as_type_validator( 

+

1307 attribute, 

+

1308 x_list_x[1], 

+

1309 parsing_typed_dict_attribute, 

+

1310 ) 

+

1311 type_description = x_subtype_checker.describe_type() 

+

1312 type_description = f"{type_description} or a list of {type_description}" 

+

1313 

+

1314 def _validator(v: Any, path: AttributePath) -> None: 

+

1315 if isinstance(v, list): 

+

1316 list_x_subtype_checker.ensure_type(v, path) 

+

1317 else: 

+

1318 x_subtype_checker.ensure_type(v, path) 

+

1319 

+

1320 return AttributeTypeHandler( 

+

1321 type_description, 

+

1322 _validator, 

+

1323 mapper=type_normalizer, 

+

1324 ) 

+

1325 else: 

+

1326 subtype_checker = [ 

+

1327 self._as_type_validator(attribute, a, parsing_typed_dict_attribute) 

+

1328 for a in t_args 

+

1329 ] 

+

1330 type_description = "one-of: " + ", ".join( 

+

1331 f"{sc.describe_type()}" for sc in subtype_checker 

+

1332 ) 

+

1333 mapper = subtype_checker[0].mapper 

+

1334 if any(mapper != sc.mapper for sc in subtype_checker): 1334 ↛ 1335line 1334 didn't jump to line 1335, because the condition on line 1334 was never true

+

1335 raise ValueError( 

+

1336 f'Cannot handle the union "{provided_type}" as the target types need different' 

+

1337 " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]" 

+

1338 " where X is a non-collection type." 

+

1339 ) 

+

1340 

+

1341 def _validator(v: Any, path: AttributePath) -> None: 

+

1342 partial_matches = [] 

+

1343 for sc in subtype_checker: 1343 ↛ 1351line 1343 didn't jump to line 1351, because the loop on line 1343 didn't complete

+

1344 try: 

+

1345 sc.ensure_type(v, path) 

+

1346 return 

+

1347 except ManifestParseException as e: 

+

1348 if sc.base_type_match(v): 1348 ↛ 1349line 1348 didn't jump to line 1349, because the condition on line 1348 was never true

+

1349 partial_matches.append((sc, e)) 

+

1350 

+

1351 if len(partial_matches) == 1: 

+

1352 raise partial_matches[0][1] 

+

1353 _validation_type_error( 

+

1354 path, f"Could not match against: {type_description}" 

+

1355 ) 

+

1356 

+

1357 return AttributeTypeHandler( 

+

1358 type_description, 

+

1359 _validator, 

+

1360 mapper=type_normalizer, 

+

1361 ) 

+

1362 if t_orig == Literal: 

+

1363 # We want "x" for string values; repr provides 'x' 

+

1364 pretty = ", ".join( 

+

1365 f'"{v}"' if isinstance(v, str) else str(v) for v in t_args 

+

1366 ) 

+

1367 

+

1368 def _validator(v: Any, path: AttributePath) -> None: 

+

1369 if v not in t_args: 

+

1370 value_hint = "" 

+

1371 if isinstance(v, str): 1371 ↛ 1373line 1371 didn't jump to line 1373, because the condition on line 1371 was never false

+

1372 value_hint = f"({v}) " 

+

1373 _validation_type_error( 

+

1374 path, 

+

1375 f"Value {value_hint}must be one of the following literal values: {pretty}", 

+

1376 ) 

+

1377 

+

1378 return AttributeTypeHandler( 

+

1379 f"One of the following literal values: {pretty}", 

+

1380 _validator, 

+

1381 ) 

+

1382 

+

1383 if provided_type == Any: 1383 ↛ 1388line 1383 didn't jump to line 1388, because the condition on line 1383 was never false

+

1384 return AttributeTypeHandler( 

+

1385 "any (unvalidated)", 

+

1386 lambda *a: None, 

+

1387 ) 

+

1388 raise ValueError( 

+

1389 f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported' 

+

1390 ) 

+

1391 

+

1392 def _parse_types( 

+

1393 self, 

+

1394 spec: Type[TypedDict], 

+

1395 allow_target_attribute_annotation: bool = False, 

+

1396 allow_source_attribute_annotations: bool = False, 

+

1397 forbid_optional: bool = True, 

+

1398 ) -> Dict[str, AttributeDescription]: 

+

1399 annotations = get_type_hints(spec, include_extras=True) 

+

1400 return { 

+

1401 k: self._attribute_description( 

+

1402 k, 

+

1403 t, 

+

1404 k in spec.__required_keys__, 

+

1405 allow_target_attribute_annotation=allow_target_attribute_annotation, 

+

1406 allow_source_attribute_annotations=allow_source_attribute_annotations, 

+

1407 forbid_optional=forbid_optional, 

+

1408 ) 

+

1409 for k, t in annotations.items() 

+

1410 } 

+

1411 

+

1412 def _attribute_description( 

+

1413 self, 

+

1414 attribute: str, 

+

1415 orig_td: Any, 

+

1416 is_required: bool, 

+

1417 forbid_optional: bool = True, 

+

1418 allow_target_attribute_annotation: bool = False, 

+

1419 allow_source_attribute_annotations: bool = False, 

+

1420 ) -> AttributeDescription: 

+

1421 td, anno, is_optional = _parse_type( 

+

1422 attribute, orig_td, forbid_optional=forbid_optional 

+

1423 ) 

+

1424 type_validator = self._as_type_validator(attribute, td, True) 

+

1425 parsed_annotations = DetectedDebputyParseHint.parse_annotations( 

+

1426 anno, 

+

1427 f' Seen with attribute "{attribute}".', 

+

1428 attribute, 

+

1429 is_required, 

+

1430 allow_target_attribute_annotation=allow_target_attribute_annotation, 

+

1431 allow_source_attribute_annotations=allow_source_attribute_annotations, 

+

1432 ) 

+

1433 return AttributeDescription( 

+

1434 target_attribute=parsed_annotations.target_attribute, 

+

1435 attribute_type=td, 

+

1436 type_validator=type_validator, 

+

1437 annotations=anno, 

+

1438 is_optional=is_optional, 

+

1439 conflicting_attributes=parsed_annotations.conflict_with_source_attributes, 

+

1440 conditional_required=parsed_annotations.conditional_required, 

+

1441 source_attribute_name=assume_not_none( 

+

1442 parsed_annotations.source_manifest_attribute 

+

1443 ), 

+

1444 parse_hints=parsed_annotations, 

+

1445 ) 

+

1446 

+

1447 def _parse_alt_form( 

+

1448 self, 

+

1449 alt_form, 

+

1450 default_target_attribute: Optional[str], 

+

1451 ) -> AttributeDescription: 

+

1452 td, anno, is_optional = _parse_type( 

+

1453 "source_format alternative form", 

+

1454 alt_form, 

+

1455 forbid_optional=True, 

+

1456 parsing_typed_dict_attribute=False, 

+

1457 ) 

+

1458 type_validator = self._as_type_validator( 

+

1459 "source_format alternative form", 

+

1460 td, 

+

1461 True, 

+

1462 ) 

+

1463 parsed_annotations = DetectedDebputyParseHint.parse_annotations( 

+

1464 anno, 

+

1465 " The alternative for source_format.", 

+

1466 None, 

+

1467 False, 

+

1468 default_target_attribute=default_target_attribute, 

+

1469 allow_target_attribute_annotation=True, 

+

1470 allow_source_attribute_annotations=False, 

+

1471 ) 

+

1472 return AttributeDescription( 

+

1473 target_attribute=parsed_annotations.target_attribute, 

+

1474 attribute_type=td, 

+

1475 type_validator=type_validator, 

+

1476 annotations=anno, 

+

1477 is_optional=is_optional, 

+

1478 conflicting_attributes=parsed_annotations.conflict_with_source_attributes, 

+

1479 conditional_required=parsed_annotations.conditional_required, 

+

1480 source_attribute_name="Alt form of the source_format", 

+

1481 ) 

+

1482 

+

1483 def _union_narrowing( 

+

1484 self, 

+

1485 input_type: Any, 

+

1486 target_type: Any, 

+

1487 parsing_typed_dict_attribute: bool, 

+

1488 ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]: 

+

1489 _, input_orig, input_args = unpack_type( 

+

1490 input_type, parsing_typed_dict_attribute 

+

1491 ) 

+

1492 _, target_orig, target_args = unpack_type( 

+

1493 target_type, parsing_typed_dict_attribute 

+

1494 ) 

+

1495 

+

1496 if input_orig != Union or not input_args: 1496 ↛ 1497line 1496 didn't jump to line 1497, because the condition on line 1496 was never true

+

1497 raise ValueError("input_type must be a Union[...] with non-empty args") 

+

1498 

+

1499 # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, Union[Y]] 

+

1500 # - Where X = Y or there is a simple standard transformation from X to Y. 

+

1501 

+

1502 if target_orig not in (Union, list) or not target_args: 

+

1503 # Not supported 

+

1504 return None 

+

1505 

+

1506 if target_orig == Union and set(input_args) == set(target_args): 1506 ↛ 1508line 1506 didn't jump to line 1508, because the condition on line 1506 was never true

+

1507 # Not needed (identity mapping) 

+

1508 return None 

+

1509 

+

1510 if target_orig == list and not any(get_origin(a) == list for a in input_args): 1510 ↛ exit,   1510 ↛ 15122 missed branches: 1) line 1510 didn't finish the generator expression on line 1510, 2) line 1510 didn't jump to line 1512, because the condition on line 1510 was never true

+

1511 # Not supported 

+

1512 return None 

+

1513 

+

1514 target_arg = target_args[0] 

+

1515 simplified_type = self._strip_mapped_types( 

+

1516 target_arg, parsing_typed_dict_attribute 

+

1517 ) 

+

1518 acceptable_types = { 

+

1519 target_arg, 

+

1520 List[target_arg], # type: ignore 

+

1521 simplified_type, 

+

1522 List[simplified_type], # type: ignore 

+

1523 } 

+

1524 target_format = ( 

+

1525 target_arg, 

+

1526 List[target_arg], # type: ignore 

+

1527 ) 

+

1528 in_target_format = 0 

+

1529 in_simple_format = 0 

+

1530 for input_arg in input_args: 

+

1531 if input_arg not in acceptable_types: 1531 ↛ 1533line 1531 didn't jump to line 1533, because the condition on line 1531 was never true

+

1532 # Not supported 

+

1533 return None 

+

1534 if input_arg in target_format: 

+

1535 in_target_format += 1 

+

1536 else: 

+

1537 in_simple_format += 1 

+

1538 

+

1539 assert in_simple_format or in_target_format 

+

1540 

+

1541 if in_target_format and not in_simple_format: 

+

1542 # Union[X, List[X]] -> List[X] 

+

1543 return normalize_into_list 

+

1544 mapped = self._registered_types[target_arg] 

+

1545 if not in_target_format and in_simple_format: 1545 ↛ 1560line 1545 didn't jump to line 1560, because the condition on line 1545 was never false

+

1546 # Union[X, List[X]] -> List[Y] 

+

1547 

+

1548 def _mapper_x_list_y( 

+

1549 x: Union[Any, List[Any]], 

+

1550 ap: AttributePath, 

+

1551 pc: Optional["ParserContextData"], 

+

1552 ) -> List[Any]: 

+

1553 in_list_form: List[Any] = normalize_into_list(x, ap, pc) 

+

1554 

+

1555 return [mapped.mapper(x, ap, pc) for x in in_list_form] 

+

1556 

+

1557 return _mapper_x_list_y 

+

1558 

+

1559 # Union[Y, List[X]] -> List[Y] 

+

1560 if not isinstance(target_arg, type): 

+

1561 raise ValueError( 

+

1562 f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does" 

+

1563 f" not support mixed types. Please use either {simplified_type} or {target_arg}" 

+

1564 f" in the source content (but both a mix of both)" 

+

1565 ) 

+

1566 

+

1567 def _mapper_mixed_list_y( 

+

1568 x: Union[Any, List[Any]], 

+

1569 ap: AttributePath, 

+

1570 pc: Optional["ParserContextData"], 

+

1571 ) -> List[Any]: 

+

1572 in_list_form: List[Any] = normalize_into_list(x, ap, pc) 

+

1573 

+

1574 return [ 

+

1575 x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc) 

+

1576 for x in in_list_form 

+

1577 ] 

+

1578 

+

1579 return _mapper_mixed_list_y 

+

1580 

+

1581 def _type_normalize( 

+

1582 self, 

+

1583 attribute: str, 

+

1584 input_type: Any, 

+

1585 target_type: Any, 

+

1586 parsing_typed_dict_attribute: bool, 

+

1587 ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]: 

+

1588 if input_type == target_type: 

+

1589 return None 

+

1590 _, input_orig, input_args = unpack_type( 

+

1591 input_type, parsing_typed_dict_attribute 

+

1592 ) 

+

1593 _, target_orig, target_args = unpack_type( 

+

1594 target_type, 

+

1595 parsing_typed_dict_attribute, 

+

1596 ) 

+

1597 if input_orig == Union: 

+

1598 result = self._union_narrowing( 

+

1599 input_type, target_type, parsing_typed_dict_attribute 

+

1600 ) 

+

1601 if result: 

+

1602 return result 

+

1603 elif target_orig == list and target_args[0] == input_type: 

+

1604 return wrap_into_list 

+

1605 

+

1606 mapped = self._registered_types.get(target_type) 

+

1607 if mapped is not None and input_type == mapped.source_type: 

+

1608 # Source -> Target 

+

1609 return mapped.mapper 

+

1610 if target_orig == list and target_args: 1610 ↛ 1628line 1610 didn't jump to line 1628, because the condition on line 1610 was never false

+

1611 mapped = self._registered_types.get(target_args[0]) 

+

1612 if mapped is not None: 1612 ↛ 1628line 1612 didn't jump to line 1628, because the condition on line 1612 was never false

+

1613 # mypy is dense and forgot `mapped` cannot be optional in the comprehensions. 

+

1614 mapped_type: TypeMapping = mapped 

+

1615 if input_type == mapped.source_type: 1615 ↛ 1617line 1615 didn't jump to line 1617, because the condition on line 1615 was never true

+

1616 # Source -> List[Target] 

+

1617 return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)] 

+

1618 if ( 1618 ↛ 1628line 1618 didn't jump to line 1628

+

1619 input_orig == list 

+

1620 and input_args 

+

1621 and input_args[0] == mapped_type.source_type 

+

1622 ): 

+

1623 # List[Source] -> List[Target] 

+

1624 return lambda xs, ap, pc: [ 

+

1625 mapped_type.mapper(x, ap, pc) for x in xs 

+

1626 ] 

+

1627 

+

1628 raise ValueError( 

+

1629 f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow' 

+

1630 f" {input_type} to {target_type}" 

+

1631 ) 

+

1632 

+

1633 def _strip_mapped_types( 

+

1634 self, orig_td: Any, parsing_typed_dict_attribute: bool 

+

1635 ) -> Any: 

+

1636 m = self._registered_types.get(orig_td) 

+

1637 if m is not None: 

+

1638 return m.source_type 

+

1639 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

+

1640 if v == list: 

+

1641 arg = args[0] 

+

1642 m = self._registered_types.get(arg) 

+

1643 if m: 

+

1644 return List[m.source_type] # type: ignore 

+

1645 if v == Union: 

+

1646 stripped_args = tuple( 

+

1647 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args 

+

1648 ) 

+

1649 if stripped_args != args: 

+

1650 return Union[stripped_args] 

+

1651 return orig_td 

+

1652 

+

1653 

+

1654def _verify_inline_reference_documentation( 

+

1655 source_content_attributes: Mapping[str, AttributeDescription], 

+

1656 inline_reference_documentation: Optional[ParserDocumentation], 

+

1657 has_alt_form: bool, 

+

1658) -> None: 

+

1659 if inline_reference_documentation is None: 

+

1660 return 

+

1661 attribute_doc = inline_reference_documentation.attribute_doc 

+

1662 if attribute_doc: 

+

1663 seen = set() 

+

1664 for attr_doc in attribute_doc: 

+

1665 for attr_name in attr_doc.attributes: 

+

1666 attr = source_content_attributes.get(attr_name) 

+

1667 if attr is None: 1667 ↛ 1668line 1667 didn't jump to line 1668, because the condition on line 1667 was never true

+

1668 raise ValueError( 

+

1669 f'The inline_reference_documentation references an attribute "{attr_name}", which does not' 

+

1670 f" exist in the source format." 

+

1671 ) 

+

1672 if attr_name in seen: 1672 ↛ 1673line 1672 didn't jump to line 1673, because the condition on line 1672 was never true

+

1673 raise ValueError( 

+

1674 f'The inline_reference_documentation has documentation for "{attr_name}" twice,' 

+

1675 f" which is not supported. Please document it at most once" 

+

1676 ) 

+

1677 seen.add(attr_name) 

+

1678 

+

1679 undocumented = source_content_attributes.keys() - seen 

+

1680 if undocumented: 1680 ↛ 1681line 1680 didn't jump to line 1681, because the condition on line 1680 was never true

+

1681 undocumented_attrs = ", ".join(undocumented) 

+

1682 raise ValueError( 

+

1683 "The following attributes were not documented. If this is deliberate, then please" 

+

1684 ' declare each them as undocumented (via undocumented_attr("foo")):' 

+

1685 f" {undocumented_attrs}" 

+

1686 ) 

+

1687 

+

1688 if inline_reference_documentation.alt_parser_description and not has_alt_form: 1688 ↛ 1689line 1688 didn't jump to line 1689, because the condition on line 1688 was never true

+

1689 raise ValueError( 

+

1690 "The inline_reference_documentation had documentation for an non-mapping format," 

+

1691 " but the source format does not have a non-mapping format." 

+

1692 ) 

+

1693 

+

1694 

+

1695def _check_conflicts( 

+

1696 input_content_attributes: Dict[str, AttributeDescription], 

+

1697 required_attributes: FrozenSet[str], 

+

1698 all_attributes: FrozenSet[str], 

+

1699) -> None: 

+

1700 for attr_name, attr in input_content_attributes.items(): 

+

1701 if attr_name in required_attributes and attr.conflicting_attributes: 1701 ↛ 1702line 1701 didn't jump to line 1702, because the condition on line 1701 was never true

+

1702 c = ", ".join(repr(a) for a in attr.conflicting_attributes) 

+

1703 raise ValueError( 

+

1704 f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.' 

+

1705 " This makes it impossible to use these attributes. Either remove the attributes" 

+

1706 f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"' 

+

1707 " optional (NotRequired)" 

+

1708 ) 

+

1709 else: 

+

1710 required_conflicts = attr.conflicting_attributes & required_attributes 

+

1711 if required_conflicts: 1711 ↛ 1712line 1711 didn't jump to line 1712, because the condition on line 1711 was never true

+

1712 c = ", ".join(repr(a) for a in required_conflicts) 

+

1713 raise ValueError( 

+

1714 f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.' 

+

1715 f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,' 

+

1716 f" adjust the conflicts or make the listed attributes optional (NotRequired)" 

+

1717 ) 

+

1718 unknown_attributes = attr.conflicting_attributes - all_attributes 

+

1719 if unknown_attributes: 1719 ↛ 1720line 1719 didn't jump to line 1720, because the condition on line 1719 was never true

+

1720 c = ", ".join(repr(a) for a in unknown_attributes) 

+

1721 raise ValueError( 

+

1722 f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.' 

+

1723 f" None of these attributes were declared in the input." 

+

1724 ) 

+

1725 

+

1726 

+

1727def _check_attributes( 

+

1728 content: Type[TypedDict], 

+

1729 input_content: Type[TypedDict], 

+

1730 input_content_attributes: Dict[str, AttributeDescription], 

+

1731 sources: Mapping[str, Collection[str]], 

+

1732) -> None: 

+

1733 target_required_keys = content.__required_keys__ 

+

1734 input_required_keys = input_content.__required_keys__ 

+

1735 all_input_keys = input_required_keys | input_content.__optional_keys__ 

+

1736 

+

1737 for input_name in all_input_keys: 

+

1738 attr = input_content_attributes[input_name] 

+

1739 target_name = attr.target_attribute 

+

1740 source_names = sources[target_name] 

+

1741 input_is_required = input_name in input_required_keys 

+

1742 target_is_required = target_name in target_required_keys 

+

1743 

+

1744 assert source_names 

+

1745 

+

1746 if input_is_required and len(source_names) > 1: 1746 ↛ 1747line 1746 didn't jump to line 1747, because the condition on line 1746 was never true

+

1747 raise ValueError( 

+

1748 f'The source attribute "{input_name}" is required, but it maps to "{target_name}",' 

+

1749 f' which has multiple sources "{source_names}". If "{input_name}" should be required,' 

+

1750 f' then there is no need for additional sources for "{target_name}". Alternatively,' 

+

1751 f' "{input_name}" might be missing a NotRequired type' 

+

1752 f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")' 

+

1753 ) 

+

1754 if not input_is_required and target_is_required and len(source_names) == 1: 1754 ↛ 1755line 1754 didn't jump to line 1755, because the condition on line 1754 was never true

+

1755 raise ValueError( 

+

1756 f'The source attribute "{input_name}" is not marked as required and maps to' 

+

1757 f' "{target_name}", which is marked as required. As there are no other attributes' 

+

1758 f' mapping to "{target_name}", then "{input_name}" must be required as well' 

+

1759 f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional' 

+

1760 f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.' 

+

1761 ) 

+

1762 

+

1763 

+

1764def _validation_type_error(path: AttributePath, message: str) -> None: 

+

1765 raise ManifestParseException( 

+

1766 f'The attribute "{path.path}" did not have a valid structure/type: {message}' 

+

1767 ) 

+

1768 

+

1769 

+

1770def _is_two_arg_x_list_x(t_args: Tuple[Any, ...]) -> bool: 

+

1771 if len(t_args) != 2: 1771 ↛ 1772line 1771 didn't jump to line 1772, because the condition on line 1771 was never true

+

1772 return False 

+

1773 lhs, rhs = t_args 

+

1774 if get_origin(lhs) == list: 

+

1775 if get_origin(rhs) == list: 1775 ↛ 1778line 1775 didn't jump to line 1778, because the condition on line 1775 was never true

+

1776 # It could still match X, List[X] - but we do not allow this case for now as the caller 

+

1777 # does not support it. 

+

1778 return False 

+

1779 l_args = get_args(lhs) 

+

1780 return bool(l_args and l_args[0] == rhs) 

+

1781 if get_origin(rhs) == list: 

+

1782 r_args = get_args(rhs) 

+

1783 return bool(r_args and r_args[0] == lhs) 

+

1784 return False 

+

1785 

+

1786 

+

1787def _extract_typed_dict( 

+

1788 base_type, 

+

1789 default_target_attribute: Optional[str], 

+

1790) -> Tuple[Optional[Type[TypedDict]], Any]: 

+

1791 if is_typeddict(base_type): 

+

1792 return base_type, None 

+

1793 _, origin, args = unpack_type(base_type, False) 

+

1794 if origin != Union: 

+

1795 if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)): 1795 ↛ 1796line 1795 didn't jump to line 1796, because the condition on line 1795 was never true

+

1796 raise ValueError( 

+

1797 "The source_format cannot be nor contain a (non-TypedDict) dict" 

+

1798 ) 

+

1799 return None, base_type 

+

1800 typed_dicts = [x for x in args if is_typeddict(x)] 

+

1801 if len(typed_dicts) > 1: 1801 ↛ 1802line 1801 didn't jump to line 1802, because the condition on line 1801 was never true

+

1802 raise ValueError( 

+

1803 "When source_format is a Union, it must contain at most one TypedDict" 

+

1804 ) 

+

1805 typed_dict = typed_dicts[0] if typed_dicts else None 

+

1806 

+

1807 if any(x is None or x is _NONE_TYPE for x in args): 1807 ↛ 1808line 1807 didn't jump to line 1808, because the condition on line 1807 was never true

+

1808 raise ValueError( 

+

1809 "The source_format cannot be nor contain Optional[X] or Union[X, None]" 

+

1810 ) 

+

1811 

+

1812 if any( 1812 ↛ 1817line 1812 didn't jump to line 1817, because the condition on line 1812 was never true

+

1813 isinstance(x, type) and issubclass(x, (dict, Mapping)) 

+

1814 for x in args 

+

1815 if x is not typed_dict 

+

1816 ): 

+

1817 raise ValueError( 

+

1818 "The source_format cannot be nor contain a (non-TypedDict) dict" 

+

1819 ) 

+

1820 remaining = [x for x in args if x is not typed_dict] 

+

1821 has_target_attribute = False 

+

1822 anno = None 

+

1823 if len(remaining) == 1: 1823 ↛ 1824line 1823 didn't jump to line 1824, because the condition on line 1823 was never true

+

1824 base_type, anno, _ = _parse_type( 

+

1825 "source_format alternative form", 

+

1826 remaining[0], 

+

1827 forbid_optional=True, 

+

1828 parsing_typed_dict_attribute=False, 

+

1829 ) 

+

1830 has_target_attribute = bool(anno) and any( 

+

1831 isinstance(x, TargetAttribute) for x in anno 

+

1832 ) 

+

1833 target_type = base_type 

+

1834 else: 

+

1835 target_type = Union[tuple(remaining)] 

+

1836 

+

1837 if default_target_attribute is None and not has_target_attribute: 1837 ↛ 1838line 1837 didn't jump to line 1838, because the condition on line 1837 was never true

+

1838 raise ValueError( 

+

1839 'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]' 

+

1840 " OR the parsed_content format must have exactly one attribute that is required." 

+

1841 ) 

+

1842 if anno: 1842 ↛ 1843line 1842 didn't jump to line 1843, because the condition on line 1842 was never true

+

1843 final_anno = [target_type] 

+

1844 final_anno.extend(anno) 

+

1845 return typed_dict, Annotated[tuple(final_anno)] 

+

1846 return typed_dict, target_type 

+

1847 

+

1848 

+

1849def _dispatch_parse_generator( 

+

1850 dispatch_type: Type[DebputyDispatchableType], 

+

1851) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]: 

+

1852 def _dispatch_parse( 

+

1853 value: Any, 

+

1854 attribute_path: AttributePath, 

+

1855 parser_context: Optional["ParserContextData"], 

+

1856 ): 

+

1857 assert parser_context is not None 

+

1858 dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type) 

+

1859 return dispatching_parser.parse_input( 

+

1860 value, attribute_path, parser_context=parser_context 

+

1861 ) 

+

1862 

+

1863 return _dispatch_parse 

+

1864 

+

1865 

+

1866def _dispatch_parser( 

+

1867 dispatch_type: Type[DebputyDispatchableType], 

+

1868) -> AttributeTypeHandler: 

+

1869 return AttributeTypeHandler( 

+

1870 dispatch_type.__name__, 

+

1871 lambda *a: None, 

+

1872 mapper=_dispatch_parse_generator(dispatch_type), 

+

1873 ) 

+

1874 

+

1875 

+

1876def _parse_type( 

+

1877 attribute: str, 

+

1878 orig_td: Any, 

+

1879 forbid_optional: bool = True, 

+

1880 parsing_typed_dict_attribute: bool = True, 

+

1881) -> Tuple[Any, Tuple[Any, ...], bool]: 

+

1882 td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

+

1883 md: Tuple[Any, ...] = tuple() 

+

1884 optional = False 

+

1885 if v is not None: 

+

1886 if v == Annotated: 

+

1887 anno = get_args(td) 

+

1888 md = anno[1:] 

+

1889 td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute) 

+

1890 

+

1891 if td is _NONE_TYPE: 1891 ↛ 1892line 1891 didn't jump to line 1892, because the condition on line 1891 was never true

+

1892 raise ValueError( 

+

1893 f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the' 

+

1894 " debputy manifest, so this attribute does not make sense in its current form." 

+

1895 ) 

+

1896 if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args): 1896 ↛ 1897line 1896 didn't jump to line 1897, because the condition on line 1896 was never true

+

1897 raise ValueError( 

+

1898 f'Detected use of Optional in "{attribute}", which is not allowed here.' 

+

1899 " Please use NotRequired for optional fields" 

+

1900 ) 

+

1901 

+

1902 return td, md, optional 

+

1903 

+

1904 

+

1905def _normalize_attribute_name(attribute: str) -> str: 

+

1906 if attribute.endswith("_"): 

+

1907 attribute = attribute[:-1] 

+

1908 return attribute.replace("_", "-") 

+

1909 

+

1910 

+

1911@dataclasses.dataclass 

+

1912class DetectedDebputyParseHint: 

+

1913 target_attribute: str 

+

1914 source_manifest_attribute: Optional[str] 

+

1915 conflict_with_source_attributes: FrozenSet[str] 

+

1916 conditional_required: Optional[ConditionalRequired] 

+

1917 applicable_as_path_hint: bool 

+

1918 

+

1919 @classmethod 

+

1920 def parse_annotations( 

+

1921 cls, 

+

1922 anno: Tuple[Any, ...], 

+

1923 error_context: str, 

+

1924 default_attribute_name: Optional[str], 

+

1925 is_required: bool, 

+

1926 default_target_attribute: Optional[str] = None, 

+

1927 allow_target_attribute_annotation: bool = False, 

+

1928 allow_source_attribute_annotations: bool = False, 

+

1929 ) -> "DetectedDebputyParseHint": 

+

1930 target_attr_anno = find_annotation(anno, TargetAttribute) 

+

1931 if target_attr_anno: 

+

1932 if not allow_target_attribute_annotation: 1932 ↛ 1933line 1932 didn't jump to line 1933, because the condition on line 1932 was never true

+

1933 raise ValueError( 

+

1934 f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}" 

+

1935 ) 

+

1936 target_attribute = target_attr_anno.attribute 

+

1937 elif default_target_attribute is not None: 

+

1938 target_attribute = default_target_attribute 

+

1939 elif default_attribute_name is not None: 1939 ↛ 1942line 1939 didn't jump to line 1942, because the condition on line 1939 was never false

+

1940 target_attribute = default_attribute_name 

+

1941 else: 

+

1942 if default_attribute_name is None: 

+

1943 raise ValueError( 

+

1944 "allow_target_attribute_annotation must be True OR " 

+

1945 "default_attribute_name/default_target_attribute must be not None" 

+

1946 ) 

+

1947 raise ValueError( 

+

1948 f"Missing DebputyParseHint.target_attribute annotation.{error_context}" 

+

1949 ) 

+

1950 source_attribute_anno = find_annotation(anno, ManifestAttribute) 

+

1951 _source_attribute_allowed( 

+

1952 allow_source_attribute_annotations, error_context, source_attribute_anno 

+

1953 ) 

+

1954 if source_attribute_anno: 

+

1955 source_attribute_name = source_attribute_anno.attribute 

+

1956 elif default_attribute_name is not None: 

+

1957 source_attribute_name = _normalize_attribute_name(default_attribute_name) 

+

1958 else: 

+

1959 source_attribute_name = None 

+

1960 mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute) 

+

1961 if mutual_exclusive_with_anno: 

+

1962 _source_attribute_allowed( 

+

1963 allow_source_attribute_annotations, 

+

1964 error_context, 

+

1965 mutual_exclusive_with_anno, 

+

1966 ) 

+

1967 conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes 

+

1968 else: 

+

1969 conflicting_attributes = frozenset() 

+

1970 conditional_required = find_annotation(anno, ConditionalRequired) 

+

1971 

+

1972 if conditional_required and is_required: 1972 ↛ 1973line 1972 didn't jump to line 1973, because the condition on line 1972 was never true

+

1973 if default_attribute_name is None: 

+

1974 raise ValueError( 

+

1975 f"is_required cannot be True without default_attribute_name being not None" 

+

1976 ) 

+

1977 raise ValueError( 

+

1978 f'The attribute "{default_attribute_name}" is Required while also being conditionally required.' 

+

1979 ' Please make the attribute "NotRequired" or remove the conditional requirement.' 

+

1980 ) 

+

1981 

+

1982 not_path_hint_anno = find_annotation(anno, NotPathHint) 

+

1983 applicable_as_path_hint = not_path_hint_anno is None 

+

1984 

+

1985 return DetectedDebputyParseHint( 

+

1986 target_attribute=target_attribute, 

+

1987 source_manifest_attribute=source_attribute_name, 

+

1988 conflict_with_source_attributes=conflicting_attributes, 

+

1989 conditional_required=conditional_required, 

+

1990 applicable_as_path_hint=applicable_as_path_hint, 

+

1991 ) 

+

1992 

+

1993 

+

1994def _source_attribute_allowed( 

+

1995 source_attribute_allowed: bool, 

+

1996 error_context: str, 

+

1997 annotation: Optional[DebputyParseHint], 

+

1998) -> None: 

+

1999 if source_attribute_allowed or annotation is None: 1999 ↛ 2001line 1999 didn't jump to line 2001, because the condition on line 1999 was never false

+

2000 return 

+

2001 raise ValueError( 

+

2002 f'The annotation "{annotation}" cannot be used here. {error_context}' 

+

2003 ) 

+
+ + + -- cgit v1.2.3