import collections
import dataclasses
import itertools
from typing import (
    Any,
    Callable,
    Tuple,
    TypedDict,
    Dict,
    get_type_hints,
    Annotated,
    get_args,
    get_origin,
    TypeVar,
    Generic,
    FrozenSet,
    Mapping,
    Optional,
    cast,
    is_typeddict,
    Type,
    Union,
    List,
    Collection,
    NotRequired,
    Iterable,
    Literal,
    Sequence,
)

from debputy.manifest_parser.base_types import (
    DebputyParsedContent,
    StaticFileSystemOwner,
    StaticFileSystemGroup,
    FileSystemMode,
    OctalMode,
    SymlinkTarget,
    FileSystemMatchRule,
    FileSystemExactMatchRule,
    FileSystemExactNonDirMatchRule,
    DebputyDispatchableType,
    TypeMapping,
)
from debputy.manifest_parser.exceptions import (
    ManifestParseException,
)
from debputy.manifest_parser.mapper_code import (
    type_mapper_str2package,
    normalize_into_list,
    wrap_into_list,
    map_each_element,
)
from debputy.manifest_parser.parser_data import ParserContextData
from debputy.manifest_parser.util import AttributePath, unpack_type, find_annotation
from debputy.packages import BinaryPackage
from debputy.plugin.api.impl_types import (
    DeclarativeInputParser,
    TD,
    _ALL_PACKAGE_TYPES,
    resolve_package_type_selectors,
)
from debputy.plugin.api.spec import ParserDocumentation, PackageTypeSelector
from debputy.util import _info, _warn, assume_not_none


# `python3-levenshtein` is an optional dependency: with it installed we can do
# real "did you mean ...?" typo detection for manifest keys; without it, the
# fallback implementation below merely suggests (once per process) that the
# user install the package.
try:
    from Levenshtein import distance
except ImportError:

    # Tracks whether the "install python3-levenshtein" hint has been shown.
    _WARN_ONCE = False

    def _detect_possible_typo(
        _key: str,
        _value: object,
        _manifest_attributes: Mapping[str, "AttributeDescription"],
        _path: "AttributePath",
    ) -> None:
        """Fallback used when Levenshtein is unavailable.

        Performs no detection at all; it only emits a one-time informational
        message suggesting the optional dependency.  Parameters mirror the
        real implementation so call sites are identical either way.
        """
        global _WARN_ONCE
        if not _WARN_ONCE:
            _WARN_ONCE = True
            _info(
                "Install python3-levenshtein to have debputy try to detect typos in the manifest."
            )

else:

    def _detect_possible_typo(
        key: str,
        value: object,
        manifest_attributes: Mapping[str, "AttributeDescription"],
        path: "AttributePath",
    ) -> None:
        """Warn about likely misspellings of a manifest attribute name.

        Compares the unknown `key` against every accepted attribute name and
        warns about the best candidates.  A candidate must be within an edit
        distance of 2; candidates are then ranked by how well `value` fits the
        candidate attribute's declared type (2 = validates cleanly,
        1 = matches the base type, 0 = no type match).  All candidates sharing
        the top rank are reported.
        """
        k_len = len(key)
        key_path = path[key]
        matches: List[str] = []
        current_match_strength = 0
        for acceptable_key, attr in manifest_attributes.items():
            # Cheap length-difference pre-filter before computing the distance.
            if abs(k_len - len(acceptable_key)) > 2:
                continue
            d = distance(key, acceptable_key)
            if d > 2:
                continue
            try:
                attr.type_validator.ensure_type(value, key_path)
            except ManifestParseException:
                if attr.type_validator.base_type_match(value):
                    match_strength = 1
                else:
                    match_strength = 0
            else:
                match_strength = 2

            if match_strength < current_match_strength:
                continue
            if match_strength > current_match_strength:
                # Strictly better candidate found; drop weaker suggestions.
                current_match_strength = match_strength
                matches.clear()
            matches.append(acceptable_key)

        if not matches:
            return
        ref = f'at "{path.path}"' if path else "at the manifest root level"
        if len(matches) == 1:
            possible_match = repr(matches[0])
            _warn(
                f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}'
            )
        else:
            matches.sort()
            possible_matches = ", ".join(repr(a) for a in matches)
            _warn(
                f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}'
            )


SF = TypeVar("SF")
T = TypeVar("T")
S = TypeVar("S")


_NONE_TYPE = type(None)


