Coverage for src/debputy/manifest_parser/declarative_parser.py: 76%

781 statements  

coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import collections 

2import dataclasses 

3from typing import ( 

4 Any, 

5 Callable, 

6 Tuple, 

7 TypedDict, 

8 Dict, 

9 get_type_hints, 

10 Annotated, 

11 get_args, 

12 get_origin, 

13 TypeVar, 

14 Generic, 

15 FrozenSet, 

16 Mapping, 

17 Optional, 

18 cast, 

19 is_typeddict, 

20 Type, 

21 Union, 

22 List, 

23 Collection, 

24 NotRequired, 

25 Iterable, 

26 Literal, 

27 Sequence, 

28 Container, 

29) 

30 

31from debputy.manifest_parser.base_types import ( 

32 DebputyParsedContent, 

33 FileSystemMatchRule, 

34 FileSystemExactMatchRule, 

35 DebputyDispatchableType, 

36 TypeMapping, 

37) 

38from debputy.manifest_parser.exceptions import ( 

39 ManifestParseException, 

40) 

41from debputy.manifest_parser.mapper_code import ( 

42 normalize_into_list, 

43 wrap_into_list, 

44 map_each_element, 

45) 

46from debputy.manifest_parser.parser_data import ParserContextData 

47from debputy.manifest_parser.util import AttributePath, unpack_type, find_annotation 

48from debputy.plugin.api.impl_types import ( 

49 DeclarativeInputParser, 

50 TD, 

51 _ALL_PACKAGE_TYPES, 

52 resolve_package_type_selectors, 

53 ListWrappedDeclarativeInputParser, 

54 DispatchingObjectParser, 

55 DispatchingTableParser, 

56 TTP, 

57 TP, 

58 InPackageContextParser, 

59) 

60from debputy.plugin.api.spec import ParserDocumentation, PackageTypeSelector 

61from debputy.util import _info, _warn, assume_not_none 

62 

63try: 

64 from Levenshtein import distance 

65except ImportError: 

66 _WARN_ONCE = False 

67 

68 def _detect_possible_typo( 

69 _key: str, 

70 _value: object, 

71 _manifest_attributes: Mapping[str, "AttributeDescription"], 

72 _path: "AttributePath", 

73 ) -> None: 

74 global _WARN_ONCE 

75 if not _WARN_ONCE: 

76 _WARN_ONCE = True 

77 _info( 

78 "Install python3-levenshtein to have debputy try to detect typos in the manifest." 

79 ) 

80 

81else: 

82 

83 def _detect_possible_typo( 

84 key: str, 

85 value: object, 

86 manifest_attributes: Mapping[str, "AttributeDescription"], 

87 path: "AttributePath", 

88 ) -> None: 

89 k_len = len(key) 

90 key_path = path[key] 

91 matches: List[str] = [] 

92 current_match_strength = 0 

93 for acceptable_key, attr in manifest_attributes.items(): 

94 if abs(k_len - len(acceptable_key)) > 2: 

95 continue 

96 d = distance(key, acceptable_key) 

97 if d > 2: 

98 continue 

99 try: 

100 attr.type_validator.ensure_type(value, key_path) 

101 except ManifestParseException: 

102 if attr.type_validator.base_type_match(value): 

103 match_strength = 1 

104 else: 

105 match_strength = 0 

106 else: 

107 match_strength = 2 

108 

109 if match_strength < current_match_strength: 

110 continue 

111 if match_strength > current_match_strength: 

112 current_match_strength = match_strength 

113 matches.clear() 

114 matches.append(acceptable_key) 

115 

116 if not matches: 

117 return 

118 ref = f'at "{path.path}"' if path else "at the manifest root level" 

119 if len(matches) == 1: 

120 possible_match = repr(matches[0]) 

121 _warn( 

122 f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}' 

123 ) 

124 else: 

125 matches.sort() 

126 possible_matches = ", ".join(repr(a) for a in matches) 

127 _warn( 

128 f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}' 

129 ) 

130 

131 

132SF = TypeVar("SF") 

133T = TypeVar("T") 

134S = TypeVar("S") 

135 

136 

137_NONE_TYPE = type(None) 

138 

139 

140# These must be able to appear in an "isinstance" check and must be builtin types. 

141BASIC_SIMPLE_TYPES = { 

142 str: "string", 

143 int: "integer", 

144 bool: "boolean", 

145} 

146 

147 

148class AttributeTypeHandler: 

149 __slots__ = ("_description", "_ensure_type", "base_type", "mapper") 

150 

151 def __init__( 

152 self, 

153 description: str, 

154 ensure_type: Callable[[Any, AttributePath], None], 

155 *, 

156 base_type: Optional[Type[Any]] = None, 

157 mapper: Optional[ 

158 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] 

159 ] = None, 

160 ) -> None: 

161 self._description = description 

162 self._ensure_type = ensure_type 

163 self.base_type = base_type 

164 self.mapper = mapper 

165 

166 def describe_type(self) -> str: 

167 return self._description 

168 

169 def ensure_type(self, obj: object, path: AttributePath) -> None: 

170 self._ensure_type(obj, path) 

171 

172 def base_type_match(self, obj: object) -> bool: 

173 base_type = self.base_type 

174 return base_type is not None and isinstance(obj, base_type) 

175 

176 def map_type( 

177 self, 

178 value: Any, 

179 path: AttributePath, 

180 parser_context: Optional["ParserContextData"], 

181 ) -> Any: 

182 mapper = self.mapper 

183 if mapper is not None: 

184 return mapper(value, path, parser_context) 

185 return value 

186 

187 def combine_mapper( 

188 self, 

189 mapper: Optional[ 

190 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] 

191 ], 

192 ) -> "AttributeTypeHandler": 

193 if mapper is None: 

194 return self 

195 if self.mapper is not None: 

196 m = self.mapper 

197 

198 def _combined_mapper( 

199 value: Any, 

200 path: AttributePath, 

201 parser_context: Optional["ParserContextData"], 

202 ) -> Any: 

203 return mapper(m(value, path, parser_context), path, parser_context) 

204 

205 else: 

206 _combined_mapper = mapper 

207 

208 return AttributeTypeHandler( 

209 self._description, 

210 self._ensure_type, 

211 base_type=self.base_type, 

212 mapper=_combined_mapper, 

213 ) 

214 

215 

216@dataclasses.dataclass(slots=True) 

217class AttributeDescription: 

218 source_attribute_name: str 

219 target_attribute: str 

220 attribute_type: Any 

221 type_validator: AttributeTypeHandler 

222 annotations: Tuple[Any, ...] 

223 conflicting_attributes: FrozenSet[str] 

224 conditional_required: Optional["ConditionalRequired"] 

225 parse_hints: Optional["DetectedDebputyParseHint"] = None 

226 is_optional: bool = False 

227 

228 

229def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool: 

230 if attribute_path.path_hint is not None:  [branch 230 ↛ 231: the condition was never true]

231 return True 

232 if isinstance(v, str): 

233 attribute_path.path_hint = v 

234 return True 

235 elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str): 

236 attribute_path.path_hint = v[0] 

237 return True 

238 return False 

239 

240 

241@dataclasses.dataclass(slots=True, frozen=True) 

242class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): 

243 alt_form_parser: AttributeDescription 

244 inline_reference_documentation: Optional[ParserDocumentation] = None 

245 

246 def parse_input( 

247 self, 

248 value: object, 

249 path: AttributePath, 

250 *, 

251 parser_context: Optional["ParserContextData"] = None, 

252 ) -> TD: 

253 if self.reference_documentation_url is not None: 

254 doc_ref = f" (Documentation: {self.reference_documentation_url})" 

255 else: 

256 doc_ref = "" 

257 

258 alt_form_parser = self.alt_form_parser 

259 if value is None:  [branch 259 ↛ 260: the condition was never true]

260 form_note = f" The value must have type: {alt_form_parser.type_validator.describe_type()}" 

261 if self.reference_documentation_url is not None: 

262 doc_ref = f" Please see {self.reference_documentation_url} for the documentation." 

263 raise ManifestParseException( 

264 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" 

265 ) 

266 _extract_path_hint(value, path) 

267 alt_form_parser.type_validator.ensure_type(value, path) 

268 attribute = alt_form_parser.target_attribute 

269 alias_mapping = { 

270 attribute: ("", None), 

271 } 

272 v = alt_form_parser.type_validator.map_type(value, path, parser_context) 

273 path.alias_mapping = alias_mapping 

274 return cast("TD", {attribute: v}) 

275 

276 

277@dataclasses.dataclass(slots=True) 

278class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): 

279 input_time_required_parameters: FrozenSet[str] 

280 all_parameters: FrozenSet[str] 

281 manifest_attributes: Mapping[str, "AttributeDescription"] 

282 source_attributes: Mapping[str, "AttributeDescription"] 

283 at_least_one_of: FrozenSet[FrozenSet[str]] 

284 alt_form_parser: Optional[AttributeDescription] 

285 mutually_exclusive_attributes: FrozenSet[FrozenSet[str]] = frozenset() 

286 _per_attribute_conflicts_cache: Optional[Mapping[str, FrozenSet[str]]] = None 

287 inline_reference_documentation: Optional[ParserDocumentation] = None 

288 path_hint_source_attributes: Sequence[str] = tuple() 

289 

290 def _parse_alt_form( 

291 self, 

292 value: object, 

293 path: AttributePath, 

294 *, 

295 parser_context: Optional["ParserContextData"] = None, 

296 ) -> TD: 

297 alt_form_parser = self.alt_form_parser 

298 if alt_form_parser is None:  [branch 298 ↛ 299: the condition was never true]

299 raise ManifestParseException( 

300 f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}" 

301 ) 

302 _extract_path_hint(value, path) 

303 alt_form_parser.type_validator.ensure_type(value, path) 

304 assert ( 

305 value is not None 

306 ), "The alternative form was None, but the parser should have rejected None earlier." 

307 attribute = alt_form_parser.target_attribute 

308 alias_mapping = { 

309 attribute: ("", None), 

310 } 

311 v = alt_form_parser.type_validator.map_type(value, path, parser_context) 

312 path.alias_mapping = alias_mapping 

313 return cast("TD", {attribute: v}) 

314 

315 def _validate_expected_keys( 

316 self, 

317 value: Dict[Any, Any], 

318 path: AttributePath, 

319 *, 

320 parser_context: Optional["ParserContextData"] = None, 

321 ) -> None: 

322 unknown_keys = value.keys() - self.all_parameters 

323 doc_ref = self._doc_url_error_suffix() 

324 if unknown_keys:  [branch 324 ↛ 325: the condition was never true]

325 for k in unknown_keys: 

326 if isinstance(k, str): 

327 _detect_possible_typo(k, value[k], self.manifest_attributes, path) 

328 unused_keys = self.all_parameters - value.keys() 

329 if unused_keys: 

330 k = ", ".join(unused_keys) 

331 raise ManifestParseException( 

332 f'Unknown keys "{unknown_keys}" at {path.path}". Keys that could be used here are: {k}.{doc_ref}' 

333 ) 

334 raise ManifestParseException( 

335 f'Unknown keys "{unknown_keys}" at {path.path}". Please remove them.{doc_ref}' 

336 ) 

337 missing_keys = self.input_time_required_parameters - value.keys() 

338 if missing_keys: 

339 required = ", ".join(repr(k) for k in sorted(missing_keys)) 

340 raise ManifestParseException( 

341 f"The following keys were required but not present at {path.path}: {required}{doc_ref}" 

342 ) 

343 for maybe_required in self.all_parameters - value.keys(): 

344 attr = self.manifest_attributes[maybe_required] 

345 assert attr.conditional_required is None or parser_context is not None 

346 if (  [branch 346 ↛ 352 was never taken]

347 attr.conditional_required is not None 

348 and attr.conditional_required.condition_applies( 

349 assume_not_none(parser_context) 

350 ) 

351 ): 

352 reason = attr.conditional_required.reason 

353 raise ManifestParseException( 

354 f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path}. {reason}{doc_ref}' 

355 ) 

356 for keyset in self.at_least_one_of: 

357 matched_keys = value.keys() & keyset 

358 if not matched_keys:  [branch 358 ↛ 359: the condition was never true]

359 conditionally_required = ", ".join(repr(k) for k in sorted(keyset)) 

360 raise ManifestParseException( 

361 f"At least one of the following keys must be present at {path.path}:" 

362 f" {conditionally_required}{doc_ref}" 

363 ) 

364 for group in self.mutually_exclusive_attributes: 

365 matched = value.keys() & group 

366 if len(matched) > 1:  [branch 366 ↛ 367: the condition was never true]

367 ck = ", ".join(repr(k) for k in sorted(matched)) 

368 raise ManifestParseException( 

369 f"Could not parse {path.path}: The following attributes are" 

370 f" mutually exclusive: {ck}{doc_ref}" 

371 ) 

372 

373 def _parse_typed_dict_form( 

374 self, 

375 value: Dict[Any, Any], 

376 path: AttributePath, 

377 *, 

378 parser_context: Optional["ParserContextData"] = None, 

379 ) -> TD: 

380 self._validate_expected_keys(value, path, parser_context=parser_context) 

381 result = {} 

382 per_attribute_conflicts = self._per_attribute_conflicts() 

383 alias_mapping = {} 

384 for path_hint_source_attributes in self.path_hint_source_attributes: 

385 v = value.get(path_hint_source_attributes) 

386 if v is not None and _extract_path_hint(v, path): 

387 break 

388 for k, v in value.items(): 

389 attr = self.manifest_attributes[k] 

390 matched = value.keys() & per_attribute_conflicts[k] 

391 if matched:  [branch 391 ↛ 392: the condition was never true]

392 ck = ", ".join(repr(k) for k in sorted(matched)) 

393 raise ManifestParseException( 

394 f'The attribute "{k}" at {path.path} cannot be used with the following' 

395 f" attributes: {ck}{self._doc_url_error_suffix()}" 

396 ) 

397 nk = attr.target_attribute 

398 key_path = path[k] 

399 attr.type_validator.ensure_type(v, key_path) 

400 if v is None:  [branch 400 ↛ 401: the condition was never true]

401 continue 

402 if k != nk: 

403 alias_mapping[nk] = k, None 

404 v = attr.type_validator.map_type(v, key_path, parser_context) 

405 result[nk] = v 

406 if alias_mapping: 

407 path.alias_mapping = alias_mapping 

408 return cast("TD", result) 

409 

410 def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str: 

411 doc_url = self.reference_documentation_url 

412 if doc_url is not None: 

413 if see_url_version:  [branch 413 ↛ 414: the condition was never true]

414 return f" Please see {doc_url} for the documentation." 

415 return f" (Documentation: {doc_url})" 

416 return "" 

417 

418 def parse_input( 

419 self, 

420 value: object, 

421 path: AttributePath, 

422 *, 

423 parser_context: Optional["ParserContextData"] = None, 

424 ) -> TD: 

425 if value is None:  [branch 425 ↛ 426: the condition was never true]

426 form_note = " The attribute must be a mapping." 

427 if self.alt_form_parser is not None: 

428 form_note = ( 

429 " The attribute can be a mapping or a non-mapping format" 

430 ' (usually, "non-mapping format" means a string or a list of strings).' 

431 ) 

432 doc_ref = self._doc_url_error_suffix(see_url_version=True) 

433 raise ManifestParseException( 

434 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" 

435 ) 

436 

437 if not isinstance(value, dict): 

438 return self._parse_alt_form(value, path, parser_context=parser_context) 

439 return self._parse_typed_dict_form(value, path, parser_context=parser_context) 

440 

441 def _per_attribute_conflicts(self) -> Mapping[str, FrozenSet[str]]: 

442 conflicts = self._per_attribute_conflicts_cache 

443 if conflicts is not None: 

444 return conflicts 

445 attrs = self.source_attributes 

446 conflicts = { 

447 a.source_attribute_name: frozenset( 

448 attrs[ca].source_attribute_name for ca in a.conflicting_attributes 

449 ) 

450 for a in attrs.values() 

451 } 

452 self._per_attribute_conflicts_cache = conflicts 

453 return self._per_attribute_conflicts_cache 

454 

455 

456class DebputyParseHint: 

457 @classmethod 

458 def target_attribute(cls, target_attribute: str) -> "DebputyParseHint": 

459 """Define this source attribute to have a different target attribute name 

460 

461 As an example: 

462 

463 >>> class SourceType(TypedDict): 

464 ... source: Annotated[NotRequired[str], DebputyParseHint.target_attribute("sources")] 

465 ... sources: NotRequired[List[str]] 

466 >>> class TargetType(TypedDict): 

467 ... sources: List[str] 

468 >>> pg = ParserGenerator() 

469 >>> parser = pg.generate_parser(TargetType, source_content=SourceType) 

470 

471 In this example, the user can provide either `source` or `sources` and the parser will 

472 map them to the `sources` attribute in the `TargetType`. Note this example relies on 

473 the builtin mapping of `str` to `List[str]` to align the types between `source` (from 

474 SourceType) and `sources` (from TargetType). 

475 

476 The following rules apply: 

477 

478 * All source attributes that map to the same target attribute will be mutually exclusive 

479 (that is, the user cannot give `source` *and* `sources` as input). 

480 * When the target attribute is required, the source attributes are conditionally 

481 mandatory requiring the user to provide exactly one of them. 

482 * When multiple source attributes point to a single target attribute, none of the source 

483 attributes can be Required. 

484 * The annotation can only be used for the source type specification and the source type 

485 specification must be different from the target type specification. 

486 

487 The `target_attribute` annotation can be used without having multiple source attributes. This 

488 can be useful if the source attribute name is not valid as a python variable identifier to 

489 rename it to a valid python identifier. 

490 

491 :param target_attribute: The attribute name in the target content 

492 :return: The annotation. 

493 """ 

494 return TargetAttribute(target_attribute) 

495 

496 @classmethod 

497 def conflicts_with_source_attributes( 

498 cls, 

499 *conflicting_source_attributes: str, 

500 ) -> "DebputyParseHint": 

501 """Declare a conflict with one or more source attributes 

502 

503 Example: 

504 

505 >>> class SourceType(TypedDict): 

506 ... source: Annotated[NotRequired[str], DebputyParseHint.target_attribute("sources")] 

507 ... sources: NotRequired[List[str]] 

508 ... into_dir: NotRequired[str] 

509 ... renamed_to: Annotated[ 

510 ... NotRequired[str], 

511 ... DebputyParseHint.conflicts_with_source_attributes("sources", "into_dir") 

512 ... ] 

513 >>> class TargetType(TypedDict): 

514 ... sources: List[str] 

515 ... into_dir: NotRequired[str] 

516 ... renamed_to: NotRequired[str] 

517 >>> pg = ParserGenerator() 

518 >>> parser = pg.generate_parser(TargetType, source_content=SourceType) 

519 

520 In this example, if the user were to provide `renamed_to` with `sources` or `into_dir`, the parser would report

521 an error. However, the parser will allow `renamed_to` with `source` as the conflict is considered only for 

522 the input source. That is, it is irrelevant that `sources` and `source` happen to "map" to the same target

523 attribute. 

524 

525 The following rules apply: 

526 * It is not possible for a target attribute to declare conflicts unless the target type spec is reused as 

527 source type spec. 

528 * All attributes involved in a conflict must be NotRequired. If any of the attributes are Required, then 

529 the parser generator will reject the input. 

530 * All attributes listed in the conflict must be valid attributes in the source type spec. 

531 

532 Note you do not have to specify conflicts between two attributes with the same target attribute name. The 

533 `target_attribute` annotation will handle that for you. 

534 

535 :param conflicting_source_attributes: All source attributes that cannot be used with this attribute. 

536 :return: The annotation. 

537 """ 

538 if len(conflicting_source_attributes) < 1:  [branch 538 ↛ 539: the condition was never true]

539 raise ValueError( 

540 "DebputyParseHint.conflicts_with_source_attributes requires at least one attribute as input" 

541 ) 

542 return ConflictWithSourceAttribute(frozenset(conflicting_source_attributes)) 

543 

544 @classmethod 

545 def required_when_single_binary( 

546 cls, 

547 *, 

548 package_type: PackageTypeSelector = _ALL_PACKAGE_TYPES, 

549 ) -> "DebputyParseHint": 

550 """Declare a source attribute as required when the source package produces exactly one binary package 

551 

552 The attribute in question must always be declared as `NotRequired` in the TypedDict and this condition 

553 can only be used for source attributes. 

554 """ 

555 resolved_package_types = resolve_package_type_selectors(package_type) 

556 reason = "The field is required for source packages producing exactly one binary package" 

557 if resolved_package_types != _ALL_PACKAGE_TYPES: 

558 types = ", ".join(sorted(resolved_package_types)) 

559 reason += f" of type {types}" 

560 return ConditionalRequired( 

561 reason, 

562 lambda c: len( 

563 [ 

564 p 

565 for p in c.binary_packages.values() 

566 if p.package_type in package_type 

567 ] 

568 ) 

569 == 1, 

570 ) 

571 return ConditionalRequired( 

572 reason, 

573 lambda c: c.is_single_binary_package, 

574 ) 

575 
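An illustrative sketch of how this hint would typically be attached; the TypedDict and attribute names below are invented for the example, and only the `Annotated`/`NotRequired` pattern mirrors the real usage shown elsewhere in this file:

    from typing import Annotated, NotRequired, TypedDict

    from debputy.manifest_parser.declarative_parser import DebputyParseHint

    class ExampleSourceFormat(TypedDict):
        # Hypothetical attribute: only required when the source package
        # produces exactly one binary package.
        install_to: Annotated[
            NotRequired[str],
            DebputyParseHint.required_when_single_binary(),
        ]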

576 @classmethod 

577 def required_when_multi_binary( 

578 cls, 

579 *, 

580 package_type: PackageTypeSelector = _ALL_PACKAGE_TYPES, 

581 ) -> "DebputyParseHint": 

582 """Declare a source attribute as required when the source package produces two or more binary packages

583 

584 The attribute in question must always be declared as `NotRequired` in the TypedDict and this condition 

585 can only be used for source attributes. 

586 """ 

587 resolved_package_types = resolve_package_type_selectors(package_type) 

588 reason = "The field is required for source packages producing two or more binary packages" 

589 if resolved_package_types != _ALL_PACKAGE_TYPES: 

590 types = ", ".join(sorted(resolved_package_types)) 

591 reason = ( 

592 "The field is required for source packages not producing exactly one binary package"

593 f" of type {types}" 

594 ) 

595 return ConditionalRequired( 

596 reason, 

597 lambda c: len( 

598 [ 

599 p 

600 for p in c.binary_packages.values() 

601 if p.package_type in package_type 

602 ] 

603 ) 

604 != 1, 

605 ) 

606 return ConditionalRequired( 

607 reason, 

608 lambda c: not c.is_single_binary_package, 

609 ) 

610 

611 @classmethod 

612 def manifest_attribute(cls, attribute: str) -> "DebputyParseHint": 

613 """Declare what the attribute name (as written in the manifest) should be 

614 

615 By default, debputy will do an attribute normalizing that will take valid python identifiers such 

616 as `dest_dir` and remap it to the manifest variant (such as `dest-dir`) automatically. If you have 

617 a special case, where this built-in normalization is insufficient or the python name is considerably 

618 different from what the user would write in the manifest, you can use this parse hint to set the 

619 name that the user would have to write in the manifest for this attribute. 

620 

621 >>> class SourceType(TypedDict): 

622 ... source: List[FileSystemMatchRule] 

623 ... # Use "as" in the manifest because "as_" was not pretty enough 

624 ... install_as: Annotated[NotRequired[FileSystemExactMatchRule], DebputyParseHint.manifest_attribute("as")] 

625 

626 In this example, we use the parse hint to use "as" as the name in the manifest, because we cannot 

627 use "as" as a valid python identifier (it is a keyword). While debputy would map `as_` to `as` for us,

628 we have chosen to use `install_as` as a python identifier. 

629 """ 

630 return ManifestAttribute(attribute) 

631 

632 @classmethod 

633 def not_path_error_hint(cls) -> "DebputyParseHint": 

634 """Mark this attribute as not a "path hint" when it comes to reporting errors 

635 

636 By default, `debputy` will pick up attributes that uses path names (FileSystemMatchRule) as 

637 candidates for parse error hints (the little "<Search for: VALUE>" in error messages). 

638 

639 Most rules only have one active path-based attribute and paths tends to be unique enough 

640 that it helps people spot the issue faster. However, in rare cases, you can have multiple 

641 attributes that fit the bill. In this case, this hint can be used to "hide" the suboptimal 

642 choice. As an example: 

643 

644 >>> class SourceType(TypedDict): 

645 ... source: List[FileSystemMatchRule] 

646 ... install_as: Annotated[NotRequired[FileSystemExactMatchRule], DebputyParseHint.not_path_error_hint()] 

647 

648 In this case, without the hint, `debputy` might pick up `install_as` as the attribute to 

649 use as hint for error reporting. However, here we have decided that we never want `install_as` 

650 leaving `source` as the only option. 

651 

652 Generally, this type hint must be placed on the **source** format. Any source attribute matching 

653 the parsed format will be ignored. 

654 

655 Mind the asymmetry: The annotation is placed in the **source** format while `debputy` looks at 

656 the type of the target attribute to determine if it counts as a path.

657 """ 

658 return NOT_PATH_HINT 

659 

660 

661@dataclasses.dataclass(frozen=True, slots=True) 

662class TargetAttribute(DebputyParseHint): 

663 attribute: str 

664 

665 

666@dataclasses.dataclass(frozen=True, slots=True) 

667class ConflictWithSourceAttribute(DebputyParseHint): 

668 conflicting_attributes: FrozenSet[str] 

669 

670 

671@dataclasses.dataclass(frozen=True, slots=True) 

672class ConditionalRequired(DebputyParseHint): 

673 reason: str 

674 condition: Callable[["ParserContextData"], bool] 

675 

676 def condition_applies(self, context: "ParserContextData") -> bool: 

677 return self.condition(context) 

678 

679 

680@dataclasses.dataclass(frozen=True, slots=True) 

681class ManifestAttribute(DebputyParseHint): 

682 attribute: str 

683 

684 

685class NotPathHint(DebputyParseHint): 

686 pass 

687 

688 

689NOT_PATH_HINT = NotPathHint() 

690 

691 

692def _is_path_attribute_candidate( 

693 source_attribute: AttributeDescription, target_attribute: AttributeDescription 

694) -> bool: 

695 if ( 

696 source_attribute.parse_hints 

697 and not source_attribute.parse_hints.applicable_as_path_hint 

698 ): 

699 return False 

700 target_type = target_attribute.attribute_type 

701 _, origin, args = unpack_type(target_type, False) 

702 match_type = target_type 

703 if origin == list: 

704 match_type = args[0] 

705 return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule) 

706 

707 

708class ParserGenerator: 

709 def __init__(self) -> None: 

710 self._registered_types: Dict[Any, TypeMapping[Any, Any]] = {} 

711 self._object_parsers: Dict[str, DispatchingObjectParser] = {} 

712 self._table_parsers: Dict[ 

713 Type[DebputyDispatchableType], DispatchingTableParser[Any] 

714 ] = {} 

715 self._in_package_context_parser: Dict[str, Any] = {} 

716 

717 def register_mapped_type(self, mapped_type: TypeMapping) -> None: 

718 existing = self._registered_types.get(mapped_type.target_type) 

719 if existing is not None:  [branch 719 ↛ 720: the condition was never true]

720 raise ValueError(f"The type {existing} is already registered") 

721 self._registered_types[mapped_type.target_type] = mapped_type 

722 

723 def discard_mapped_type(self, mapped_type: Type[T]) -> None: 

724 del self._registered_types[mapped_type] 

725 

726 def add_table_parser(self, rt: Type[DebputyDispatchableType], path: str) -> None: 

727 assert rt not in self._table_parsers 

728 self._table_parsers[rt] = DispatchingTableParser(rt, path) 

729 

730 def add_object_parser( 

731 self, 

732 path: str, 

733 *, 

734 parser_documentation: Optional[ParserDocumentation] = None, 

735 ) -> None: 

736 assert path not in self._in_package_context_parser 

737 assert path not in self._object_parsers 

738 self._object_parsers[path] = DispatchingObjectParser( 

739 path, parser_documentation=parser_documentation 

740 ) 

741 

742 def add_in_package_context_parser( 

743 self, 

744 path: str, 

745 delegate: DeclarativeInputParser[Any], 

746 ) -> None: 

747 assert path not in self._in_package_context_parser 

748 assert path not in self._object_parsers 

749 self._in_package_context_parser[path] = InPackageContextParser(path, delegate) 

750 

751 @property 

752 def dispatchable_table_parsers( 

753 self, 

754 ) -> Mapping[Type[DebputyDispatchableType], DispatchingTableParser[Any]]: 

755 return self._table_parsers 

756 

757 @property 

758 def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]: 

759 return self._object_parsers 

760 

761 def dispatch_parser_table_for( 

762 self, rule_type: TTP 

763 ) -> Optional[DispatchingTableParser[TP]]: 

764 return cast( 

765 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type) 

766 ) 

767 
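A hedged sketch of the registration/lookup pairing above. The rule type and table path are invented for the example, and it is an assumption that a bare `DebputyDispatchableType` subclass and an arbitrary path string are acceptable here; real plugin code would use debputy's actual dispatchable types:

    from debputy.manifest_parser.base_types import DebputyDispatchableType
    from debputy.manifest_parser.declarative_parser import ParserGenerator

    class ExampleRuleType(DebputyDispatchableType):
        """Hypothetical dispatchable rule type, for illustration only."""

    pg = ParserGenerator()
    pg.add_table_parser(ExampleRuleType, "example-rules")
    assert pg.dispatch_parser_table_for(ExampleRuleType) is not None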

768 def generate_parser( 

769 self, 

770 parsed_content: Type[TD], 

771 *, 

772 source_content: Optional[SF] = None, 

773 allow_optional: bool = False, 

774 inline_reference_documentation: Optional[ParserDocumentation] = None, 

775 ) -> DeclarativeInputParser[TD]: 

776 """Derive a parser from a TypedDict 

777 

778 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict 

779 or two that are used as a description. 

780 

781 In its most simple use-case, the caller provides a TypedDict of the expected attributed along with 

782 their types. As an example: 

783 

784 >>> class InstallDocsRule(DebputyParsedContent): 

785 ... sources: List[str] 

786 ... into: List[str] 

787 >>> pg = ParserGenerator() 

788 >>> simple_parser = pg.generate_parser(InstallDocsRule) 

789 

790 This will create a parser that would be able to interpret something like: 

791 

792 ```yaml 

793 install-docs: 

794 sources: ["docs/*"] 

795 into: ["my-pkg"] 

796 ``` 

797 

798 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore, 

799 you can also provide a TypedDict describing the input, enabling more flexibility: 

800 

801 >>> class InstallDocsRule(DebputyParsedContent): 

802 ... sources: List[str] 

803 ... into: List[str] 

804 >>> class InputDocsRuleInputFormat(TypedDict): 

805 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]] 

806 ... sources: NotRequired[List[str]] 

807 ... into: Union[str, List[str]] 

808 >>> pg = ParserGenerator() 

809 >>> flexible_parser = pg.generate_parser( 

810 ... InstallDocsRule, 

811 ... source_content=InputDocsRuleInputFormat, 

812 ... ) 

813 

814 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string) 

815 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources` 

816 is used to ensure the input is not ambiguous. For the `into` parameter, the parser will accept it being a str 

817 or a list of strings. Regardless of how the input was provided, the parser will normalize the input such that 

818 both `sources` and `into` in the result are lists of strings. As an example, this parser can accept

819 both the previous input but also the following input: 

820 

821 ```yaml 

822 install-docs: 

823 source: "docs/*" 

824 into: "my-pkg" 

825 ``` 

826 

827 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists 

828 with a single string in them. As noted above, the name of the `source` attribute will also be normalized 

829 while parsing. 

830 

831 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict 

832 as part of the input. Example: 

833 

834 >>> class DiscardRule(DebputyParsedContent): 

835 ... paths: List[str] 

836 >>> class DiscardRuleInputDictFormat(TypedDict): 

837 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]] 

838 ... paths: NotRequired[List[str]] 

839 >>> # This format relies on DiscardRule having exactly one Required attribute 

840 >>> DiscardRuleInputWithAltFormat = Union[ 

841 ... DiscardRuleInputDictFormat, 

842 ... str, 

843 ... List[str], 

844 ... ] 

845 >>> pg = ParserGenerator() 

846 >>> flexible_parser = pg.generate_parser( 

847 ... DiscardRule, 

848 ... source_content=DiscardRuleInputWithAltFormat, 

849 ... ) 

850 

851 

852 Supported types: 

853 * `List` - must have a fixed type argument (such as `List[str]`) 

854 * `str` 

855 * `int` 

856 * `BinaryPackage` - When provided (or required), the user must provide a package name listed 

857 in the debian/control file. The code receives the BinaryPackage instance 

858 matching that input. 

859 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any 

860 format that `debputy` provides (such as `0644` or `a=rw,go=rw`).

861 * `FileSystemOwner` - When provided (or required), the user must provide a file system owner that is

862 available statically on all Debian systems (must be in `base-passwd`). 

863 The user has multiple options for how to specify it (either via name or id). 

864 * `FileSystemGroup` - When provided (or required), the user must provide a file system group that is

865 available statically on all Debian systems (must be in `base-passwd`). 

866 The user has multiple options for how to specify it (either via name or id). 

867 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply. 

868 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which 

869 provides the `debputy` default `when` parameter for conditionals.

870 

871 Supported special type-like parameters: 

872 

873 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the 

874 outermost level. Cannot vary between `parsed_content` and `source_content`. 

875 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment. 

876 * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present). 

877 Automapping (see below) is restricted to two members in the Union. 

878 

879 Notable non-supported types: 

880 * `Mapping` and all variants thereof (such as `dict`). In the future, nested `TypedDict`s may be allowed.

881 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields. 

882 

883 Automatic mapping rules from `source_content` to `parsed_content`: 

884 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically: 

885 `lambda value: value if isinstance(value, list) else [value]` 

886 - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]` 

887 - `T` can be mapped automatically to `List[T]`, the transformation being: `lambda value: [value]`

888 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod 

889 for concrete features that may be useful to you. 

890 

891 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed. 

892 (DebputyParsedContent is a TypedDict subclass that works around some inadequate type checkers).

893 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a 

894 `List[TypedDict[...]]`. 

895 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful 

896 to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted, 

897 the parsed_content is also considered the source_content (which affects what annotations are allowed in it). 

898 Note you should never pass the parsed_content as source_content directly. 

899 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you 

900 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and 

901 can keep this False). 

902 :param inline_reference_documentation: Optionally, programmatic documentation 

903 :return: An input parser capable of reading input matching the TypedDict(s) used as reference. 

904 """ 

905 orig_parsed_content = parsed_content 

906 if source_content is parsed_content:  [branch 906 ↛ 907: the condition was never true]

907 raise ValueError( 

908 "Do not provide source_content if it is the same as parsed_content" 

909 ) 

910 is_list_wrapped = False 

911 if get_origin(orig_parsed_content) == list: 

912 parsed_content = get_args(orig_parsed_content)[0] 

913 is_list_wrapped = True 

914 

915 if isinstance(parsed_content, type) and issubclass( 

916 parsed_content, DebputyDispatchableType 

917 ): 

918 parser = self.dispatch_parser_table_for(parsed_content) 

919 if parser is None:  [branch 919 ↛ 920: the condition was never true]

920 raise ValueError( 

921 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

922 f" The class {parsed_content.__qualname__} is not a pre-registered type." 

923 ) 

924 # FIXME: Only the list wrapped version has documentation. 

925 if is_list_wrapped:  [branch 925 ↛ 930: the condition was never false]

926 parser = ListWrappedDeclarativeInputParser( 

927 parser, 

928 inline_reference_documentation=inline_reference_documentation, 

929 ) 

930 return parser 

931 

932 if not is_typeddict(parsed_content):  [branch 932 ↛ 933: the condition was never true]

933 raise ValueError( 

934 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

935 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.' 

936 ) 

937 if is_list_wrapped: 

938 if get_origin(source_content) != list:  [branch 938 ↛ 939: the condition was never true]

939 raise ValueError( 

940 "If the parsed_content is a List type, then source_format must be a List type as well." 

941 ) 

942 source_content = get_args(source_content)[0] 

943 

944 target_attributes = self._parse_types( 

945 parsed_content, 

946 allow_source_attribute_annotations=source_content is None, 

947 forbid_optional=not allow_optional, 

948 ) 

949 required_target_parameters = frozenset(parsed_content.__required_keys__) 

950 parsed_alt_form = None 

951 non_mapping_source_only = False 

952 

953 if source_content is not None: 

954 default_target_attribute = None 

955 if len(required_target_parameters) == 1: 

956 default_target_attribute = next(iter(required_target_parameters)) 

957 

958 source_typed_dict, alt_source_forms = _extract_typed_dict( 

959 source_content, 

960 default_target_attribute, 

961 ) 

962 if alt_source_forms: 

963 parsed_alt_form = self._parse_alt_form( 

964 alt_source_forms, 

965 default_target_attribute, 

966 ) 

967 if source_typed_dict is not None: 

968 source_content_attributes = self._parse_types( 

969 source_typed_dict, 

970 allow_target_attribute_annotation=True, 

971 allow_source_attribute_annotations=True, 

972 forbid_optional=not allow_optional, 

973 ) 

974 source_content_parameter = "source_content" 

975 source_and_parsed_differs = True 

976 else: 

977 source_typed_dict = parsed_content 

978 source_content_attributes = target_attributes 

979 source_content_parameter = "parsed_content" 

980 source_and_parsed_differs = True 

981 non_mapping_source_only = True 

982 else: 

983 source_typed_dict = parsed_content 

984 source_content_attributes = target_attributes 

985 source_content_parameter = "parsed_content" 

986 source_and_parsed_differs = False 

987 

988 sources = collections.defaultdict(set) 

989 seen_targets = set() 

990 seen_source_names: Dict[str, str] = {} 

991 source_attributes: Dict[str, AttributeDescription] = {} 

992 path_hint_source_attributes = [] 

993 

994 for k in source_content_attributes: 

995 ia = source_content_attributes[k] 

996 

997 ta = ( 

998 target_attributes.get(ia.target_attribute) 

999 if source_and_parsed_differs 

1000 else ia 

1001 ) 

1002 if ta is None:  [branch 1002 ↛ 1004: the condition was never true]

1003 # Error message would be wrong if this assertion is false. 

1004 assert source_and_parsed_differs 

1005 raise ValueError( 

1006 f'The attribute "{k}" from the "source_content" parameter should have mapped' 

1007 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"' 

1008 ) 

1009 if _is_path_attribute_candidate(ia, ta): 

1010 path_hint_source_attributes.append(ia.source_attribute_name) 

1011 existing_source_name = seen_source_names.get(ia.source_attribute_name) 

1012 if existing_source_name:  [branch 1012 ↛ 1013: the condition was never true]

1013 raise ValueError( 

1014 f'The attribute "{k}" and "{existing_source_name}" both share the source name' 

1015 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,' 

1016 f' so only one attribute use "{ia.source_attribute_name}".' 

1017 ) 

1018 seen_source_names[ia.source_attribute_name] = k 

1019 seen_targets.add(ta.target_attribute) 

1020 sources[ia.target_attribute].add(k) 

1021 if source_and_parsed_differs: 

1022 bridge_mapper = self._type_normalize( 

1023 k, ia.attribute_type, ta.attribute_type, False 

1024 ) 

1025 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper) 

1026 source_attributes[k] = ia 

1027 

1028 def _as_attr_names(td_name: Iterable[str]) -> FrozenSet[str]: 

1029 return frozenset( 

1030 source_content_attributes[a].source_attribute_name for a in td_name 

1031 ) 

1032 

1033 _check_attributes( 

1034 parsed_content, 

1035 source_typed_dict, 

1036 source_content_attributes, 

1037 sources, 

1038 ) 

1039 

1040 at_least_one_of = frozenset( 

1041 _as_attr_names(g) 

1042 for k, g in sources.items() 

1043 if len(g) > 1 and k in required_target_parameters 

1044 ) 

1045 

1046 if source_and_parsed_differs and seen_targets != target_attributes.keys():  [branch 1046 ↛ 1047: the condition was never true]

1047 missing = ", ".join( 

1048 repr(k) for k in (target_attributes.keys() - seen_targets) 

1049 ) 

1050 raise ValueError( 

1051 'The following attributes in "parsed_content" did not have a source field in "source_content":' 

1052 f" {missing}" 

1053 ) 

1054 all_mutually_exclusive_fields = frozenset( 

1055 _as_attr_names(g) for g in sources.values() if len(g) > 1 

1056 ) 

1057 

1058 all_parameters = ( 

1059 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__ 

1060 ) 

1061 _check_conflicts( 

1062 source_content_attributes, 

1063 source_typed_dict.__required_keys__, 

1064 all_parameters, 

1065 ) 

1066 

1067 manifest_attributes = { 

1068 a.source_attribute_name: a for a in source_content_attributes.values() 

1069 } 

1070 

1071 if parsed_alt_form is not None: 

1072 target_attribute = parsed_alt_form.target_attribute 

1073 if (  [branch 1073 ↛ 1078 was never taken]

1074 target_attribute not in required_target_parameters 

1075 and required_target_parameters 

1076 or len(required_target_parameters) > 1 

1077 ): 

1078 raise NotImplementedError( 

1079 "When using alternative source formats (Union[TypedDict, ...]), then the" 

1080 " target must have at most one required parameter"

1081 ) 

1082 bridge_mapper = self._type_normalize( 

1083 target_attribute, 

1084 parsed_alt_form.attribute_type, 

1085 target_attributes[target_attribute].attribute_type, 

1086 False, 

1087 ) 

1088 parsed_alt_form.type_validator = ( 

1089 parsed_alt_form.type_validator.combine_mapper(bridge_mapper) 

1090 ) 

1091 

1092 _verify_inline_reference_documentation( 

1093 source_content_attributes, 

1094 inline_reference_documentation, 

1095 parsed_alt_form is not None, 

1096 ) 

1097 if non_mapping_source_only: 

1098 parser = DeclarativeNonMappingInputParser( 

1099 assume_not_none(parsed_alt_form), 

1100 inline_reference_documentation=inline_reference_documentation, 

1101 ) 

1102 else: 

1103 parser = DeclarativeMappingInputParser( 

1104 _as_attr_names(source_typed_dict.__required_keys__), 

1105 _as_attr_names(all_parameters), 

1106 manifest_attributes, 

1107 source_attributes, 

1108 mutually_exclusive_attributes=all_mutually_exclusive_fields, 

1109 alt_form_parser=parsed_alt_form, 

1110 at_least_one_of=at_least_one_of, 

1111 inline_reference_documentation=inline_reference_documentation, 

1112 path_hint_source_attributes=tuple(path_hint_source_attributes), 

1113 ) 

1114 if is_list_wrapped: 

1115 parser = ListWrappedDeclarativeInputParser(parser) 

1116 return parser 

1117 

1118 def _as_type_validator( 

1119 self, 

1120 attribute: str, 

1121 provided_type: Any, 

1122 parsing_typed_dict_attribute: bool, 

1123 ) -> AttributeTypeHandler: 

1124 assert not isinstance(provided_type, tuple) 

1125 

1126 if isinstance(provided_type, type) and issubclass( 

1127 provided_type, DebputyDispatchableType 

1128 ): 

1129 return _dispatch_parser(provided_type) 

1130 

1131 unmapped_type = self._strip_mapped_types( 

1132 provided_type, 

1133 parsing_typed_dict_attribute, 

1134 ) 

1135 type_normalizer = self._type_normalize( 

1136 attribute, 

1137 unmapped_type, 

1138 provided_type, 

1139 parsing_typed_dict_attribute, 

1140 ) 

1141 t_unmapped, t_orig, t_args = unpack_type( 

1142 unmapped_type, 

1143 parsing_typed_dict_attribute, 

1144 ) 

1145 

1146 if (  [branch 1146 ↛ 1152 was never taken]

1147 t_orig == Union 

1148 and t_args 

1149 and len(t_args) == 2 

1150 and any(v is _NONE_TYPE for v in t_args) 

1151 ): 

1152 _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute) 

1153 actual_type = [a for a in args if a is not _NONE_TYPE][0] 

1154 validator = self._as_type_validator( 

1155 attribute, actual_type, parsing_typed_dict_attribute 

1156 ) 

1157 

1158 def _validator(v: Any, path: AttributePath) -> None: 

1159 if v is None: 

1160 return 

1161 validator.ensure_type(v, path) 

1162 

1163 return AttributeTypeHandler( 

1164 validator.describe_type(), 

1165 _validator, 

1166 base_type=validator.base_type, 

1167 mapper=type_normalizer, 

1168 ) 

1169 

1170 if unmapped_type in BASIC_SIMPLE_TYPES: 

1171 type_name = BASIC_SIMPLE_TYPES[unmapped_type] 

1172 

1173 type_mapping = self._registered_types.get(provided_type) 

1174 if type_mapping is not None: 

1175 simple_type = f" ({type_name})" 

1176 type_name = type_mapping.target_type.__name__ 

1177 else: 

1178 simple_type = "" 

1179 

1180 def _validator(v: Any, path: AttributePath) -> None: 

1181 if not isinstance(v, unmapped_type): 

1182 _validation_type_error( 

1183 path, f"The attribute must be a {type_name}{simple_type}" 

1184 ) 

1185 

1186 return AttributeTypeHandler( 

1187 type_name, 

1188 _validator, 

1189 base_type=unmapped_type, 

1190 mapper=type_normalizer, 

1191 ) 

1192 if t_orig == list: 

1193 if not t_args:  [branch 1193 ↛ 1194: the condition was never true]

1194 raise ValueError( 

1195 f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])' 

1196 ) 

1197 _, t_provided_orig, t_provided_args = unpack_type( 

1198 provided_type, 

1199 parsing_typed_dict_attribute, 

1200 ) 

1201 genetic_type = t_args[0] 

1202 key_mapper = self._as_type_validator( 

1203 attribute, 

1204 genetic_type, 

1205 parsing_typed_dict_attribute, 

1206 ) 

1207 

1208 def _validator(v: Any, path: AttributePath) -> None: 

1209 if not isinstance(v, list):  [branch 1209 ↛ 1210: the condition was never true]

1210 _validation_type_error(path, "The attribute must be a list") 

1211 for i, v in enumerate(v): 

1212 key_mapper.ensure_type(v, path[i]) 

1213 

1214 list_mapper = ( 

1215 map_each_element(key_mapper.mapper) 

1216 if key_mapper.mapper is not None 

1217 else None 

1218 ) 

1219 

1220 return AttributeTypeHandler( 

1221 f"List of {key_mapper.describe_type()}", 

1222 _validator, 

1223 base_type=list, 

1224 mapper=type_normalizer, 

1225 ).combine_mapper(list_mapper) 

1226 if is_typeddict(provided_type): 

1227 subparser = self.generate_parser(cast("Type[TD]", provided_type)) 

1228 return AttributeTypeHandler( 

1229 description=f"{provided_type.__name__} (Typed Mapping)", 

1230 ensure_type=lambda v, ap: None, 

1231 base_type=dict, 

1232 mapper=lambda v, ap, cv: subparser.parse_input( 

1233 v, ap, parser_context=cv 

1234 ), 

1235 ) 

1236 if t_orig == dict: 

1237 if not t_args or len(t_args) != 2:  [branch 1237 ↛ 1238: the condition was never true]

1238 raise ValueError( 

1239 f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])' 

1240 ) 

1241 if t_args[0] != str:  [branch 1241 ↛ 1242: the condition was never true]

1242 raise ValueError( 

1243 f'The attribute "{attribute}" is Dict and has a non-str type as key.' 

1244 " Currently, only `str` is supported (Dict[str, Y])" 

1245 ) 

1246 key_mapper = self._as_type_validator( 

1247 attribute, 

1248 t_args[0], 

1249 parsing_typed_dict_attribute, 

1250 ) 

1251 value_mapper = self._as_type_validator( 

1252 attribute, 

1253 t_args[1], 

1254 parsing_typed_dict_attribute, 

1255 ) 

1256 

1257 if key_mapper.base_type is None:  [branch 1257 ↛ 1258: the condition was never true]

1258 raise ValueError( 

1259 f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types' 

1260 f" without trivial base types (such as `str`) are not supported at the moment." 

1261 ) 

1262 

1263 if value_mapper.mapper is not None:  [branch 1263 ↛ 1264: the condition was never true]

1264 raise ValueError( 

1265 f'The attribute "{attribute}" is Dict and the value requires mapping.' 

1266 " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])." 

1267 " Better typing may come later" 

1268 ) 

1269 

1270 def _validator(uv: Any, path: AttributePath) -> None: 

1271 if not isinstance(uv, dict):  [branch 1271 ↛ 1272: the condition was never true]

1272 _validation_type_error(path, "The attribute must be a mapping") 

1273 key_name = "the first key in the mapping" 

1274 for i, (k, v) in enumerate(uv.items()): 

1275 if not key_mapper.base_type_match(k):  [branch 1275 ↛ 1276: the condition was never true]

1276 kp = path.copy_with_path_hint(key_name) 

1277 _validation_type_error( 

1278 kp, 

1279 f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}', 

1280 ) 

1281 key_name = f"the key after {k}" 

1282 value_mapper.ensure_type(v, path[k]) 

1283 

1284 return AttributeTypeHandler( 

1285 f"Mapping of {value_mapper.describe_type()}", 

1286 _validator, 

1287 base_type=dict, 

1288 mapper=type_normalizer, 

1289 ).combine_mapper(key_mapper.mapper) 

1290 if t_orig == Union: 

1291 if _is_two_arg_x_list_x(t_args): 

1292 # Force the order to be "X, List[X]" as it simplifies the code 

1293 x_list_x = ( 

1294 t_args if get_origin(t_args[1]) == list else (t_args[1], t_args[0]) 

1295 ) 

1296 

1297 # X, List[X] could match if X was List[Y]. However, our code below assumes 

1298 # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this 

1299 # case to avoid this assert and fall into the "generic case". 

1300 assert get_origin(x_list_x[0]) != list 

1301 x_subtype_checker = self._as_type_validator( 

1302 attribute, 

1303 x_list_x[0], 

1304 parsing_typed_dict_attribute, 

1305 ) 

1306 list_x_subtype_checker = self._as_type_validator( 

1307 attribute, 

1308 x_list_x[1], 

1309 parsing_typed_dict_attribute, 

1310 ) 

1311 type_description = x_subtype_checker.describe_type() 

1312 type_description = f"{type_description} or a list of {type_description}" 

1313 

1314 def _validator(v: Any, path: AttributePath) -> None: 

1315 if isinstance(v, list): 

1316 list_x_subtype_checker.ensure_type(v, path) 

1317 else: 

1318 x_subtype_checker.ensure_type(v, path) 

1319 

1320 return AttributeTypeHandler( 

1321 type_description, 

1322 _validator, 

1323 mapper=type_normalizer, 

1324 ) 

1325 else: 

1326 subtype_checker = [ 

1327 self._as_type_validator(attribute, a, parsing_typed_dict_attribute) 

1328 for a in t_args 

1329 ] 

1330 type_description = "one-of: " + ", ".join( 

1331 f"{sc.describe_type()}" for sc in subtype_checker 

1332 ) 

1333 mapper = subtype_checker[0].mapper 

1334 if any(mapper != sc.mapper for sc in subtype_checker):  [branch 1334 ↛ 1335: the condition was never true]

1335 raise ValueError( 

1336 f'Cannot handle the union "{provided_type}" as the target types need different' 

1337 " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]" 

1338 " where X is a non-collection type." 

1339 ) 

1340 

1341 def _validator(v: Any, path: AttributePath) -> None: 

1342 partial_matches = [] 

1343 for sc in subtype_checker:  [branch 1343 ↛ 1351: the loop never completed]

1344 try: 

1345 sc.ensure_type(v, path) 

1346 return 

1347 except ManifestParseException as e: 

1348 if sc.base_type_match(v):  [branch 1348 ↛ 1349: the condition was never true]

1349 partial_matches.append((sc, e)) 

1350 

1351 if len(partial_matches) == 1: 

1352 raise partial_matches[0][1] 

1353 _validation_type_error( 

1354 path, f"Could not match against: {type_description}" 

1355 ) 

1356 

1357 return AttributeTypeHandler( 

1358 type_description, 

1359 _validator, 

1360 mapper=type_normalizer, 

1361 ) 

1362 if t_orig == Literal: 

1363 # We want "x" for string values; repr provides 'x' 

1364 pretty = ", ".join( 

1365 f'"{v}"' if isinstance(v, str) else str(v) for v in t_args 

1366 ) 

1367 

1368 def _validator(v: Any, path: AttributePath) -> None: 

1369 if v not in t_args: 

1370 value_hint = "" 

1371 if isinstance(v, str):  [branch 1371 ↛ 1373: the condition was never false]

1372 value_hint = f"({v}) " 

1373 _validation_type_error( 

1374 path, 

1375 f"Value {value_hint}must be one of the following literal values: {pretty}", 

1376 ) 

1377 

1378 return AttributeTypeHandler( 

1379 f"One of the following literal values: {pretty}", 

1380 _validator, 

1381 ) 

1382 

1383 if provided_type == Any:  [branch 1383 ↛ 1388: the condition was never false]

1384 return AttributeTypeHandler( 

1385 "any (unvalidated)", 

1386 lambda *a: None, 

1387 ) 

1388 raise ValueError( 

1389 f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported' 

1390 ) 

1391 

1392 def _parse_types( 

1393 self, 

1394 spec: Type[TypedDict], 

1395 allow_target_attribute_annotation: bool = False, 

1396 allow_source_attribute_annotations: bool = False, 

1397 forbid_optional: bool = True, 

1398 ) -> Dict[str, AttributeDescription]: 

1399 annotations = get_type_hints(spec, include_extras=True) 

1400 return { 

1401 k: self._attribute_description( 

1402 k, 

1403 t, 

1404 k in spec.__required_keys__, 

1405 allow_target_attribute_annotation=allow_target_attribute_annotation, 

1406 allow_source_attribute_annotations=allow_source_attribute_annotations, 

1407 forbid_optional=forbid_optional, 

1408 ) 

1409 for k, t in annotations.items() 

1410 } 

1411 

1412 def _attribute_description( 

1413 self, 

1414 attribute: str, 

1415 orig_td: Any, 

1416 is_required: bool, 

1417 forbid_optional: bool = True, 

1418 allow_target_attribute_annotation: bool = False, 

1419 allow_source_attribute_annotations: bool = False, 

1420 ) -> AttributeDescription: 

1421 td, anno, is_optional = _parse_type( 

1422 attribute, orig_td, forbid_optional=forbid_optional 

1423 ) 

1424 type_validator = self._as_type_validator(attribute, td, True) 

1425 parsed_annotations = DetectedDebputyParseHint.parse_annotations( 

1426 anno, 

1427 f' Seen with attribute "{attribute}".', 

1428 attribute, 

1429 is_required, 

1430 allow_target_attribute_annotation=allow_target_attribute_annotation, 

1431 allow_source_attribute_annotations=allow_source_attribute_annotations, 

1432 ) 

1433 return AttributeDescription( 

1434 target_attribute=parsed_annotations.target_attribute, 

1435 attribute_type=td, 

1436 type_validator=type_validator, 

1437 annotations=anno, 

1438 is_optional=is_optional, 

1439 conflicting_attributes=parsed_annotations.conflict_with_source_attributes, 

1440 conditional_required=parsed_annotations.conditional_required, 

1441 source_attribute_name=assume_not_none( 

1442 parsed_annotations.source_manifest_attribute 

1443 ), 

1444 parse_hints=parsed_annotations, 

1445 ) 

1446 

1447 def _parse_alt_form( 

1448 self, 

1449 alt_form, 

1450 default_target_attribute: Optional[str], 

1451 ) -> AttributeDescription: 

1452 td, anno, is_optional = _parse_type( 

1453 "source_format alternative form", 

1454 alt_form, 

1455 forbid_optional=True, 

1456 parsing_typed_dict_attribute=False, 

1457 ) 

1458 type_validator = self._as_type_validator( 

1459 "source_format alternative form", 

1460 td, 

1461 True, 

1462 ) 

1463 parsed_annotations = DetectedDebputyParseHint.parse_annotations( 

1464 anno, 

1465 " The alternative for source_format.", 

1466 None, 

1467 False, 

1468 default_target_attribute=default_target_attribute, 

1469 allow_target_attribute_annotation=True, 

1470 allow_source_attribute_annotations=False, 

1471 ) 

1472 return AttributeDescription( 

1473 target_attribute=parsed_annotations.target_attribute, 

1474 attribute_type=td, 

1475 type_validator=type_validator, 

1476 annotations=anno, 

1477 is_optional=is_optional, 

1478 conflicting_attributes=parsed_annotations.conflict_with_source_attributes, 

1479 conditional_required=parsed_annotations.conditional_required, 

1480 source_attribute_name="Alt form of the source_format", 

1481 ) 

1482 

1483 def _union_narrowing( 

1484 self, 

1485 input_type: Any, 

1486 target_type: Any, 

1487 parsing_typed_dict_attribute: bool, 

1488 ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]: 

1489 _, input_orig, input_args = unpack_type( 

1490 input_type, parsing_typed_dict_attribute 

1491 ) 

1492 _, target_orig, target_args = unpack_type( 

1493 target_type, parsing_typed_dict_attribute 

1494 ) 

1495 

1496 if input_orig != Union or not input_args: 1496 ↛ 1497: line 1496 didn't jump to line 1497, because the condition on line 1496 was never true

1497 raise ValueError("input_type must be a Union[...] with non-empty args") 

1498 

1499 # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, List[Y]]

1500 # - Where X = Y or there is a simple standard transformation from X to Y. 

1501 

1502 if target_orig not in (Union, list) or not target_args: 

1503 # Not supported 

1504 return None 

1505 

1506 if target_orig == Union and set(input_args) == set(target_args): 1506 ↛ 1508: line 1506 didn't jump to line 1508, because the condition on line 1506 was never true

1507 # Not needed (identity mapping) 

1508 return None 

1509 

1510 if target_orig == list and not any(get_origin(a) == list for a in input_args): 1510 ↛ exit, 1510 ↛ 1512: 2 missed branches: 1) line 1510 didn't finish the generator expression on line 1510, 2) line 1510 didn't jump to line 1512, because the condition on line 1510 was never true

1511 # Not supported 

1512 return None 

1513 

1514 target_arg = target_args[0] 

1515 simplified_type = self._strip_mapped_types( 

1516 target_arg, parsing_typed_dict_attribute 

1517 ) 

1518 acceptable_types = { 

1519 target_arg, 

1520 List[target_arg], # type: ignore 

1521 simplified_type, 

1522 List[simplified_type], # type: ignore 

1523 } 

1524 target_format = ( 

1525 target_arg, 

1526 List[target_arg], # type: ignore 

1527 ) 

1528 in_target_format = 0 

1529 in_simple_format = 0 

1530 for input_arg in input_args: 

1531 if input_arg not in acceptable_types: 1531 ↛ 1533: line 1531 didn't jump to line 1533, because the condition on line 1531 was never true

1532 # Not supported 

1533 return None 

1534 if input_arg in target_format: 

1535 in_target_format += 1 

1536 else: 

1537 in_simple_format += 1 

1538 

1539 assert in_simple_format or in_target_format 

1540 

1541 if in_target_format and not in_simple_format: 

1542 # Union[X, List[X]] -> List[X] 

1543 return normalize_into_list 

1544 mapped = self._registered_types[target_arg] 

1545 if not in_target_format and in_simple_format: 1545 ↛ 1560: line 1545 didn't jump to line 1560, because the condition on line 1545 was never false

1546 # Union[X, List[X]] -> List[Y] 

1547 

1548 def _mapper_x_list_y( 

1549 x: Union[Any, List[Any]], 

1550 ap: AttributePath, 

1551 pc: Optional["ParserContextData"], 

1552 ) -> List[Any]: 

1553 in_list_form: List[Any] = normalize_into_list(x, ap, pc) 

1554 

1555 return [mapped.mapper(x, ap, pc) for x in in_list_form] 

1556 

1557 return _mapper_x_list_y 

1558 

1559 # Union[Y, List[X]] -> List[Y] 

1560 if not isinstance(target_arg, type): 

1561 raise ValueError( 

1562 f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does" 

1563 f" not support mixed types. Please use either {simplified_type} or {target_arg}" 

1564 f" in the source content (but both a mix of both)" 

1565 ) 

1566 

1567 def _mapper_mixed_list_y( 

1568 x: Union[Any, List[Any]], 

1569 ap: AttributePath, 

1570 pc: Optional["ParserContextData"], 

1571 ) -> List[Any]: 

1572 in_list_form: List[Any] = normalize_into_list(x, ap, pc) 

1573 

1574 return [ 

1575 x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc) 

1576 for x in in_list_form 

1577 ] 

1578 

1579 return _mapper_mixed_list_y 

1580 
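The supported narrowings above all reduce to "accept a scalar or a list of scalars, and always hand the target attribute a list". A minimal standalone sketch of that idea, assuming no debputy imports (the real helpers, such as normalize_into_list, additionally take an attribute path and parser context):

from typing import Any, List, Union

def normalize_scalar_or_list(value: Union[Any, List[Any]]) -> List[Any]:
    # Union[X, List[X]] -> List[X]: wrap a bare scalar, pass a list through unchanged.
    return value if isinstance(value, list) else [value]

print(normalize_scalar_or_list("docs/*.md"))               # ['docs/*.md']
print(normalize_scalar_or_list(["docs/*.md", "LICENSE"]))  # ['docs/*.md', 'LICENSE']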

1581 def _type_normalize( 

1582 self, 

1583 attribute: str, 

1584 input_type: Any, 

1585 target_type: Any, 

1586 parsing_typed_dict_attribute: bool, 

1587 ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]: 

1588 if input_type == target_type: 

1589 return None 

1590 _, input_orig, input_args = unpack_type( 

1591 input_type, parsing_typed_dict_attribute 

1592 ) 

1593 _, target_orig, target_args = unpack_type( 

1594 target_type, 

1595 parsing_typed_dict_attribute, 

1596 ) 

1597 if input_orig == Union: 

1598 result = self._union_narrowing( 

1599 input_type, target_type, parsing_typed_dict_attribute 

1600 ) 

1601 if result: 

1602 return result 

1603 elif target_orig == list and target_args[0] == input_type: 

1604 return wrap_into_list 

1605 

1606 mapped = self._registered_types.get(target_type) 

1607 if mapped is not None and input_type == mapped.source_type: 

1608 # Source -> Target 

1609 return mapped.mapper 

1610 if target_orig == list and target_args: 1610 ↛ 1628: line 1610 didn't jump to line 1628, because the condition on line 1610 was never false

1611 mapped = self._registered_types.get(target_args[0]) 

1612 if mapped is not None: 1612 ↛ 1628: line 1612 didn't jump to line 1628, because the condition on line 1612 was never false

1613 # mypy is dense and forgot `mapped` cannot be optional in the comprehensions. 

1614 mapped_type: TypeMapping = mapped 

1615 if input_type == mapped.source_type: 1615 ↛ 1617: line 1615 didn't jump to line 1617, because the condition on line 1615 was never true

1616 # Source -> List[Target] 

1617 return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)] 

1618 if ( 1618 ↛ 1628: line 1618 didn't jump to line 1628

1619 input_orig == list 

1620 and input_args 

1621 and input_args[0] == mapped_type.source_type 

1622 ): 

1623 # List[Source] -> List[Target] 

1624 return lambda xs, ap, pc: [ 

1625 mapped_type.mapper(x, ap, pc) for x in xs 

1626 ] 

1627 

1628 raise ValueError( 

1629 f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow' 

1630 f" {input_type} to {target_type}" 

1631 ) 

1632 

1633 def _strip_mapped_types( 

1634 self, orig_td: Any, parsing_typed_dict_attribute: bool 

1635 ) -> Any: 

1636 m = self._registered_types.get(orig_td) 

1637 if m is not None: 

1638 return m.source_type 

1639 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

1640 if v == list: 

1641 arg = args[0] 

1642 m = self._registered_types.get(arg) 

1643 if m: 

1644 return List[m.source_type] # type: ignore 

1645 if v == Union: 

1646 stripped_args = tuple( 

1647 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args 

1648 ) 

1649 if stripped_args != args: 

1650 return Union[stripped_args] 

1651 return orig_td 

1652 

1653 

1654def _verify_inline_reference_documentation( 

1655 source_content_attributes: Mapping[str, AttributeDescription], 

1656 inline_reference_documentation: Optional[ParserDocumentation], 

1657 has_alt_form: bool, 

1658) -> None: 

1659 if inline_reference_documentation is None: 

1660 return 

1661 attribute_doc = inline_reference_documentation.attribute_doc 

1662 if attribute_doc: 

1663 seen = set() 

1664 for attr_doc in attribute_doc: 

1665 for attr_name in attr_doc.attributes: 

1666 attr = source_content_attributes.get(attr_name) 

1667 if attr is None: 1667 ↛ 1668: line 1667 didn't jump to line 1668, because the condition on line 1667 was never true

1668 raise ValueError( 

1669 f'The inline_reference_documentation references an attribute "{attr_name}", which does not' 

1670 f" exist in the source format." 

1671 ) 

1672 if attr_name in seen: 1672 ↛ 1673: line 1672 didn't jump to line 1673, because the condition on line 1672 was never true

1673 raise ValueError( 

1674 f'The inline_reference_documentation has documentation for "{attr_name}" twice,' 

1675 f" which is not supported. Please document it at most once" 

1676 ) 

1677 seen.add(attr_name) 

1678 

1679 undocumented = source_content_attributes.keys() - seen 

1680 if undocumented: 1680 ↛ 1681: line 1680 didn't jump to line 1681, because the condition on line 1680 was never true

1681 undocumented_attrs = ", ".join(undocumented) 

1682 raise ValueError( 

1683 "The following attributes were not documented. If this is deliberate, then please" 

1684 ' declare each of them as undocumented (via undocumented_attr("foo")):' 

1685 f" {undocumented_attrs}" 

1686 ) 

1687 

1688 if inline_reference_documentation.alt_parser_description and not has_alt_form: 1688 ↛ 1689: line 1688 didn't jump to line 1689, because the condition on line 1688 was never true

1689 raise ValueError( 

1690 "The inline_reference_documentation had documentation for an non-mapping format," 

1691 " but the source format does not have a non-mapping format." 

1692 ) 

1693 

1694 

1695def _check_conflicts( 

1696 input_content_attributes: Dict[str, AttributeDescription], 

1697 required_attributes: FrozenSet[str], 

1698 all_attributes: FrozenSet[str], 

1699) -> None: 

1700 for attr_name, attr in input_content_attributes.items(): 

1701 if attr_name in required_attributes and attr.conflicting_attributes: 1701 ↛ 1702: line 1701 didn't jump to line 1702, because the condition on line 1701 was never true

1702 c = ", ".join(repr(a) for a in attr.conflicting_attributes) 

1703 raise ValueError( 

1704 f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.' 

1705 " This makes it impossible to use these attributes. Either remove the attributes" 

1706 f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"' 

1707 " optional (NotRequired)" 

1708 ) 

1709 else: 

1710 required_conflicts = attr.conflicting_attributes & required_attributes 

1711 if required_conflicts: 1711 ↛ 1712: line 1711 didn't jump to line 1712, because the condition on line 1711 was never true

1712 c = ", ".join(repr(a) for a in required_conflicts) 

1713 raise ValueError( 

1714 f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.' 

1715 f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,' 

1716 f" adjust the conflicts or make the listed attributes optional (NotRequired)" 

1717 ) 

1718 unknown_attributes = attr.conflicting_attributes - all_attributes 

1719 if unknown_attributes: 1719 ↛ 1720: line 1719 didn't jump to line 1720, because the condition on line 1719 was never true

1720 c = ", ".join(repr(a) for a in unknown_attributes) 

1721 raise ValueError( 

1722 f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.' 

1723 f" None of these attributes were declared in the input." 

1724 ) 

1725 

1726 

1727def _check_attributes( 

1728 content: Type[TypedDict], 

1729 input_content: Type[TypedDict], 

1730 input_content_attributes: Dict[str, AttributeDescription], 

1731 sources: Mapping[str, Collection[str]], 

1732) -> None: 

1733 target_required_keys = content.__required_keys__ 

1734 input_required_keys = input_content.__required_keys__ 

1735 all_input_keys = input_required_keys | input_content.__optional_keys__ 

1736 

1737 for input_name in all_input_keys: 

1738 attr = input_content_attributes[input_name] 

1739 target_name = attr.target_attribute 

1740 source_names = sources[target_name] 

1741 input_is_required = input_name in input_required_keys 

1742 target_is_required = target_name in target_required_keys 

1743 

1744 assert source_names 

1745 

1746 if input_is_required and len(source_names) > 1: 1746 ↛ 1747: line 1746 didn't jump to line 1747, because the condition on line 1746 was never true

1747 raise ValueError( 

1748 f'The source attribute "{input_name}" is required, but it maps to "{target_name}",' 

1749 f' which has multiple sources "{source_names}". If "{input_name}" should be required,' 

1750 f' then there is no need for additional sources for "{target_name}". Alternatively,' 

1751 f' "{input_name}" might be missing a NotRequired type' 

1752 f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")' 

1753 ) 

1754 if not input_is_required and target_is_required and len(source_names) == 1: 1754 ↛ 1755: line 1754 didn't jump to line 1755, because the condition on line 1754 was never true

1755 raise ValueError( 

1756 f'The source attribute "{input_name}" is not marked as required and maps to' 

1757 f' "{target_name}", which is marked as required. As there are no other attributes' 

1758 f' mapping to "{target_name}", then "{input_name}" must be required as well' 

1759 f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional' 

1760 f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.' 

1761 ) 

1762 

1763 

1764def _validation_type_error(path: AttributePath, message: str) -> None: 

1765 raise ManifestParseException( 

1766 f'The attribute "{path.path}" did not have a valid structure/type: {message}' 

1767 ) 

1768 

1769 

1770def _is_two_arg_x_list_x(t_args: Tuple[Any, ...]) -> bool: 

1771 if len(t_args) != 2: 1771 ↛ 1772: line 1771 didn't jump to line 1772, because the condition on line 1771 was never true

1772 return False 

1773 lhs, rhs = t_args 

1774 if get_origin(lhs) == list: 

1775 if get_origin(rhs) == list: 1775 ↛ 1778: line 1775 didn't jump to line 1778, because the condition on line 1775 was never true

1776 # It could still match X, List[X] - but we do not allow this case for now as the caller 

1777 # does not support it. 

1778 return False 

1779 l_args = get_args(lhs) 

1780 return bool(l_args and l_args[0] == rhs) 

1781 if get_origin(rhs) == list: 

1782 r_args = get_args(rhs) 

1783 return bool(r_args and r_args[0] == lhs) 

1784 return False 

1785 
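To see what the helper above is asked to classify, here is the standard typing introspection it relies on; for the pair below it reports a match, whereas something like (str, int) would not.

from typing import List, Union, get_args, get_origin

args = get_args(Union[str, List[str]])
print(args)                          # (str, typing.List[str])
print(get_origin(args[1]) is list)   # True
print(get_args(args[1])[0] is str)   # True -> the "X, List[X]" shape the helper checks for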

1786 

1787def _extract_typed_dict( 

1788 base_type, 

1789 default_target_attribute: Optional[str], 

1790) -> Tuple[Optional[Type[TypedDict]], Any]: 

1791 if is_typeddict(base_type): 

1792 return base_type, None 

1793 _, origin, args = unpack_type(base_type, False) 

1794 if origin != Union: 

1795 if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)): 1795 ↛ 1796: line 1795 didn't jump to line 1796, because the condition on line 1795 was never true

1796 raise ValueError( 

1797 "The source_format cannot be nor contain a (non-TypedDict) dict" 

1798 ) 

1799 return None, base_type 

1800 typed_dicts = [x for x in args if is_typeddict(x)] 

1801 if len(typed_dicts) > 1: 1801 ↛ 1802: line 1801 didn't jump to line 1802, because the condition on line 1801 was never true

1802 raise ValueError( 

1803 "When source_format is a Union, it must contain at most one TypedDict" 

1804 ) 

1805 typed_dict = typed_dicts[0] if typed_dicts else None 

1806 

1807 if any(x is None or x is _NONE_TYPE for x in args): 1807 ↛ 1808: line 1807 didn't jump to line 1808, because the condition on line 1807 was never true

1808 raise ValueError( 

1809 "The source_format cannot be nor contain Optional[X] or Union[X, None]" 

1810 ) 

1811 

1812 if any( 1812 ↛ 1817: line 1812 didn't jump to line 1817, because the condition on line 1812 was never true

1813 isinstance(x, type) and issubclass(x, (dict, Mapping)) 

1814 for x in args 

1815 if x is not typed_dict 

1816 ): 

1817 raise ValueError( 

1818 "The source_format cannot be nor contain a (non-TypedDict) dict" 

1819 ) 

1820 remaining = [x for x in args if x is not typed_dict] 

1821 has_target_attribute = False 

1822 anno = None 

1823 if len(remaining) == 1: 1823 ↛ 1824: line 1823 didn't jump to line 1824, because the condition on line 1823 was never true

1824 base_type, anno, _ = _parse_type( 

1825 "source_format alternative form", 

1826 remaining[0], 

1827 forbid_optional=True, 

1828 parsing_typed_dict_attribute=False, 

1829 ) 

1830 has_target_attribute = bool(anno) and any( 

1831 isinstance(x, TargetAttribute) for x in anno 

1832 ) 

1833 target_type = base_type 

1834 else: 

1835 target_type = Union[tuple(remaining)] 

1836 

1837 if default_target_attribute is None and not has_target_attribute: 1837 ↛ 1838: line 1837 didn't jump to line 1838, because the condition on line 1837 was never true

1838 raise ValueError( 

1839 'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]' 

1840 " OR the parsed_content format must have exactly one attribute that is required." 

1841 ) 

1842 if anno: 1842 ↛ 1843: line 1842 didn't jump to line 1843, because the condition on line 1842 was never true

1843 final_anno = [target_type] 

1844 final_anno.extend(anno) 

1845 return typed_dict, Annotated[tuple(final_anno)] 

1846 return typed_dict, target_type 

1847 
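Conceptually, the source formats handled above look like the sketch below: at most one TypedDict plus an alternative, non-mapping shorthand form. The names are illustrative only; real formats are registered through debputy's plugin API and may carry DebputyParseHint annotations on the alternative form.

from typing import List, TypedDict, Union

class ExampleInstallFormat(TypedDict):
    sources: List[str]
    into: List[str]

# Shorthand alternative: a bare path (or list of paths) instead of the full mapping.
ExampleInstallSourceFormat = Union[str, List[str], ExampleInstallFormat]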

1848 

1849def _dispatch_parse_generator( 

1850 dispatch_type: Type[DebputyDispatchableType], 

1851) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]: 

1852 def _dispatch_parse( 

1853 value: Any, 

1854 attribute_path: AttributePath, 

1855 parser_context: Optional["ParserContextData"], 

1856 ): 

1857 assert parser_context is not None 

1858 dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type) 

1859 return dispatching_parser.parse_input( 

1860 value, attribute_path, parser_context=parser_context 

1861 ) 

1862 

1863 return _dispatch_parse 

1864 

1865 

1866def _dispatch_parser( 

1867 dispatch_type: Type[DebputyDispatchableType], 

1868) -> AttributeTypeHandler: 

1869 return AttributeTypeHandler( 

1870 dispatch_type.__name__, 

1871 lambda *a: None, 

1872 mapper=_dispatch_parse_generator(dispatch_type), 

1873 ) 

1874 

1875 

1876def _parse_type( 

1877 attribute: str, 

1878 orig_td: Any, 

1879 forbid_optional: bool = True, 

1880 parsing_typed_dict_attribute: bool = True, 

1881) -> Tuple[Any, Tuple[Any, ...], bool]: 

1882 td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

1883 md: Tuple[Any, ...] = tuple() 

1884 optional = False 

1885 if v is not None: 

1886 if v == Annotated: 

1887 anno = get_args(td) 

1888 md = anno[1:] 

1889 td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute) 

1890 

1891 if td is _NONE_TYPE: 1891 ↛ 1892: line 1891 didn't jump to line 1892, because the condition on line 1891 was never true

1892 raise ValueError( 

1893 f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the' 

1894 " debputy manifest, so this attribute does not make sense in its current form." 

1895 ) 

1896 if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args): 1896 ↛ 1897: line 1896 didn't jump to line 1897, because the condition on line 1896 was never true

1897 raise ValueError( 

1898 f'Detected use of Optional in "{attribute}", which is not allowed here.' 

1899 " Please use NotRequired for optional fields" 

1900 ) 

1901 

1902 return td, md, optional 

1903 
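For reference, the Annotated unpacking performed by `_parse_type` corresponds to the following standard typing calls (the metadata string is a stand-in, not a real debputy parse hint):

from typing import Annotated, get_args, get_origin

hint = Annotated[str, "example-parse-hint"]
print(get_origin(hint) is Annotated)   # True -> matches the "v == Annotated" branch above
print(get_args(hint))                  # (str, 'example-parse-hint'): wrapped type first, then metadata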

1904 

1905def _normalize_attribute_name(attribute: str) -> str: 

1906 if attribute.endswith("_"): 

1907 attribute = attribute[:-1] 

1908 return attribute.replace("_", "-") 

1909 
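To make the rule above concrete: a trailing underscore (typically used to sidestep Python keywords) is dropped and remaining underscores become dashes, so Python attribute names line up with manifest keys. The inputs below are illustrative:

def normalize_attribute_name(attribute: str) -> str:
    # Same rule as _normalize_attribute_name above.
    if attribute.endswith("_"):
        attribute = attribute[:-1]
    return attribute.replace("_", "-")

print(normalize_attribute_name("as_"))           # as
print(normalize_attribute_name("install_docs"))  # install-docs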

1910 

1911@dataclasses.dataclass 

1912class DetectedDebputyParseHint: 

1913 target_attribute: str 

1914 source_manifest_attribute: Optional[str] 

1915 conflict_with_source_attributes: FrozenSet[str] 

1916 conditional_required: Optional[ConditionalRequired] 

1917 applicable_as_path_hint: bool 

1918 

1919 @classmethod 

1920 def parse_annotations( 

1921 cls, 

1922 anno: Tuple[Any, ...], 

1923 error_context: str, 

1924 default_attribute_name: Optional[str], 

1925 is_required: bool, 

1926 default_target_attribute: Optional[str] = None, 

1927 allow_target_attribute_annotation: bool = False, 

1928 allow_source_attribute_annotations: bool = False, 

1929 ) -> "DetectedDebputyParseHint": 

1930 target_attr_anno = find_annotation(anno, TargetAttribute) 

1931 if target_attr_anno: 

1932 if not allow_target_attribute_annotation: 1932 ↛ 1933: line 1932 didn't jump to line 1933, because the condition on line 1932 was never true

1933 raise ValueError( 

1934 f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}" 

1935 ) 

1936 target_attribute = target_attr_anno.attribute 

1937 elif default_target_attribute is not None: 

1938 target_attribute = default_target_attribute 

1939 elif default_attribute_name is not None: 1939 ↛ 1942: line 1939 didn't jump to line 1942, because the condition on line 1939 was never false

1940 target_attribute = default_attribute_name 

1941 else: 

1942 if default_attribute_name is None: 

1943 raise ValueError( 

1944 "allow_target_attribute_annotation must be True OR " 

1945 "default_attribute_name/default_target_attribute must be not None" 

1946 ) 

1947 raise ValueError( 

1948 f"Missing DebputyParseHint.target_attribute annotation.{error_context}" 

1949 ) 

1950 source_attribute_anno = find_annotation(anno, ManifestAttribute) 

1951 _source_attribute_allowed( 

1952 allow_source_attribute_annotations, error_context, source_attribute_anno 

1953 ) 

1954 if source_attribute_anno: 

1955 source_attribute_name = source_attribute_anno.attribute 

1956 elif default_attribute_name is not None: 

1957 source_attribute_name = _normalize_attribute_name(default_attribute_name) 

1958 else: 

1959 source_attribute_name = None 

1960 mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute) 

1961 if mutual_exclusive_with_anno: 

1962 _source_attribute_allowed( 

1963 allow_source_attribute_annotations, 

1964 error_context, 

1965 mutual_exclusive_with_anno, 

1966 ) 

1967 conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes 

1968 else: 

1969 conflicting_attributes = frozenset() 

1970 conditional_required = find_annotation(anno, ConditionalRequired) 

1971 

1972 if conditional_required and is_required: 1972 ↛ 1973: line 1972 didn't jump to line 1973, because the condition on line 1972 was never true

1973 if default_attribute_name is None: 

1974 raise ValueError( 

1975 f"is_required cannot be True without default_attribute_name being not None" 

1976 ) 

1977 raise ValueError( 

1978 f'The attribute "{default_attribute_name}" is Required while also being conditionally required.' 

1979 ' Please make the attribute "NotRequired" or remove the conditional requirement.' 

1980 ) 

1981 

1982 not_path_hint_anno = find_annotation(anno, NotPathHint) 

1983 applicable_as_path_hint = not_path_hint_anno is None 

1984 

1985 return DetectedDebputyParseHint( 

1986 target_attribute=target_attribute, 

1987 source_manifest_attribute=source_attribute_name, 

1988 conflict_with_source_attributes=conflicting_attributes, 

1989 conditional_required=conditional_required, 

1990 applicable_as_path_hint=applicable_as_path_hint, 

1991 ) 

1992 

1993 

1994def _source_attribute_allowed( 

1995 source_attribute_allowed: bool, 

1996 error_context: str, 

1997 annotation: Optional[DebputyParseHint], 

1998) -> None: 

1999 if source_attribute_allowed or annotation is None: 1999 ↛ 2001: line 1999 didn't jump to line 2001, because the condition on line 1999 was never false

2000 return 

2001 raise ValueError( 

2002 f'The annotation "{annotation}" cannot be used here. {error_context}' 

2003 )