# These must be able to appear in an "isinstance" check and must be builtin types.
BASIC_SIMPLE_TYPES = { str: "string", int: "integer", bool: "boolean", } class AttributeTypeHandler: __slots__ = ("_description", "_ensure_type", "base_type", "mapper") def __init__( self, description: str, ensure_type: Callable[[Any, AttributePath], None], *, base_type: Optional[Type[Any]] = None, mapper: Optional[ Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] ] = None, ) -> None: self._description = description self._ensure_type = ensure_type self.base_type = base_type self.mapper = mapper def describe_type(self) -> str: return self._description def ensure_type(self, obj: object, path: AttributePath) -> None: self._ensure_type(obj, path) def base_type_match(self, obj: object) -> bool: base_type = self.base_type return base_type is not None and isinstance(obj, base_type) def map_type( self, value: Any, path: AttributePath, parser_context: Optional["ParserContextData"], ) -> Any: mapper = self.mapper if mapper is not None: return mapper(value, path, parser_context) return value def combine_mapper( self, mapper: Optional[ Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] ], ) -> "AttributeTypeHandler": if mapper is None: return self if self.mapper is not None: m = self.mapper def _combined_mapper( value: Any, path: AttributePath, parser_context: Optional["ParserContextData"], ) -> Any: return mapper(m(value, path, parser_context), path, parser_context) else: _combined_mapper = mapper return AttributeTypeHandler( self._description, self._ensure_type, base_type=self.base_type, mapper=_combined_mapper, ) @dataclasses.dataclass(slots=True) class AttributeDescription: source_attribute_name: str target_attribute: str attribute_type: Any type_validator: AttributeTypeHandler annotations: Tuple[Any, ...] 
conflicting_attributes: FrozenSet[str] conditional_required: Optional["ConditionalRequired"] parse_hints: Optional["DetectedDebputyParseHint"] = None is_optional: bool = False def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool: if attribute_path.path_hint is not None: return True if isinstance(v, str): attribute_path.path_hint = v return True elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str): attribute_path.path_hint = v[0] return True return False @dataclasses.dataclass(slots=True, frozen=True) class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): alt_form_parser: AttributeDescription inline_reference_documentation: Optional[ParserDocumentation] = None def parse_input( self, value: object, path: AttributePath, *, parser_context: Optional["ParserContextData"] = None, ) -> TD: if self.reference_documentation_url is not None: doc_ref = f" (Documentation: {self.reference_documentation_url})" else: doc_ref = "" alt_form_parser = self.alt_form_parser if value is None: form_note = f" The value must have type: {alt_form_parser.type_validator.describe_type()}" if self.reference_documentation_url is not None: doc_ref = f" Please see {self.reference_documentation_url} for the documentation." raise ManifestParseException( f"The attribute {path.path} was missing a value. 
{form_note}{doc_ref}" ) _extract_path_hint(value, path) alt_form_parser.type_validator.ensure_type(value, path) attribute = alt_form_parser.target_attribute alias_mapping = { attribute: ("", None), } v = alt_form_parser.type_validator.map_type(value, path, parser_context) path.alias_mapping = alias_mapping return cast("TD", {attribute: v}) @dataclasses.dataclass(slots=True) class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): input_time_required_parameters: FrozenSet[str] all_parameters: FrozenSet[str] manifest_attributes: Mapping[str, "AttributeDescription"] source_attributes: Mapping[str, "AttributeDescription"] at_least_one_of: FrozenSet[FrozenSet[str]] alt_form_parser: Optional[AttributeDescription] mutually_exclusive_attributes: FrozenSet[FrozenSet[str]] = frozenset() _per_attribute_conflicts_cache: Optional[Mapping[str, FrozenSet[str]]] = None inline_reference_documentation: Optional[ParserDocumentation] = None path_hint_source_attributes: Sequence[str] = tuple() def parse_input( self, value: object, path: AttributePath, *, parser_context: Optional["ParserContextData"] = None, ) -> TD: if self.reference_documentation_url is not None: doc_ref = f" (Documentation: {self.reference_documentation_url})" else: doc_ref = "" if value is None: form_note = " The attribute must be a mapping." if self.alt_form_parser is not None: form_note = ( " The attribute can be a mapping or a non-mapping format" ' (usually, "non-mapping format" means a string or a list of strings).' ) if self.reference_documentation_url is not None: doc_ref = f" Please see {self.reference_documentation_url} for the documentation." raise ManifestParseException( f"The attribute {path.path} was missing a value. 
{form_note}{doc_ref}" ) if not isinstance(value, dict): alt_form_parser = self.alt_form_parser if alt_form_parser is None: raise ManifestParseException( f"The attribute {path.path} must be a mapping.{doc_ref}" ) _extract_path_hint(value, path) alt_form_parser.type_validator.ensure_type(value, path) assert ( value is not None ), "The alternative form was None, but the parser should have rejected None earlier." attribute = alt_form_parser.target_attribute alias_mapping = { attribute: ("", None), } v = alt_form_parser.type_validator.map_type(value, path, parser_context) path.alias_mapping = alias_mapping return cast("TD", {attribute: v}) unknown_keys = value.keys() - self.all_parameters if unknown_keys: for k in unknown_keys: if isinstance(k, str): _detect_possible_typo(k, value[k], self.manifest_attributes, path) unused_keys = self.all_parameters - value.keys() if unused_keys: k = ", ".join(unused_keys) raise ManifestParseException( f'Unknown keys "{unknown_keys}" at {path.path}". Keys that could be used here are: {k}.{doc_ref}' ) raise ManifestParseException( f'Unknown keys "{unknown_keys}" at {path.path}". Please remove them.{doc_ref}' ) missing_keys = self.input_time_required_parameters - value.keys() if missing_keys: required = ", ".join(repr(k) for k in sorted(missing_keys)) raise ManifestParseException( f"The following keys were required but not present at {path.path}: {required}{doc_ref}" ) for maybe_required in self.all_parameters - value.keys(): attr = self.manifest_attributes[maybe_required] assert attr.conditional_required is None or parser_context is not None if ( attr.conditional_required is not None and attr.conditional_required.condition_applies( assume_not_none(parser_context) ) ): reason = attr.conditional_required.reason raise ManifestParseException( f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path}. 
{reason}{doc_ref}' ) for keyset in self.at_least_one_of: matched_keys = value.keys() & keyset if not matched_keys: conditionally_required = ", ".join(repr(k) for k in sorted(keyset)) raise ManifestParseException( f"At least one of the following keys must be present at {path.path}:" f" {conditionally_required}{doc_ref}" ) for group in self.mutually_exclusive_attributes: matched = value.keys() & group if len(matched) > 1: ck = ", ".join(repr(k) for k in sorted(matched)) raise ManifestParseException( f"Could not parse {path.path}: The following attributes are" f" mutually exclusive: {ck}{doc_ref}" ) result = {} per_attribute_conflicts = self._per_attribute_conflicts() alias_mapping = {} for path_hint_source_attributes in self.path_hint_source_attributes: v = value.get(path_hint_source_attributes) if v is not None and _extract_path_hint(v, path): break for k, v in value.items(): attr = self.manifest_attributes[k] matched = value.keys() & per_attribute_conflicts[k] if matched: ck = ", ".join(repr(k) for k in sorted(matched)) raise ManifestParseException( f'The attribute "{k}" at {path.path} cannot be used with the following' f" attributes: {ck}{doc_ref}" ) nk = attr.target_attribute key_path = path[k] attr.type_validator.ensure_type(v, key_path) if v is None: continue if k != nk: alias_mapping[nk] = k, None v = attr.type_validator.map_type(v, key_path, parser_context) result[nk] = v if alias_mapping: path.alias_mapping = alias_mapping return cast("TD", result) def _per_attribute_conflicts(self) -> Mapping[str, FrozenSet[str]]: conflicts = self._per_attribute_conflicts_cache if conflicts is not None: return conflicts attrs = self.source_attributes conflicts = { a.source_attribute_name: frozenset( attrs[ca].source_attribute_name for ca in a.conflicting_attributes ) for a in attrs.values() } self._per_attribute_conflicts_cache = conflicts return self._per_attribute_conflicts_cache class DebputyParseHint: @classmethod def target_attribute(cls, target_attribute: str) -> 
"DebputyParseHint": """Define this source attribute to have a different target attribute name As an example: >>> class SourceType(TypedDict): ... source: Annotated[NotRequired[str], DebputyParseHint.target_attribute("sources")] ... sources: NotRequired[List[str]] >>> class TargetType(TypedDict): ... sources: List[str] >>> pg = ParserGenerator() >>> parser = pg.parser_from_typed_dict(TargetType, source_content=SourceType) In this example, the user can provide either `source` or `sources` and the parser will map them to the `sources` attribute in the `TargetType`. Note this example relies on the builtin mapping of `str` to `List[str]` to align the types between `source` (from SourceType) and `sources` (from TargetType). The following rules apply: * All source attributes that map to the same target attribute will be mutually exclusive (that is, the user cannot give `source` *and* `sources` as input). * When the target attribute is required, the source attributes are conditionally mandatory requiring the user to provide exactly one of them. * When multiple source attributes point to a single target attribute, none of the source attributes can be Required. * The annotation can only be used for the source type specification and the source type specification must be different from the target type specification. The `target_attribute` annotation can be used without having multiple source attributes. This can be useful if the source attribute name is not valid as a python variable identifier to rename it to a valid python identifier. :param target_attribute: The attribute name in the target content :return: The annotation. """ return TargetAttribute(target_attribute) @classmethod def conflicts_with_source_attributes( cls, *conflicting_source_attributes: str, ) -> "DebputyParseHint": """Declare a conflict with one or more source attributes Example: >>> class SourceType(TypedDict): ... source: Annotated[NotRequired[str], DebputyParseHint.target_attribute("sources")] ... 
sources: NotRequired[List[str]] ... into_dir: NotRequired[str] ... renamed_to: Annotated[ ... NotRequired[str], ... DebputyParseHint.conflicts_with_source_attributes("sources", "into_dir") ... ] >>> class TargetType(TypedDict): ... sources: List[str] ... into_dir: NotRequired[str] ... renamed_to: NotRequired[str] >>> pg = ParserGenerator() >>> parser = pg.parser_from_typed_dict(TargetType, source_content=SourceType) In this example, if the user was to provide `renamed_to` with `sources` or `into_dir` the parser would report an error. However, the parser will allow `renamed_to` with `source` as the conflict is considered only for the input source. That is, it is irrelevant that `sources` and `sourceĀ“ happens to "map" to the same target attribute. The following rules apply: * It is not possible for a target attribute to declare conflicts unless the target type spec is reused as source type spec. * All attributes involved in a conflict must be NotRequired. If any of the attributes are Required, then the parser generator will reject the input. * All attributes listed in the conflict must be valid attributes in the source type spec. Note you do not have to specify conflicts between two attributes with the same target attribute name. The `target_attribute` annotation will handle that for you. :param conflicting_source_attributes: All source attributes that cannot be used with this attribute. :return: The annotation. 
""" if len(conflicting_source_attributes) < 1: raise ValueError( "DebputyParseHint.conflicts_with_source_attributes requires at least one attribute as input" ) return ConflictWithSourceAttribute(frozenset(conflicting_source_attributes)) @classmethod def required_when_single_binary( cls, *, package_type: PackageTypeSelector = _ALL_PACKAGE_TYPES, ) -> "DebputyParseHint": """Declare a source attribute as required when the source package produces exactly one binary package The attribute in question must always be declared as `NotRequired` in the TypedDict and this condition can only be used for source attributes. """ resolved_package_types = resolve_package_type_selectors(package_type) reason = "The field is required for source packages producing exactly one binary package" if resolved_package_types != _ALL_PACKAGE_TYPES: types = ", ".join(sorted(resolved_package_types)) reason += f" of type {types}" return ConditionalRequired( reason, lambda c: len( [ p for p in c.binary_packages.values() if p.package_type in package_type ] ) == 1, ) return ConditionalRequired( reason, lambda c: c.is_single_binary_package, ) @classmethod def required_when_multi_binary( cls, *, package_type: PackageTypeSelector = _ALL_PACKAGE_TYPES, ) -> "DebputyParseHint": """Declare a source attribute as required when the source package produces two or more binary package The attribute in question must always be declared as `NotRequired` in the TypedDict and this condition can only be used for source attributes. 
""" resolved_package_types = resolve_package_type_selectors(package_type) reason = "The field is required for source packages producing two or more binary packages" if resolved_package_types != _ALL_PACKAGE_TYPES: types = ", ".join(sorted(resolved_package_types)) reason = ( "The field is required for source packages producing not producing exactly one binary packages" f" of type {types}" ) return ConditionalRequired( reason, lambda c: len( [ p for p in c.binary_packages.values() if p.package_type in package_type ] ) != 1, ) return ConditionalRequired( reason, lambda c: not c.is_single_binary_package, ) @classmethod def manifest_attribute(cls, attribute: str) -> "DebputyParseHint": """Declare what the attribute name (as written in the manifest) should be By default, debputy will do an attribute normalizing that will take valid python identifiers such as `dest_dir` and remap it to the manifest variant (such as `dest-dir`) automatically. If you have a special case, where this built-in normalization is insufficient or the python name is considerably different from what the user would write in the manifest, you can use this parse hint to set the name that the user would have to write in the manifest for this attribute. >>> class SourceType(TypedDict): ... source: List[FileSystemMatchRule] ... # Use "as" in the manifest because "as_" was not pretty enough ... install_as: Annotated[NotRequired[FileSystemExactMatchRule], DebputyParseHint.manifest_attribute("as")] In this example, we use the parse hint to use "as" as the name in the manifest, because we cannot use "as" a valid python identifier (it is a keyword). While debputy would map `as_` to `as` for us, we have chosen to use `install_as` as a python identifier. 
""" return ManifestAttribute(attribute) @classmethod def not_path_error_hint(cls) -> "DebputyParseHint": """Mark this attribute as not a "path hint" when it comes to reporting errors By default, `debputy` will pick up attributes that uses path names (FileSystemMatchRule) as candidates for parse error hints (the little "" in error messages). Most rules only have one active path-based attribute and paths tends to be unique enough that it helps people spot the issue faster. However, in rare cases, you can have multiple attributes that fit the bill. In this case, this hint can be used to "hide" the suboptimal choice. As an example: >>> class SourceType(TypedDict): ... source: List[FileSystemMatchRule] ... install_as: Annotated[NotRequired[FileSystemExactMatchRule], DebputyParseHint.not_path_error_hint()] In this case, without the hint, `debputy` might pick up `install_as` as the attribute to use as hint for error reporting. However, here we have decided that we never want `install_as` leaving `source` as the only option. Generally, this type hint must be placed on the **source** format. Any source attribute matching the parsed format will be ignored. Mind the assymmetry: The annotation is placed in the **source** format while `debputy` looks at the type of the target attribute to determine if it counts as path. 
""" return NOT_PATH_HINT @dataclasses.dataclass(frozen=True, slots=True) class TargetAttribute(DebputyParseHint): attribute: str @dataclasses.dataclass(frozen=True, slots=True) class ConflictWithSourceAttribute(DebputyParseHint): conflicting_attributes: FrozenSet[str] @dataclasses.dataclass(frozen=True, slots=True) class ConditionalRequired(DebputyParseHint): reason: str condition: Callable[["ParserContextData"], bool] def condition_applies(self, context: "ParserContextData") -> bool: return self.condition(context) @dataclasses.dataclass(frozen=True, slots=True) class ManifestAttribute(DebputyParseHint): attribute: str class NotPathHint(DebputyParseHint): pass NOT_PATH_HINT = NotPathHint() def _is_path_attribute_candidate( source_attribute: AttributeDescription, target_attribute: AttributeDescription ) -> bool: if ( source_attribute.parse_hints and not source_attribute.parse_hints.applicable_as_path_hint ): return False target_type = target_attribute.attribute_type _, origin, args = unpack_type(target_type, False) match_type = target_type if origin == list: match_type = args[0] return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule) class ParserGenerator: def __init__(self) -> None: self._registered_types: Dict[Any, TypeMapping[Any, Any]] = {} def register_mapped_type(self, mapped_type: TypeMapping) -> None: existing = self._registered_types.get(mapped_type.target_type) if existing is not None: raise ValueError(f"The type {existing} is already registered") self._registered_types[mapped_type.target_type] = mapped_type def discard_mapped_type(self, mapped_type: Type[T]) -> None: del self._registered_types[mapped_type] def parser_from_typed_dict( self, parsed_content: Type[TD], *, source_content: Optional[SF] = None, allow_optional: bool = False, inline_reference_documentation: Optional[ParserDocumentation] = None, ) -> DeclarativeInputParser[TD]: """Derive a parser from a TypedDict Generates a parser for a segment of the manifest (think 
the `install-docs` snippet) from a TypedDict or two that are used as a description. In its most simple use-case, the caller provides a TypedDict of the expected attributed along with their types. As an example: >>> class InstallDocsRule(DebputyParsedContent): ... sources: List[str] ... into: List[str] >>> pg = ParserGenerator() >>> simple_parser = pg.parser_from_typed_dict(InstallDocsRule) This will create a parser that would be able to interpret something like: ```yaml install-docs: sources: ["docs/*"] into: ["my-pkg"] ``` While this is sufficient for programmers, it is a bit ridig for the packager writing the manifest. Therefore, you can also provide a TypedDict descriping the input, enabling more flexibility: >>> class InstallDocsRule(DebputyParsedContent): ... sources: List[str] ... into: List[str] >>> class InputDocsRuleInputFormat(TypedDict): ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]] ... sources: NotRequired[List[str]] ... into: Union[str, List[str]] >>> pg = ParserGenerator() >>> flexible_parser = pg.parser_from_typed_dict( ... InstallDocsRule, ... source_content=InputDocsRuleInputFormat, ... ) In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string) or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources` is used to ensure the input is not ambigious. For the `into` parameter, the parser will accept it being a str or a list of strings. Regardless of how the input was provided, the parser will normalize the input such that both `sources` and `into` in the result is a list of strings. As an example, this parser can accept both the previous input but also the following input: ```yaml install-docs: source: "docs/*" into: "my-pkg" ``` The `source` and `into` attributes are then normalized to lists as if the user had written them as lists with a single string in them. 
As noted above, the name of the `source` attribute will also be normalized while parsing. In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict as part of the input. Example: >>> class DiscardRule(DebputyParsedContent): ... paths: List[str] >>> class DiscardRuleInputDictFormat(TypedDict): ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]] ... paths: NotRequired[List[str]] >>> # This format relies on DiscardRule having exactly one Required attribute >>> DiscardRuleInputWithAltFormat = Union[ ... DiscardRuleInputDictFormat, ... str, ... List[str], ... ] >>> pg = ParserGenerator() >>> flexible_parser = pg.parser_from_typed_dict( ... DiscardRule, ... source_content=DiscardRuleInputWithAltFormat, ... ) Supported types: * `List` - must have a fixed type argument (such as `List[str]`) * `str` * `int` * `BinaryPackage` - When provided (or required), the user must provide a package name listed in the debian/control file. The code receives the BinaryPackage instance matching that input. * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any format that `debputy' provides (such as `0644` or `a=rw,go=rw`). * `FileSystemOwner` - When provided (or required), the user must a file system owner that is available statically on all Debian systems (must be in `base-passwd`). The user has multiple options for how to specify it (either via name or id). * `FileSystemGroup` - When provided (or required), the user must a file system group that is available statically on all Debian systems (must be in `base-passwd`). The user has multiple options for how to specify it (either via name or id). * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply. Usually, it is better to extend `DebputyParsedContentStandardConditional`, which provides the `debputy' default `when` parameter for conditionals. 
Supported special type-like parameters: * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the outermost level. Cannot vary between `parsed_content` and `source_content`. * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment. * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present). Automapping (see below) is restricted to two members in the Union. Notable non-supported types: * `Mapping` and all variants therefore (such as `dict`). In the future, nested `TypedDict`s may be allowed. * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields. Automatic mapping rules from `source_content` to `parsed_content`: - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically: `lambda value: value if isinstance(value, list) else [value]` - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]` Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod for concrete features that may be useful to you. :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed. (DebputyParsedContent is a TypedDict subclass that work around some inadequate type checkers) :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted, the parsed_content is also considered the source_content (which affects what annotations are allowed in it). Note you should never pass the parsed_content as source_content directly. :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you should set this to True. 
Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and can keep this False). :param inline_reference_documentation: Optionally, programmatic documentation :return: An input parser capable of reading input matching the TypedDict(s) used as reference. """ if not is_typeddict(parsed_content): raise ValueError( f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." ' Only "TypedDict"-based types supported.' ) if source_content is parsed_content: raise ValueError( "Do not provide source_content if it is the same as parsed_content" ) target_attributes = self._parse_types( parsed_content, allow_source_attribute_annotations=source_content is None, forbid_optional=not allow_optional, ) required_target_parameters = frozenset(parsed_content.__required_keys__) parsed_alt_form = None non_mapping_source_only = False if source_content is not None: default_target_attribute = None if len(required_target_parameters) == 1: default_target_attribute = next(iter(required_target_parameters)) source_typed_dict, alt_source_forms = _extract_typed_dict( source_content, default_target_attribute, ) if alt_source_forms: parsed_alt_form = self._parse_alt_form( alt_source_forms, default_target_attribute, ) if source_typed_dict is not None: source_content_attributes = self._parse_types( source_typed_dict, allow_target_attribute_annotation=True, allow_source_attribute_annotations=True, forbid_optional=not allow_optional, ) source_content_parameter = "source_content" source_and_parsed_differs = True else: source_typed_dict = parsed_content source_content_attributes = target_attributes source_content_parameter = "parsed_content" source_and_parsed_differs = True non_mapping_source_only = True else: source_typed_dict = parsed_content source_content_attributes = target_attributes source_content_parameter = "parsed_content" source_and_parsed_differs = False sources = collections.defaultdict(set) seen_targets = set() seen_source_names: Dict[str, str] = {} 
source_attributes: Dict[str, AttributeDescription] = {} path_hint_source_attributes = [] for k in source_content_attributes: ia = source_content_attributes[k] ta = ( target_attributes.get(ia.target_attribute) if source_and_parsed_differs else ia ) if ta is None: # Error message would be wrong if this assertion is false. assert source_and_parsed_differs raise ValueError( f'The attribute "{k}" from the "source_content" parameter should have mapped' f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"' ) if _is_path_attribute_candidate(ia, ta): path_hint_source_attributes.append(ia.source_attribute_name) existing_source_name = seen_source_names.get(ia.source_attribute_name) if existing_source_name: raise ValueError( f'The attribute "{k}" and "{existing_source_name}" both share the source name' f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,' f' so only one attribute use "{ia.source_attribute_name}".' ) seen_source_names[ia.source_attribute_name] = k seen_targets.add(ta.target_attribute) sources[ia.target_attribute].add(k) if source_and_parsed_differs: bridge_mapper = self._type_normalize( k, ia.attribute_type, ta.attribute_type, False ) ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper) source_attributes[k] = ia def _as_attr_names(td_name: Iterable[str]) -> FrozenSet[str]: return frozenset( source_content_attributes[a].source_attribute_name for a in td_name ) _check_attributes( parsed_content, source_typed_dict, source_content_attributes, sources, ) at_least_one_of = frozenset( _as_attr_names(g) for k, g in sources.items() if len(g) > 1 and k in required_target_parameters ) if source_and_parsed_differs and seen_targets != target_attributes.keys(): missing = ", ".join( repr(k) for k in (target_attributes.keys() - seen_targets) ) raise ValueError( 'The following attributes in "parsed_content" did not have a source field in "source_content":' f" {missing}" ) 
all_mutually_exclusive_fields = frozenset( _as_attr_names(g) for g in sources.values() if len(g) > 1 ) all_parameters = ( source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__ ) _check_conflicts( source_content_attributes, source_typed_dict.__required_keys__, all_parameters, ) manifest_attributes = { a.source_attribute_name: a for a in source_content_attributes.values() } if parsed_alt_form is not None: target_attribute = parsed_alt_form.target_attribute if ( target_attribute not in required_target_parameters and required_target_parameters or len(required_target_parameters) > 1 ): raise NotImplementedError( "When using alternative source formats (Union[TypedDict, ...]), then the" " target must have at most one require parameter" ) bridge_mapper = self._type_normalize( target_attribute, parsed_alt_form.attribute_type, target_attributes[target_attribute].attribute_type, False, ) parsed_alt_form.type_validator = ( parsed_alt_form.type_validator.combine_mapper(bridge_mapper) ) _verify_inline_reference_documentation( source_content_attributes, inline_reference_documentation, parsed_alt_form is not None, ) if non_mapping_source_only: return DeclarativeNonMappingInputParser( assume_not_none(parsed_alt_form), inline_reference_documentation=inline_reference_documentation, ) else: return DeclarativeMappingInputParser( _as_attr_names(source_typed_dict.__required_keys__), _as_attr_names(all_parameters), manifest_attributes, source_attributes, mutually_exclusive_attributes=all_mutually_exclusive_fields, alt_form_parser=parsed_alt_form, at_least_one_of=at_least_one_of, inline_reference_documentation=inline_reference_documentation, path_hint_source_attributes=tuple(path_hint_source_attributes), ) def _as_type_validator( self, attribute: str, provided_type: Any, parsing_typed_dict_attribute: bool, ) -> AttributeTypeHandler: assert not isinstance(provided_type, tuple) if isinstance(provided_type, type) and issubclass( provided_type, DebputyDispatchableType ): 
            return _dispatch_parser(provided_type)
        # Work on the "source" view of the type (registered type mappings
        # stripped); `type_normalizer` later converts back to the provided type.
        unmapped_type = self._strip_mapped_types(
            provided_type,
            parsing_typed_dict_attribute,
        )
        type_normalizer = self._type_normalize(
            attribute,
            unmapped_type,
            provided_type,
            parsing_typed_dict_attribute,
        )
        t_unmapped, t_orig, t_args = unpack_type(
            unmapped_type,
            parsing_typed_dict_attribute,
        )

        # Optional[X] (i.e. Union[X, None]): validate as X but accept None.
        if (
            t_orig == Union
            and t_args
            and len(t_args) == 2
            and any(v is _NONE_TYPE for v in t_args)
        ):
            _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute)
            actual_type = [a for a in args if a is not _NONE_TYPE][0]
            validator = self._as_type_validator(
                attribute, actual_type, parsing_typed_dict_attribute
            )

            def _validator(v: Any, path: AttributePath) -> None:
                if v is None:
                    return
                validator.ensure_type(v, path)

            return AttributeTypeHandler(
                validator.describe_type(),
                _validator,
                base_type=validator.base_type,
                mapper=type_normalizer,
            )

        # Plain builtin scalar (entry in BASIC_SIMPLE_TYPES).
        if unmapped_type in BASIC_SIMPLE_TYPES:
            type_name = BASIC_SIMPLE_TYPES[unmapped_type]

            type_mapping = self._registered_types.get(provided_type)
            if type_mapping is not None:
                # A registered mapping converts the scalar to a richer type;
                # describe as "<TargetType> (<simple type>)".
                simple_type = f" ({type_name})"
                type_name = type_mapping.target_type.__name__
            else:
                simple_type = ""

            def _validator(v: Any, path: AttributePath) -> None:
                if not isinstance(v, unmapped_type):
                    _validation_type_error(
                        path, f"The attribute must be a {type_name}{simple_type}"
                    )

            return AttributeTypeHandler(
                type_name,
                _validator,
                base_type=unmapped_type,
                mapper=type_normalizer,
            )

        if t_orig == list:
            if not t_args:
                raise ValueError(
                    f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])'
                )
            _, t_provided_orig, t_provided_args = unpack_type(
                provided_type,
                parsing_typed_dict_attribute,
            )
            genetic_type = t_args[0]
            # Each element is validated/mapped via the element type's handler.
            key_mapper = self._as_type_validator(
                attribute,
                genetic_type,
                parsing_typed_dict_attribute,
            )

            def _validator(v: Any, path: AttributePath) -> None:
                if not isinstance(v, list):
                    _validation_type_error(path, "The attribute must be a list")
                for i, v in enumerate(v):
                    key_mapper.ensure_type(v, path[i])

            # Apply the element mapper element-wise (if the element type has one).
            list_mapper = (
                map_each_element(key_mapper.mapper)
                if key_mapper.mapper is not None
                else None
            )

            return AttributeTypeHandler(
                f"List of {key_mapper.describe_type()}",
                _validator,
                base_type=list,
                mapper=type_normalizer,
            ).combine_mapper(list_mapper)

        # Nested TypedDict: delegate validation and parsing to a sub-parser.
        if is_typeddict(provided_type):
            subparser = self.parser_from_typed_dict(cast("Type[TD]", provided_type))
            return AttributeTypeHandler(
                description=f"{provided_type.__name__} (Typed Mapping)",
                ensure_type=lambda v, ap: None,
                base_type=dict,
                mapper=lambda v, ap, cv: subparser.parse_input(
                    v, ap, parser_context=cv
                ),
            )

        if t_orig == dict:
            if not t_args or len(t_args) != 2:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])'
                )
            if t_args[0] != str:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict and has a non-str type as key.'
                    " Currently, only `str` is supported (Dict[str, Y])"
                )
            key_mapper = self._as_type_validator(
                attribute,
                t_args[0],
                parsing_typed_dict_attribute,
            )
            value_mapper = self._as_type_validator(
                attribute,
                t_args[1],
                parsing_typed_dict_attribute,
            )
            if key_mapper.base_type is None:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types'
                    f" without trivial base types (such as `str`) are not supported at the moment."
                )

            if value_mapper.mapper is not None:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict and the value requires mapping.'
                    " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])."
                    " Better typing may come later"
                )

            def _validator(uv: Any, path: AttributePath) -> None:
                if not isinstance(uv, dict):
                    _validation_type_error(path, "The attribute must be a mapping")
                # Track a human-readable hint for where a bad key sits, since
                # the bad key itself may not be usable as a path segment.
                key_name = "the first key in the mapping"
                for i, (k, v) in enumerate(uv.items()):
                    if not key_mapper.base_type_match(k):
                        kp = path.copy_with_path_hint(key_name)
                        _validation_type_error(
                            kp,
                            f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}',
                        )
                    key_name = f"the key after {k}"
                    value_mapper.ensure_type(v, path[k])

            return AttributeTypeHandler(
                f"Mapping of {value_mapper.describe_type()}",
                _validator,
                base_type=dict,
                mapper=type_normalizer,
            ).combine_mapper(key_mapper.mapper)

        if t_orig == Union:
            if _is_two_arg_x_list_x(t_args):
                # Force the order to be "X, List[X]" as it simplifies the code
                x_list_x = (
                    t_args if get_origin(t_args[1]) == list else (t_args[1], t_args[0])
                )

                # X, List[X] could match if X was List[Y]. However, our code below assumes
                # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this
                # case to avoid this assert and fall into the "generic case".
                assert get_origin(x_list_x[0]) != list
                x_subtype_checker = self._as_type_validator(
                    attribute,
                    x_list_x[0],
                    parsing_typed_dict_attribute,
                )
                list_x_subtype_checker = self._as_type_validator(
                    attribute,
                    x_list_x[1],
                    parsing_typed_dict_attribute,
                )
                type_description = x_subtype_checker.describe_type()
                type_description = f"{type_description} or a list of {type_description}"

                def _validator(v: Any, path: AttributePath) -> None:
                    # Accept either a bare X or a List[X]; `type_normalizer`
                    # later normalizes both spellings into one form.
                    if isinstance(v, list):
                        list_x_subtype_checker.ensure_type(v, path)
                    else:
                        x_subtype_checker.ensure_type(v, path)

                return AttributeTypeHandler(
                    type_description,
                    _validator,
                    mapper=type_normalizer,
                )
            else:
                # Generic union: every member must agree on the mapper, since
                # we cannot know which member matched when mapping the value.
                subtype_checker = [
                    self._as_type_validator(attribute, a, parsing_typed_dict_attribute)
                    for a in t_args
                ]
                type_description = "one-of: " + ", ".join(
                    f"{sc.describe_type()}" for sc in subtype_checker
                )
                mapper = subtype_checker[0].mapper
                if any(mapper != sc.mapper for sc in subtype_checker):
                    raise ValueError(
                        f'Cannot handle the union "{provided_type}" as the target types need different'
                        " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]"
                        " where X is a non-collection type."
                    )

                def _validator(v: Any, path: AttributePath) -> None:
                    # Try each member in order. If exactly one member's base
                    # type matched, re-raise its (more precise) error;
                    # otherwise fall back to a generic mismatch message.
                    partial_matches = []
                    for sc in subtype_checker:
                        try:
                            sc.ensure_type(v, path)
                            return
                        except ManifestParseException as e:
                            if sc.base_type_match(v):
                                partial_matches.append((sc, e))

                    if len(partial_matches) == 1:
                        raise partial_matches[0][1]
                    _validation_type_error(
                        path, f"Could not match against: {type_description}"
                    )

                return AttributeTypeHandler(
                    type_description,
                    _validator,
                    mapper=type_normalizer,
                )

        if t_orig == Literal:
            # We want "x" for string values; repr provides 'x'
            pretty = ", ".join(
                f'"{v}"' if isinstance(v, str) else str(v) for v in t_args
            )

            def _validator(v: Any, path: AttributePath) -> None:
                if v not in t_args:
                    value_hint = ""
                    if isinstance(v, str):
                        value_hint = f"({v}) "
                    _validation_type_error(
                        path,
                        f"Value {value_hint}must be one of the following literal values: {pretty}",
                    )

            return AttributeTypeHandler(
                f"One of the following literal values: {pretty}",
                _validator,
            )

        if provided_type == Any:
            # Explicit opt-out: accept anything without validation.
            return AttributeTypeHandler(
                "any (unvalidated)",
                lambda *a: None,
            )
        raise ValueError(
            f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported'
        )

    def _parse_types(
        self,
        spec: Type[TypedDict],
        allow_target_attribute_annotation: bool = False,
        allow_source_attribute_annotations: bool = False,
        forbid_optional: bool = True,
    ) -> Dict[str, AttributeDescription]:
        """Parse every attribute of a TypedDict into an AttributeDescription."""
        annotations = get_type_hints(spec, include_extras=True)
        return {
            k: self._attribute_description(
                k,
                t,
                k in spec.__required_keys__,
                allow_target_attribute_annotation=allow_target_attribute_annotation,
                allow_source_attribute_annotations=allow_source_attribute_annotations,
                forbid_optional=forbid_optional,
            )
            for k, t in annotations.items()
        }

    def _attribute_description(
        self,
        attribute: str,
        orig_td: Any,
        is_required: bool,
        forbid_optional: bool = True,
        allow_target_attribute_annotation: bool = False,
        allow_source_attribute_annotations: bool = False,
    ) -> AttributeDescription:
        """Build the AttributeDescription for a single TypedDict attribute."""
        td, anno, is_optional = _parse_type(
            attribute,
            orig_td,
            forbid_optional=forbid_optional
        )
        type_validator = self._as_type_validator(attribute, td, True)
        parsed_annotations = DetectedDebputyParseHint.parse_annotations(
            anno,
            f' Seen with attribute "{attribute}".',
            attribute,
            is_required,
            allow_target_attribute_annotation=allow_target_attribute_annotation,
            allow_source_attribute_annotations=allow_source_attribute_annotations,
        )
        return AttributeDescription(
            target_attribute=parsed_annotations.target_attribute,
            attribute_type=td,
            type_validator=type_validator,
            annotations=anno,
            is_optional=is_optional,
            conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
            conditional_required=parsed_annotations.conditional_required,
            source_attribute_name=assume_not_none(
                parsed_annotations.source_manifest_attribute
            ),
            parse_hints=parsed_annotations,
        )

    def _parse_alt_form(
        self,
        alt_form,
        default_target_attribute: Optional[str],
    ) -> AttributeDescription:
        """Parse the non-mapping (alternative) source form of a rule."""
        td, anno, is_optional = _parse_type(
            "source_format alternative form",
            alt_form,
            forbid_optional=True,
            parsing_typed_dict_attribute=False,
        )
        type_validator = self._as_type_validator(
            "source_format alternative form",
            td,
            True,
        )
        parsed_annotations = DetectedDebputyParseHint.parse_annotations(
            anno,
            f" The alternative for source_format.",
            None,
            False,
            default_target_attribute=default_target_attribute,
            allow_target_attribute_annotation=True,
            allow_source_attribute_annotations=False,
        )
        return AttributeDescription(
            target_attribute=parsed_annotations.target_attribute,
            attribute_type=td,
            type_validator=type_validator,
            annotations=anno,
            is_optional=is_optional,
            conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
            conditional_required=parsed_annotations.conditional_required,
            source_attribute_name="Alt form of the source_format",
        )

    def _union_narrowing(
        self,
        input_type: Any,
        target_type: Any,
        parsing_typed_dict_attribute: bool,
    ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]:
        """Return a mapper narrowing a Union input type into the target type.

        Returns None when the narrowing is unnecessary or unsupported.

        :raises ValueError: When input_type is not a non-empty Union, or a
          mixed-type conversion cannot be performed automatically.
        """
        _, input_orig, input_args = unpack_type(
            input_type, parsing_typed_dict_attribute
        )
        _, target_orig, target_args = unpack_type(
            target_type, parsing_typed_dict_attribute
        )

        if input_orig != Union or not input_args:
            raise ValueError("input_type must be a Union[...] with non-empty args")

        # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or
        # Union[X, List[X]] -> Union[Y, Union[Y]]
        # - Where X = Y or there is a simple standard transformation from X to Y.

        if target_orig not in (Union, list) or not target_args:
            # Not supported
            return None

        if target_orig == Union and set(input_args) == set(target_args):
            # Not needed (identity mapping)
            return None

        if target_orig == list and not any(get_origin(a) == list for a in input_args):
            # Not supported
            return None

        target_arg = target_args[0]
        simplified_type = self._strip_mapped_types(
            target_arg, parsing_typed_dict_attribute
        )
        # Every union member must be X or List[X] in either the target or the
        # simplified ("source") spelling; anything else is unsupported.
        acceptable_types = {
            target_arg,
            List[target_arg],  # type: ignore
            simplified_type,
            List[simplified_type],  # type: ignore
        }
        target_format = (
            target_arg,
            List[target_arg],  # type: ignore
        )
        in_target_format = 0
        in_simple_format = 0
        for input_arg in input_args:
            if input_arg not in acceptable_types:
                # Not supported
                return None
            if input_arg in target_format:
                in_target_format += 1
            else:
                in_simple_format += 1
        assert in_simple_format or in_target_format

        if in_target_format and not in_simple_format:
            # Union[X, List[X]] -> List[X]
            return normalize_into_list

        mapped = self._registered_types[target_arg]
        if not in_target_format and in_simple_format:
            # Union[X, List[X]] -> List[Y]

            def _mapper_x_list_y(
                x: Union[Any, List[Any]],
                ap: AttributePath,
                pc: Optional["ParserContextData"],
            ) -> List[Any]:
                in_list_form: List[Any] = normalize_into_list(x, ap, pc)

                return [mapped.mapper(x, ap, pc) for x in in_list_form]

            return _mapper_x_list_y

        # Union[Y, List[X]] -> List[Y]
        if not isinstance(target_arg, type):
            # NOTE(review): the message reads "(but both a mix of both)"; it
            # likely means "(but not a mix of both)" — runtime string left
            # untouched in this documentation-only pass.
            raise ValueError(
                f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does"
                f" not support mixed types. Please use either {simplified_type} or {target_arg}"
                f" in the source content (but both a mix of both)"
            )

        def _mapper_mixed_list_y(
            x: Union[Any, List[Any]],
            ap: AttributePath,
            pc: Optional["ParserContextData"],
        ) -> List[Any]:
            in_list_form: List[Any] = normalize_into_list(x, ap, pc)

            # Pass through values already in the target type; map the rest.
            return [
                x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc)
                for x in in_list_form
            ]

        return _mapper_mixed_list_y

    def _type_normalize(
        self,
        attribute: str,
        input_type: Any,
        target_type: Any,
        parsing_typed_dict_attribute: bool,
    ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]:
        """Return a mapper converting input_type values into target_type.

        Returns None when no conversion is required (identity mapping).

        :raises ValueError: When no automatic conversion exists.
        """
        if input_type == target_type:
            return None
        _, input_orig, input_args = unpack_type(
            input_type, parsing_typed_dict_attribute
        )
        _, target_orig, target_args = unpack_type(
            target_type,
            parsing_typed_dict_attribute,
        )
        if input_orig == Union:
            result = self._union_narrowing(
                input_type, target_type, parsing_typed_dict_attribute
            )
            if result:
                return result
        elif target_orig == list and target_args[0] == input_type:
            # X -> List[X]: wrap the single value into a list.
            return wrap_into_list

        mapped = self._registered_types.get(target_type)
        if mapped is not None and input_type == mapped.source_type:
            # Source -> Target
            return mapped.mapper
        if target_orig == list and target_args:
            mapped = self._registered_types.get(target_args[0])
            if mapped is not None:
                # mypy forgets that `mapped` cannot be None inside the
                # comprehensions below; bind it to a non-Optional name.
                mapped_type: TypeMapping = mapped
                if input_type == mapped.source_type:
                    # Source -> List[Target]
                    return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)]
                if (
                    input_orig == list
                    and input_args
                    and input_args[0] == mapped_type.source_type
                ):
                    # List[Source] -> List[Target]
                    return lambda xs, ap, pc: [
                        mapped_type.mapper(x, ap, pc) for x in xs
                    ]
        raise ValueError(
            f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow'
            f" {input_type} to {target_type}"
        )

    def _strip_mapped_types(
        self, orig_td: Any, parsing_typed_dict_attribute: bool
    ) -> Any:
        """Replace registered (mapped) types with their source types.

        Handles a directly registered type, List[<registered>] and Unions
        containing registered types; anything else is returned unchanged.
        """
        m = self._registered_types.get(orig_td)
        if m is not None:
            return m.source_type
        _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
        if v == list:
            arg = args[0]
            m = self._registered_types.get(arg)
            if m:
                return List[m.source_type]  # type: ignore
        if v == Union:
            stripped_args = tuple(
                self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args
            )
            if stripped_args != args:
                return Union[stripped_args]
        return orig_td


def _verify_inline_reference_documentation(
    source_content_attributes: Mapping[str, AttributeDescription],
    inline_reference_documentation: Optional[ParserDocumentation],
    has_alt_form: bool,
) -> None:
    """Validate that the provided documentation matches the source format.

    :raises ValueError: On references to unknown attributes, duplicated or
      missing attribute documentation, or documentation for a non-mapping
      form when the source format has none.
    """
    if inline_reference_documentation is None:
        return
    attribute_doc = inline_reference_documentation.attribute_doc
    if attribute_doc:
        seen = set()
        for attr_doc in attribute_doc:
            for attr_name in attr_doc.attributes:
                attr = source_content_attributes.get(attr_name)
                if attr is None:
                    raise ValueError(
                        f'The inline_reference_documentation references an attribute "{attr_name}", which does not'
                        f" exist in the source format."
                    )
                if attr_name in seen:
                    raise ValueError(
                        f'The inline_reference_documentation has documentation for "{attr_name}" twice,'
                        f" which is not supported. Please document it at most once"
                    )
                seen.add(attr_name)

        # Every declared attribute must be documented (or explicitly marked
        # as undocumented).
        undocumented = source_content_attributes.keys() - seen
        if undocumented:
            undocumented_attrs = ", ".join(undocumented)
            raise ValueError(
                "The following attributes were not documented. If this is deliberate, then please"
                ' declare each them as undocumented (via undocumented_attr("foo")):'
                f" {undocumented_attrs}"
            )

    if inline_reference_documentation.alt_parser_description and not has_alt_form:
        raise ValueError(
            "The inline_reference_documentation had documentation for an non-mapping format,"
            " but the source format does not have a non-mapping format."
        )


def _check_conflicts(
    input_content_attributes: Dict[str, AttributeDescription],
    required_attributes: FrozenSet[str],
    all_attributes: FrozenSet[str],
) -> None:
    """Sanity check the declared attribute conflicts.

    :raises ValueError: When a required attribute declares conflicts (which
      would make the conflicting attributes unusable), when an optional
      attribute conflicts with a required one, or when a conflict references
      an attribute that was never declared.
    """
    for attr_name, attr in input_content_attributes.items():
        if attr_name in required_attributes and attr.conflicting_attributes:
            c = ", ".join(repr(a) for a in attr.conflicting_attributes)
            raise ValueError(
                f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.'
                " This makes it impossible to use these attributes. Either remove the attributes"
                f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"'
                " optional (NotRequired)"
            )
        else:
            required_conflicts = attr.conflicting_attributes & required_attributes
            if required_conflicts:
                c = ", ".join(repr(a) for a in required_conflicts)
                raise ValueError(
                    f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.'
                    f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,'
                    f" adjust the conflicts or make the listed attributes optional (NotRequired)"
                )
            unknown_attributes = attr.conflicting_attributes - all_attributes
            if unknown_attributes:
                c = ", ".join(repr(a) for a in unknown_attributes)
                raise ValueError(
                    f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.'
                    f" None of these attributes were declared in the input."
                )


def _check_attributes(
    content: Type[TypedDict],
    input_content: Type[TypedDict],
    input_content_attributes: Dict[str, AttributeDescription],
    sources: Mapping[str, Collection[str]],
) -> None:
    """Validate "required"-consistency between source and target attributes.

    :raises ValueError: When a required source attribute shares its target
      with other sources, or when the sole source of a required target is
      itself optional.
    """
    target_required_keys = content.__required_keys__
    input_required_keys = input_content.__required_keys__
    all_input_keys = input_required_keys | input_content.__optional_keys__

    for input_name in all_input_keys:
        attr = input_content_attributes[input_name]
        target_name = attr.target_attribute
        source_names = sources[target_name]
        input_is_required = input_name in input_required_keys
        target_is_required = target_name in target_required_keys

        assert source_names

        if input_is_required and len(source_names) > 1:
            # A required source makes any alternative sources for the same
            # target pointless (they could never be used).
            raise ValueError(
                f'The source attribute "{input_name}" is required, but it maps to "{target_name}",'
                f' which has multiple sources "{source_names}". If "{input_name}" should be required,'
                f' then there is no need for additional sources for "{target_name}". Alternatively,'
                f' "{input_name}" might be missing a NotRequired type'
                f' (example: "{input_name}: NotRequired[]")'
            )
        if not input_is_required and target_is_required and len(source_names) == 1:
            raise ValueError(
                f'The source attribute "{input_name}" is not marked as required and maps to'
                f' "{target_name}", which is marked as required. As there are no other attributes'
                f' mapping to "{target_name}", then "{input_name}" must be required as well'
                f' ("{input_name}: Required[]"). Alternatively, "{target_name}" should be optional'
                f' ("{target_name}: NotRequired[]") or an "MappingHint.aliasOf" might be missing.'
            )


def _validation_type_error(path: AttributePath, message: str) -> None:
    """Raise a ManifestParseException for a type error at the given path."""
    raise ManifestParseException(
        f'The attribute "{path.path}" did not have a valid structure/type: {message}'
    )


def _is_two_arg_x_list_x(t_args: Tuple[Any, ...]) -> bool:
    """Return True when t_args is exactly (X, List[X]) or (List[X], X), X non-list."""
    if len(t_args) != 2:
        return False
    lhs, rhs = t_args
    if get_origin(lhs) == list:
        if get_origin(rhs) == list:
            # It could still match X, List[X] - but we do not allow this case for now as the caller
            # does not support it.
            return False
        l_args = get_args(lhs)
        return bool(l_args and l_args[0] == rhs)
    if get_origin(rhs) == list:
        r_args = get_args(rhs)
        return bool(r_args and r_args[0] == lhs)
    return False


def _extract_typed_dict(
    base_type,
    default_target_attribute: Optional[str],
) -> Tuple[Optional[Type[TypedDict]], Any]:
    """Split a source format into its TypedDict part and its alternative form.

    :raises ValueError: For unsupported shapes (plain dicts, Optional,
      multiple TypedDicts in a Union, or a missing target attribute for the
      alternative form).
    """
    if is_typeddict(base_type):
        return base_type, None
    _, origin, args = unpack_type(base_type, False)
    if origin != Union:
        if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)):
            raise ValueError(
                "The source_format cannot be nor contain a (non-TypedDict) dict"
            )
        return None, base_type
    typed_dicts = [x for x in args if is_typeddict(x)]
    if len(typed_dicts) > 1:
        raise ValueError(
            "When source_format is a Union, it must contain at most one TypedDict"
        )
    typed_dict = typed_dicts[0] if typed_dicts else None

    if any(x is None or x is _NONE_TYPE for x in args):
        raise ValueError(
            "The source_format cannot be nor contain Optional[X] or Union[X, None]"
        )

    if any(
        isinstance(x, type) and issubclass(x, (dict, Mapping))
        for x in args
        if x is not typed_dict
    ):
        raise ValueError(
            "The source_format cannot be nor contain a (non-TypedDict) dict"
        )
    # The non-TypedDict members make up the alternative (non-mapping) form.
    remaining = [x for x in args if x is not typed_dict]
    has_target_attribute = False
    anno = None
    if len(remaining) == 1:
        base_type, anno, _ = _parse_type(
            "source_format alternative form",
            remaining[0],
            forbid_optional=True,
            parsing_typed_dict_attribute=False,
        )
        has_target_attribute = bool(anno) and any(
            isinstance(x, TargetAttribute) for x in anno
        )
        target_type = base_type
    else:
        target_type = Union[tuple(remaining)]

    if default_target_attribute is None and not has_target_attribute:
        raise ValueError(
            'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]'
            " OR the parsed_content format must have exactly one attribute that is required."
        )

    if anno:
        # Re-attach the annotation metadata to the (possibly rebuilt) type.
        final_anno = [target_type]
        final_anno.extend(anno)
        return typed_dict, Annotated[tuple(final_anno)]
    return typed_dict, target_type


def _dispatch_parse_generator(
    dispatch_type: Type[DebputyDispatchableType],
) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]:
    """Create a mapper that parses values via the context's dispatch table."""

    def _dispatch_parse(
        value: Any,
        attribute_path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ):
        # Dispatchable types can only be parsed with a parser context.
        assert parser_context is not None
        dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type)
        return dispatching_parser.parse(
            value, attribute_path, parser_context=parser_context
        )

    return _dispatch_parse


def _dispatch_parser(
    dispatch_type: Type[DebputyDispatchableType],
) -> AttributeTypeHandler:
    """Handler for dispatchable types; validation is deferred to the dispatch parser."""
    return AttributeTypeHandler(
        dispatch_type.__name__,
        lambda *a: None,
        mapper=_dispatch_parse_generator(dispatch_type),
    )


def _parse_type(
    attribute: str,
    orig_td: Any,
    forbid_optional: bool = True,
    parsing_typed_dict_attribute: bool = True,
) -> Tuple[Any, Tuple[Any, ...], bool]:
    """Unpack a type annotation into (type, annotation metadata, is_optional).

    NOTE(review): the returned "optional" flag is initialized to False and
    never updated in this function; confirm whether callers rely on it.

    :raises ValueError: When the type resolves to None, or (with
      forbid_optional) when Optional is used.
    """
    td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
    md: Tuple[Any, ...] = tuple()
    optional = False
    if v is not None:
        if v == Annotated:
            # Unwrap Annotated[X, ...]: keep the metadata and re-unpack X.
            anno = get_args(td)
            md = anno[1:]
            td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute)

    if td is _NONE_TYPE:
        raise ValueError(
            f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the'
            " debputy manifest, so this attribute does not make sense in its current form."
        )
    if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args):
        raise ValueError(
            f'Detected use of Optional in "{attribute}", which is not allowed here.'
" Please use NotRequired for optional fields" ) return td, md, optional def _normalize_attribute_name(attribute: str) -> str: if attribute.endswith("_"): attribute = attribute[:-1] return attribute.replace("_", "-") @dataclasses.dataclass class DetectedDebputyParseHint: target_attribute: str source_manifest_attribute: Optional[str] conflict_with_source_attributes: FrozenSet[str] conditional_required: Optional[ConditionalRequired] applicable_as_path_hint: bool @classmethod def parse_annotations( cls, anno: Tuple[Any, ...], error_context: str, default_attribute_name: Optional[str], is_required: bool, default_target_attribute: Optional[str] = None, allow_target_attribute_annotation: bool = False, allow_source_attribute_annotations: bool = False, ) -> "DetectedDebputyParseHint": target_attr_anno = find_annotation(anno, TargetAttribute) if target_attr_anno: if not allow_target_attribute_annotation: raise ValueError( f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}" ) target_attribute = target_attr_anno.attribute elif default_target_attribute is not None: target_attribute = default_target_attribute elif default_attribute_name is not None: target_attribute = default_attribute_name else: if default_attribute_name is None: raise ValueError( "allow_target_attribute_annotation must be True OR " "default_attribute_name/default_target_attribute must be not None" ) raise ValueError( f"Missing DebputyParseHint.target_attribute annotation.{error_context}" ) source_attribute_anno = find_annotation(anno, ManifestAttribute) _source_attribute_allowed( allow_source_attribute_annotations, error_context, source_attribute_anno ) if source_attribute_anno: source_attribute_name = source_attribute_anno.attribute elif default_attribute_name is not None: source_attribute_name = _normalize_attribute_name(default_attribute_name) else: source_attribute_name = None mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute) if 
mutual_exclusive_with_anno: _source_attribute_allowed( allow_source_attribute_annotations, error_context, mutual_exclusive_with_anno, ) conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes else: conflicting_attributes = frozenset() conditional_required = find_annotation(anno, ConditionalRequired) if conditional_required and is_required: if default_attribute_name is None: raise ValueError( f"is_required cannot be True without default_attribute_name being not None" ) raise ValueError( f'The attribute "{default_attribute_name}" is Required while also being conditionally required.' ' Please make the attribute "NotRequired" or remove the conditional requirement.' ) not_path_hint_anno = find_annotation(anno, NotPathHint) applicable_as_path_hint = not_path_hint_anno is None return DetectedDebputyParseHint( target_attribute=target_attribute, source_manifest_attribute=source_attribute_name, conflict_with_source_attributes=conflicting_attributes, conditional_required=conditional_required, applicable_as_path_hint=applicable_as_path_hint, ) def _source_attribute_allowed( source_attribute_allowed: bool, error_context: str, annotation: Optional[DebputyParseHint], ) -> None: if source_attribute_allowed or annotation is None: return raise ValueError( f'The annotation "{annotation}" cannot be used here. {error_context}' )