diff options
Diffstat (limited to 'src/debputy/manifest_parser/declarative_parser.py')
-rw-r--r-- | src/debputy/manifest_parser/declarative_parser.py | 1893 |
1 file changed, 1893 insertions, 0 deletions
import collections
import dataclasses
import itertools
from typing import (
    Any,
    Callable,
    Tuple,
    TypedDict,
    Dict,
    get_type_hints,
    Annotated,
    get_args,
    get_origin,
    TypeVar,
    Generic,
    FrozenSet,
    Mapping,
    Optional,
    cast,
    is_typeddict,
    Type,
    Union,
    List,
    Collection,
    NotRequired,
    Iterable,
    Literal,
    Sequence,
)

from debputy.manifest_parser.base_types import (
    DebputyParsedContent,
    StaticFileSystemOwner,
    StaticFileSystemGroup,
    FileSystemMode,
    OctalMode,
    SymlinkTarget,
    FileSystemMatchRule,
    FileSystemExactMatchRule,
    FileSystemExactNonDirMatchRule,
    DebputyDispatchableType,
    TypeMapping,
)
from debputy.manifest_parser.exceptions import (
    ManifestParseException,
)
from debputy.manifest_parser.mapper_code import (
    type_mapper_str2package,
    normalize_into_list,
    wrap_into_list,
    map_each_element,
)
from debputy.manifest_parser.parser_data import ParserContextData
from debputy.manifest_parser.util import AttributePath, unpack_type, find_annotation
from debputy.packages import BinaryPackage
from debputy.plugin.api.impl_types import (
    DeclarativeInputParser,
    TD,
    _ALL_PACKAGE_TYPES,
    resolve_package_type_selectors,
)
from debputy.plugin.api.spec import ParserDocumentation, PackageTypeSelector
from debputy.util import _info, _warn, assume_not_none

try:
    from Levenshtein import distance
except ImportError:
    # python3-levenshtein is an optional dependency.  Without it, typo
    # detection degrades to a one-time informational hint.
    _WARN_ONCE = False

    def _detect_possible_typo(
        _key: str,
        _value: object,
        _manifest_attributes: Mapping[str, "AttributeDescription"],
        _path: "AttributePath",
    ) -> None:
        """No-op placeholder used when the Levenshtein module is unavailable."""
        global _WARN_ONCE
        if not _WARN_ONCE:
            _WARN_ONCE = True
            _info(
                "Install python3-levenshtein to have debputy try to detect typos in the manifest."
            )

else:

    def _detect_possible_typo(
        key: str,
        value: object,
        manifest_attributes: Mapping[str, "AttributeDescription"],
        path: "AttributePath",
    ) -> None:
        """Warn about manifest keys that look like typos of known attributes.

        Candidates are known attribute names within an edit distance of 2 of
        `key`.  Among candidates, names whose declared type fully accepts
        `value` beat names that only match the base type, which in turn beat
        names that do not match at all.
        """
        key_length = len(key)
        key_path = path[key]
        candidates: List[str] = []
        best_strength = 0
        for known_key, attribute in manifest_attributes.items():
            # Cheap pre-filter: a length difference above 2 guarantees an
            # edit distance above 2 as well.
            if abs(key_length - len(known_key)) > 2:
                continue
            if distance(key, known_key) > 2:
                continue
            try:
                attribute.type_validator.ensure_type(value, key_path)
            except ManifestParseException:
                strength = (
                    1 if attribute.type_validator.base_type_match(value) else 0
                )
            else:
                strength = 2

            if strength < best_strength:
                continue
            if strength > best_strength:
                best_strength = strength
                candidates.clear()
            candidates.append(known_key)

        if not candidates:
            return
        ref = f'at "{path.path}"' if path else "at the manifest root level"
        if len(candidates) == 1:
            possible_match = repr(candidates[0])
            _warn(
                f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}'
            )
        else:
            candidates.sort()
            possible_matches = ", ".join(repr(a) for a in candidates)
            _warn(
                f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}'
            )


SF = TypeVar("SF")
T = TypeVar("T")
S = TypeVar("S")


_NONE_TYPE = type(None)


# These must be able to appear in an "isinstance" check and must be builtin types.
BASIC_SIMPLE_TYPES = {
    str: "string",
    int: "integer",
    bool: "boolean",
}


class AttributeTypeHandler:
    """Validation and mapping logic for a single attribute type.

    Bundles a human-readable type description, a type check, an optional
    "base type" (used for fuzzy matching during typo detection) and an
    optional mapper that converts a raw value into its parsed form.
    """

    __slots__ = ("_description", "_ensure_type", "base_type", "mapper")

    def __init__(
        self,
        description: str,
        ensure_type: Callable[[Any, AttributePath], None],
        *,
        base_type: Optional[Type[Any]] = None,
        mapper: Optional[
            Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
        ] = None,
    ) -> None:
        self._description = description
        self._ensure_type = ensure_type
        self.base_type = base_type
        self.mapper = mapper

    def describe_type(self) -> str:
        """Return the human-readable description of the accepted type."""
        return self._description

    def ensure_type(self, obj: object, path: AttributePath) -> None:
        """Delegate to the validator; raises on a type mismatch."""
        self._ensure_type(obj, path)

    def base_type_match(self, obj: object) -> bool:
        """True if `obj` is an instance of the (optional) base type."""
        base_type = self.base_type
        return base_type is not None and isinstance(obj, base_type)

    def map_type(
        self,
        value: Any,
        path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ) -> Any:
        """Run the mapper (if any) over `value`; otherwise return it unchanged."""
        mapper = self.mapper
        return mapper(value, path, parser_context) if mapper is not None else value

    def combine_mapper(
        self,
        mapper: Optional[
            Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
        ],
    ) -> "AttributeTypeHandler":
        """Return a handler applying this handler's mapper first, then `mapper`."""
        if mapper is None:
            return self
        existing = self.mapper
        if existing is None:
            combined = mapper
        else:

            def combined(
                value: Any,
                path: AttributePath,
                parser_context: Optional["ParserContextData"],
            ) -> Any:
                return mapper(
                    existing(value, path, parser_context), path, parser_context
                )

        return AttributeTypeHandler(
            self._description,
            self._ensure_type,
            base_type=self.base_type,
            mapper=combined,
        )


@dataclasses.dataclass(slots=True)
class AttributeDescription:
    # Name of the attribute as written in the manifest.
    source_attribute_name: str
    # Name of the attribute in the parsed (target) content.
    target_attribute: str
    # Declared type (annotation) of the attribute.
    attribute_type: Any
    # Validator/mapper for values of this attribute.
    type_validator: AttributeTypeHandler
    # Raw `Annotated[...]` metadata attached to the attribute.
    annotations: Tuple[Any, ...]
    # Source attribute names this attribute conflicts with.
    conflicting_attributes: FrozenSet[str]
    # Condition under which this otherwise-optional attribute is required.
    conditional_required: Optional["ConditionalRequired"]
    parse_hints: Optional["DetectedDebputyParseHint"] = None
    is_optional: bool = False


def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool:
    """Record `v` (or its first element) as a path hint for error reporting.

    Returns True when a hint is (or already was) present on `attribute_path`.
    """
    if attribute_path.path_hint is not None:
        return True
    if isinstance(v, str):
        attribute_path.path_hint = v
        return True
    if isinstance(v, list) and v and isinstance(v[0], str):
        attribute_path.path_hint = v[0]
        return True
    return False


@dataclasses.dataclass(slots=True, frozen=True)
class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
    """Parser for rules whose input is a single non-mapping value."""

    # Describes (and validates/maps) the single accepted form.
    alt_form_parser: AttributeDescription
    inline_reference_documentation: Optional[ParserDocumentation] = None

    def parse_input(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse `value` at `path` into a single-attribute target dict.

        :raises ManifestParseException: if the value is missing or mistyped.
        """
        doc_ref = (
            f" (Documentation: {self.reference_documentation_url})"
            if self.reference_documentation_url is not None
            else ""
        )

        form_parser = self.alt_form_parser
        if value is None:
            form_note = f" The value must have type: {form_parser.type_validator.describe_type()}"
            if self.reference_documentation_url is not None:
                doc_ref = f" Please see {self.reference_documentation_url} for the documentation."
            raise ManifestParseException(
                f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
            )
        _extract_path_hint(value, path)
        form_parser.type_validator.ensure_type(value, path)
        attribute = form_parser.target_attribute
        mapped = form_parser.type_validator.map_type(value, path, parser_context)
        path.alias_mapping = {attribute: ("", None)}
        return cast("TD", {attribute: mapped})
@dataclasses.dataclass(slots=True)
class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
    """Parser for manifest rules whose input is a mapping.

    Validates required/optional keys, mutually exclusive and conditionally
    required attributes, detects likely typos in unknown keys, and maps the
    source attributes into the parsed (target) representation.  An optional
    `alt_form_parser` accepts a non-mapping shorthand form instead of the
    mapping.
    """

    # Keys the user must always provide.
    input_time_required_parameters: FrozenSet[str]
    # All keys this parser understands.
    all_parameters: FrozenSet[str]
    # Attribute descriptions keyed by manifest (source) attribute name.
    manifest_attributes: Mapping[str, "AttributeDescription"]
    # Attribute descriptions keyed by source type-spec attribute name.
    source_attributes: Mapping[str, "AttributeDescription"]
    # Groups of keys where at least one member must be present.
    at_least_one_of: FrozenSet[FrozenSet[str]]
    # Optional parser for the non-mapping shorthand form.
    alt_form_parser: Optional[AttributeDescription]
    mutually_exclusive_attributes: FrozenSet[FrozenSet[str]] = frozenset()
    _per_attribute_conflicts_cache: Optional[Mapping[str, FrozenSet[str]]] = None
    inline_reference_documentation: Optional[ParserDocumentation] = None
    path_hint_source_attributes: Sequence[str] = tuple()

    def parse_input(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse `value` at `path` into the target TypedDict form.

        :raises ManifestParseException: if the input violates the schema.
        """
        if self.reference_documentation_url is not None:
            doc_ref = f" (Documentation: {self.reference_documentation_url})"
        else:
            doc_ref = ""
        if value is None:
            form_note = " The attribute must be a mapping."
            if self.alt_form_parser is not None:
                form_note = (
                    " The attribute can be a mapping or a non-mapping format"
                    ' (usually, "non-mapping format" means a string or a list of strings).'
                )
            if self.reference_documentation_url is not None:
                doc_ref = f" Please see {self.reference_documentation_url} for the documentation."
            raise ManifestParseException(
                f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
            )
        if not isinstance(value, dict):
            # Non-mapping input: only valid when an alternative form exists.
            alt_form_parser = self.alt_form_parser
            if alt_form_parser is None:
                raise ManifestParseException(
                    f"The attribute {path.path} must be a mapping.{doc_ref}"
                )
            _extract_path_hint(value, path)
            alt_form_parser.type_validator.ensure_type(value, path)
            assert (
                value is not None
            ), "The alternative form was None, but the parser should have rejected None earlier."
            attribute = alt_form_parser.target_attribute
            alias_mapping = {
                attribute: ("", None),
            }
            v = alt_form_parser.type_validator.map_type(value, path, parser_context)
            path.alias_mapping = alias_mapping
            return cast("TD", {attribute: v})

        unknown_keys = value.keys() - self.all_parameters
        if unknown_keys:
            for k in unknown_keys:
                if isinstance(k, str):
                    _detect_possible_typo(k, value[k], self.manifest_attributes, path)
            unused_keys = self.all_parameters - value.keys()
            if unused_keys:
                usable = ", ".join(unused_keys)
                # FIX: the original messages had an unbalanced '"' after the path
                # (rendered as `at foo".`).
                raise ManifestParseException(
                    f'Unknown keys "{unknown_keys}" at {path.path}. Keys that could be used here are: {usable}.{doc_ref}'
                )
            raise ManifestParseException(
                f'Unknown keys "{unknown_keys}" at {path.path}. Please remove them.{doc_ref}'
            )
        missing_keys = self.input_time_required_parameters - value.keys()
        if missing_keys:
            required = ", ".join(repr(k) for k in sorted(missing_keys))
            raise ManifestParseException(
                f"The following keys were required but not present at {path.path}: {required}{doc_ref}"
            )
        for maybe_required in self.all_parameters - value.keys():
            attr = self.manifest_attributes[maybe_required]
            # Conditional requirements can only be evaluated with a context.
            assert attr.conditional_required is None or parser_context is not None
            if (
                attr.conditional_required is not None
                and attr.conditional_required.condition_applies(
                    assume_not_none(parser_context)
                )
            ):
                reason = attr.conditional_required.reason
                raise ManifestParseException(
                    f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path}. {reason}{doc_ref}'
                )
        for keyset in self.at_least_one_of:
            matched_keys = value.keys() & keyset
            if not matched_keys:
                conditionally_required = ", ".join(repr(k) for k in sorted(keyset))
                raise ManifestParseException(
                    f"At least one of the following keys must be present at {path.path}:"
                    f" {conditionally_required}{doc_ref}"
                )
        for group in self.mutually_exclusive_attributes:
            matched = value.keys() & group
            if len(matched) > 1:
                ck = ", ".join(repr(k) for k in sorted(matched))
                raise ManifestParseException(
                    f"Could not parse {path.path}: The following attributes are"
                    f" mutually exclusive: {ck}{doc_ref}"
                )
        result = {}
        per_attribute_conflicts = self._per_attribute_conflicts()
        alias_mapping = {}
        # FIX: the loop variable used to shadow the `path_hint_source_attributes`
        # attribute itself.
        for path_hint_attribute in self.path_hint_source_attributes:
            v = value.get(path_hint_attribute)
            if v is not None and _extract_path_hint(v, path):
                break
        for k, v in value.items():
            attr = self.manifest_attributes[k]
            conflicts = value.keys() & per_attribute_conflicts[k]
            if conflicts:
                ck = ", ".join(repr(c) for c in sorted(conflicts))
                raise ManifestParseException(
                    f'The attribute "{k}" at {path.path} cannot be used with the following'
                    f" attributes: {ck}{doc_ref}"
                )
            nk = attr.target_attribute
            key_path = path[k]
            attr.type_validator.ensure_type(v, key_path)
            if v is None:
                # An explicit null is treated as "attribute not provided".
                continue
            if k != nk:
                alias_mapping[nk] = k, None
            v = attr.type_validator.map_type(v, key_path, parser_context)
            result[nk] = v
        if alias_mapping:
            path.alias_mapping = alias_mapping
        return cast("TD", result)

    def _per_attribute_conflicts(self) -> Mapping[str, FrozenSet[str]]:
        """Compute (and cache) conflicts keyed by manifest attribute name."""
        conflicts = self._per_attribute_conflicts_cache
        if conflicts is not None:
            return conflicts
        attrs = self.source_attributes
        conflicts = {
            a.source_attribute_name: frozenset(
                attrs[ca].source_attribute_name for ca in a.conflicting_attributes
            )
            for a in attrs.values()
        }
        self._per_attribute_conflicts_cache = conflicts
        return conflicts
class DebputyParseHint:
    """Annotation markers that tune how the ParserGenerator maps attributes."""

    @classmethod
    def target_attribute(cls, target_attribute: str) -> "DebputyParseHint":
        """Define this source attribute to have a different target attribute name.

        Multiple source attributes may map to the same target attribute; they
        then become mutually exclusive and, when the target attribute is
        required, the user must provide exactly one of them (none of the
        source attributes may themselves be Required in that case).  The
        annotation is only valid in a source type specification that differs
        from the target type specification.  It can also be used for a single
        attribute whose manifest name is not a valid Python identifier.

        :param target_attribute: The attribute name in the target content
        :return: The annotation.
        """
        return TargetAttribute(target_attribute)

    @classmethod
    def conflicts_with_source_attributes(
        cls,
        *conflicting_source_attributes: str,
    ) -> "DebputyParseHint":
        """Declare a conflict with one or more source attributes.

        All attributes involved in a conflict must be NotRequired and must be
        valid attributes in the source type spec.  Conflicts between two
        attributes sharing the same target attribute are implied by
        `target_attribute` and need not be declared here.  A target attribute
        cannot declare conflicts unless the target type spec is reused as the
        source type spec.

        :param conflicting_source_attributes: All source attributes that
          cannot be used with this attribute.
        :return: The annotation.
        :raises ValueError: if no conflicting attribute is given.
        """
        if len(conflicting_source_attributes) < 1:
            raise ValueError(
                "DebputyParseHint.conflicts_with_source_attributes requires at least one attribute as input"
            )
        return ConflictWithSourceAttribute(frozenset(conflicting_source_attributes))

    @classmethod
    def required_when_single_binary(
        cls,
        *,
        package_type: PackageTypeSelector = _ALL_PACKAGE_TYPES,
    ) -> "DebputyParseHint":
        """Declare a source attribute as required when the source package produces exactly one binary package

        The attribute in question must always be declared as `NotRequired` in the TypedDict and this condition
        can only be used for source attributes.
        """
        resolved_package_types = resolve_package_type_selectors(package_type)
        reason = "The field is required for source packages producing exactly one binary package"
        if resolved_package_types != _ALL_PACKAGE_TYPES:
            types = ", ".join(sorted(resolved_package_types))
            reason += f" of type {types}"
            return ConditionalRequired(
                reason,
                # FIX: match against the resolved selector *set*; `in package_type`
                # would be a substring test when the selector is a plain string
                # (e.g. "deb" in "udeb" is True).
                lambda c: len(
                    [
                        p
                        for p in c.binary_packages.values()
                        if p.package_type in resolved_package_types
                    ]
                )
                == 1,
            )
        return ConditionalRequired(
            reason,
            lambda c: c.is_single_binary_package,
        )

    @classmethod
    def required_when_multi_binary(
        cls,
        *,
        package_type: PackageTypeSelector = _ALL_PACKAGE_TYPES,
    ) -> "DebputyParseHint":
        """Declare a source attribute as required when the source package produces two or more binary package

        The attribute in question must always be declared as `NotRequired` in the TypedDict and this condition
        can only be used for source attributes.
        """
        resolved_package_types = resolve_package_type_selectors(package_type)
        reason = "The field is required for source packages producing two or more binary packages"
        if resolved_package_types != _ALL_PACKAGE_TYPES:
            types = ", ".join(sorted(resolved_package_types))
            # FIX: reworded the garbled message ("producing not producing ...").
            reason = (
                "The field is required for source packages not producing exactly one binary package"
                f" of type {types}"
            )
            return ConditionalRequired(
                reason,
                # FIX: use the resolved selector set (see required_when_single_binary).
                lambda c: len(
                    [
                        p
                        for p in c.binary_packages.values()
                        if p.package_type in resolved_package_types
                    ]
                )
                != 1,
            )
        return ConditionalRequired(
            reason,
            lambda c: not c.is_single_binary_package,
        )

    @classmethod
    def manifest_attribute(cls, attribute: str) -> "DebputyParseHint":
        """Declare what the attribute name (as written in the manifest) should be.

        Overrides the built-in normalization (e.g. `dest_dir` -> `dest-dir`)
        for cases where the manifest spelling differs considerably from the
        Python identifier — e.g. using "as" in the manifest for an attribute
        named `install_as`, because `as` is a Python keyword.

        :param attribute: The name the user must write in the manifest.
        :return: The annotation.
        """
        return ManifestAttribute(attribute)

    @classmethod
    def not_path_error_hint(cls) -> "DebputyParseHint":
        """Mark this attribute as not a "path hint" for error reporting.

        By default, attributes with path-like types (FileSystemMatchRule) are
        candidates for the "<Search for: VALUE>" hint in parse errors.  When
        several attributes qualify, place this hint on a *source* attribute to
        hide the suboptimal choice.  Mind the asymmetry: the annotation goes
        on the source format, while the *target* attribute's type decides
        whether it counts as a path.

        :return: The annotation.
        """
        return NOT_PATH_HINT
+ """ + return NOT_PATH_HINT + + +@dataclasses.dataclass(frozen=True, slots=True) +class TargetAttribute(DebputyParseHint): + attribute: str + + +@dataclasses.dataclass(frozen=True, slots=True) +class ConflictWithSourceAttribute(DebputyParseHint): + conflicting_attributes: FrozenSet[str] + + +@dataclasses.dataclass(frozen=True, slots=True) +class ConditionalRequired(DebputyParseHint): + reason: str + condition: Callable[["ParserContextData"], bool] + + def condition_applies(self, context: "ParserContextData") -> bool: + return self.condition(context) + + +@dataclasses.dataclass(frozen=True, slots=True) +class ManifestAttribute(DebputyParseHint): + attribute: str + + +class NotPathHint(DebputyParseHint): + pass + + +NOT_PATH_HINT = NotPathHint() + + +def _is_path_attribute_candidate( + source_attribute: AttributeDescription, target_attribute: AttributeDescription +) -> bool: + if ( + source_attribute.parse_hints + and not source_attribute.parse_hints.applicable_as_path_hint + ): + return False + target_type = target_attribute.attribute_type + _, origin, args = unpack_type(target_type, False) + match_type = target_type + if origin == list: + match_type = args[0] + return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule) + + +class ParserGenerator: + def __init__(self) -> None: + self._registered_types: Dict[Any, TypeMapping[Any, Any]] = {} + + def register_mapped_type(self, mapped_type: TypeMapping) -> None: + existing = self._registered_types.get(mapped_type.target_type) + if existing is not None: + raise ValueError(f"The type {existing} is already registered") + self._registered_types[mapped_type.target_type] = mapped_type + + def discard_mapped_type(self, mapped_type: Type[T]) -> None: + del self._registered_types[mapped_type] + + def parser_from_typed_dict( + self, + parsed_content: Type[TD], + *, + source_content: Optional[SF] = None, + allow_optional: bool = False, + inline_reference_documentation: Optional[ParserDocumentation] = 
None, + ) -> DeclarativeInputParser[TD]: + """Derive a parser from a TypedDict + + Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict + or two that are used as a description. + + In its most simple use-case, the caller provides a TypedDict of the expected attributed along with + their types. As an example: + + >>> class InstallDocsRule(DebputyParsedContent): + ... sources: List[str] + ... into: List[str] + >>> pg = ParserGenerator() + >>> simple_parser = pg.parser_from_typed_dict(InstallDocsRule) + + This will create a parser that would be able to interpret something like: + + ```yaml + install-docs: + sources: ["docs/*"] + into: ["my-pkg"] + ``` + + While this is sufficient for programmers, it is a bit ridig for the packager writing the manifest. Therefore, + you can also provide a TypedDict descriping the input, enabling more flexibility: + + >>> class InstallDocsRule(DebputyParsedContent): + ... sources: List[str] + ... into: List[str] + >>> class InputDocsRuleInputFormat(TypedDict): + ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]] + ... sources: NotRequired[List[str]] + ... into: Union[str, List[str]] + >>> pg = ParserGenerator() + >>> flexible_parser = pg.parser_from_typed_dict( + ... InstallDocsRule, + ... source_content=InputDocsRuleInputFormat, + ... ) + + In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string) + or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources` + is used to ensure the input is not ambigious. For the `into` parameter, the parser will accept it being a str + or a list of strings. Regardless of how the input was provided, the parser will normalize the input such that + both `sources` and `into` in the result is a list of strings. 
As an example, this parser can accept + both the previous input but also the following input: + + ```yaml + install-docs: + source: "docs/*" + into: "my-pkg" + ``` + + The `source` and `into` attributes are then normalized to lists as if the user had written them as lists + with a single string in them. As noted above, the name of the `source` attribute will also be normalized + while parsing. + + In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict + as part of the input. Example: + + >>> class DiscardRule(DebputyParsedContent): + ... paths: List[str] + >>> class DiscardRuleInputDictFormat(TypedDict): + ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]] + ... paths: NotRequired[List[str]] + >>> # This format relies on DiscardRule having exactly one Required attribute + >>> DiscardRuleInputWithAltFormat = Union[ + ... DiscardRuleInputDictFormat, + ... str, + ... List[str], + ... ] + >>> pg = ParserGenerator() + >>> flexible_parser = pg.parser_from_typed_dict( + ... DiscardRule, + ... source_content=DiscardRuleInputWithAltFormat, + ... ) + + + Supported types: + * `List` - must have a fixed type argument (such as `List[str]`) + * `str` + * `int` + * `BinaryPackage` - When provided (or required), the user must provide a package name listed + in the debian/control file. The code receives the BinaryPackage instance + matching that input. + * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any + format that `debputy' provides (such as `0644` or `a=rw,go=rw`). + * `FileSystemOwner` - When provided (or required), the user must a file system owner that is + available statically on all Debian systems (must be in `base-passwd`). + The user has multiple options for how to specify it (either via name or id). 
+ * `FileSystemGroup` - When provided (or required), the user must a file system group that is + available statically on all Debian systems (must be in `base-passwd`). + The user has multiple options for how to specify it (either via name or id). + * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply. + Usually, it is better to extend `DebputyParsedContentStandardConditional`, which + provides the `debputy' default `when` parameter for conditionals. + + Supported special type-like parameters: + + * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the + outermost level. Cannot vary between `parsed_content` and `source_content`. + * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment. + * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present). + Automapping (see below) is restricted to two members in the Union. + + Notable non-supported types: + * `Mapping` and all variants therefore (such as `dict`). In the future, nested `TypedDict`s may be allowed. + * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields. + + Automatic mapping rules from `source_content` to `parsed_content`: + - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically: + `lambda value: value if isinstance(value, list) else [value]` + - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]` + + Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod + for concrete features that may be useful to you. + + :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed. 
+ (DebputyParsedContent is a TypedDict subclass that work around some inadequate type checkers) + :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful + to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted, + the parsed_content is also considered the source_content (which affects what annotations are allowed in it). + Note you should never pass the parsed_content as source_content directly. + :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you + should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and + can keep this False). + :param inline_reference_documentation: Optionally, programmatic documentation + :return: An input parser capable of reading input matching the TypedDict(s) used as reference. + """ + if not is_typeddict(parsed_content): + raise ValueError( + f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." + ' Only "TypedDict"-based types supported.' 
+ ) + if source_content is parsed_content: + raise ValueError( + "Do not provide source_content if it is the same as parsed_content" + ) + + target_attributes = self._parse_types( + parsed_content, + allow_source_attribute_annotations=source_content is None, + forbid_optional=not allow_optional, + ) + required_target_parameters = frozenset(parsed_content.__required_keys__) + parsed_alt_form = None + non_mapping_source_only = False + + if source_content is not None: + default_target_attribute = None + if len(required_target_parameters) == 1: + default_target_attribute = next(iter(required_target_parameters)) + + source_typed_dict, alt_source_forms = _extract_typed_dict( + source_content, + default_target_attribute, + ) + if alt_source_forms: + parsed_alt_form = self._parse_alt_form( + alt_source_forms, + default_target_attribute, + ) + if source_typed_dict is not None: + source_content_attributes = self._parse_types( + source_typed_dict, + allow_target_attribute_annotation=True, + allow_source_attribute_annotations=True, + forbid_optional=not allow_optional, + ) + source_content_parameter = "source_content" + source_and_parsed_differs = True + else: + source_typed_dict = parsed_content + source_content_attributes = target_attributes + source_content_parameter = "parsed_content" + source_and_parsed_differs = True + non_mapping_source_only = True + else: + source_typed_dict = parsed_content + source_content_attributes = target_attributes + source_content_parameter = "parsed_content" + source_and_parsed_differs = False + + sources = collections.defaultdict(set) + seen_targets = set() + seen_source_names: Dict[str, str] = {} + source_attributes: Dict[str, AttributeDescription] = {} + path_hint_source_attributes = [] + + for k in source_content_attributes: + ia = source_content_attributes[k] + + ta = ( + target_attributes.get(ia.target_attribute) + if source_and_parsed_differs + else ia + ) + if ta is None: + # Error message would be wrong if this assertion is false. 
+ assert source_and_parsed_differs + raise ValueError( + f'The attribute "{k}" from the "source_content" parameter should have mapped' + f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"' + ) + if _is_path_attribute_candidate(ia, ta): + path_hint_source_attributes.append(ia.source_attribute_name) + existing_source_name = seen_source_names.get(ia.source_attribute_name) + if existing_source_name: + raise ValueError( + f'The attribute "{k}" and "{existing_source_name}" both share the source name' + f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,' + f' so only one attribute use "{ia.source_attribute_name}".' + ) + seen_source_names[ia.source_attribute_name] = k + seen_targets.add(ta.target_attribute) + sources[ia.target_attribute].add(k) + if source_and_parsed_differs: + bridge_mapper = self._type_normalize( + k, ia.attribute_type, ta.attribute_type, False + ) + ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper) + source_attributes[k] = ia + + def _as_attr_names(td_name: Iterable[str]) -> FrozenSet[str]: + return frozenset( + source_content_attributes[a].source_attribute_name for a in td_name + ) + + _check_attributes( + parsed_content, + source_typed_dict, + source_content_attributes, + sources, + ) + + at_least_one_of = frozenset( + _as_attr_names(g) + for k, g in sources.items() + if len(g) > 1 and k in required_target_parameters + ) + + if source_and_parsed_differs and seen_targets != target_attributes.keys(): + missing = ", ".join( + repr(k) for k in (target_attributes.keys() - seen_targets) + ) + raise ValueError( + 'The following attributes in "parsed_content" did not have a source field in "source_content":' + f" {missing}" + ) + all_mutually_exclusive_fields = frozenset( + _as_attr_names(g) for g in sources.values() if len(g) > 1 + ) + + all_parameters = ( + source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__ + ) + _check_conflicts( + 
            # --- tail of parser_from_typed_dict(); its definition starts above this chunk ---
            source_content_attributes,
            source_typed_dict.__required_keys__,
            all_parameters,
        )

        manifest_attributes = {
            a.source_attribute_name: a for a in source_content_attributes.values()
        }

        if parsed_alt_form is not None:
            target_attribute = parsed_alt_form.target_attribute
            # NOTE(review): relies on `and` binding tighter than `or`; the condition rejects
            # the alt form unless the target has at most one required parameter and, when it
            # has exactly one, that parameter is the alt form's target attribute.
            if (
                target_attribute not in required_target_parameters
                and required_target_parameters
                or len(required_target_parameters) > 1
            ):
                raise NotImplementedError(
                    "When using alternative source formats (Union[TypedDict, ...]), then the"
                    " target must have at most one require parameter"
                )
            # Chain a normalization step from the alt form's declared type to the
            # target attribute's declared type onto the alt form's validator.
            bridge_mapper = self._type_normalize(
                target_attribute,
                parsed_alt_form.attribute_type,
                target_attributes[target_attribute].attribute_type,
                False,
            )
            parsed_alt_form.type_validator = (
                parsed_alt_form.type_validator.combine_mapper(bridge_mapper)
            )

        _verify_inline_reference_documentation(
            source_content_attributes,
            inline_reference_documentation,
            parsed_alt_form is not None,
        )
        if non_mapping_source_only:
            # The source format consists solely of the alternative (non-mapping) form.
            return DeclarativeNonMappingInputParser(
                assume_not_none(parsed_alt_form),
                inline_reference_documentation=inline_reference_documentation,
            )
        else:
            return DeclarativeMappingInputParser(
                _as_attr_names(source_typed_dict.__required_keys__),
                _as_attr_names(all_parameters),
                manifest_attributes,
                source_attributes,
                mutually_exclusive_attributes=all_mutually_exclusive_fields,
                alt_form_parser=parsed_alt_form,
                at_least_one_of=at_least_one_of,
                inline_reference_documentation=inline_reference_documentation,
                path_hint_source_attributes=tuple(path_hint_source_attributes),
            )

    def _as_type_validator(
        self,
        attribute: str,
        provided_type: Any,
        parsing_typed_dict_attribute: bool,
    ) -> AttributeTypeHandler:
        """Build an AttributeTypeHandler (validator plus optional value mapper) for a declared type.

        Dispatches on the (unmapped) form of `provided_type`: dispatchable plugin types,
        Optional[X], basic simple types, List[X], nested TypedDicts, Dict[str, Y],
        Union[...] (continued in the next chunk with Literal[...] and Any).

        :param attribute: Attribute name; used only in error messages.
        :param provided_type: The type annotation to derive a handler for.
        :param parsing_typed_dict_attribute: Whether the type appears as a TypedDict
          attribute (affects how `unpack_type` interprets it).
        """
        assert not isinstance(provided_type, tuple)

        # Dispatchable types are parsed via a registered dispatching parser, not here.
        if isinstance(provided_type, type) and issubclass(
            provided_type, DebputyDispatchableType
        ):
            return _dispatch_parser(provided_type)

        # Work on the "source" view of the type (registered type mappings stripped);
        # `type_normalizer` bridges back from the source view to `provided_type`.
        unmapped_type = self._strip_mapped_types(
            provided_type,
            parsing_typed_dict_attribute,
        )
        type_normalizer = self._type_normalize(
            attribute,
            unmapped_type,
            provided_type,
            parsing_typed_dict_attribute,
        )
        t_unmapped, t_orig, t_args = unpack_type(
            unmapped_type,
            parsing_typed_dict_attribute,
        )

        # Optional[X] (i.e., Union[X, None] with exactly two args): validate X, accept None.
        if (
            t_orig == Union
            and t_args
            and len(t_args) == 2
            and any(v is _NONE_TYPE for v in t_args)
        ):
            _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute)
            actual_type = [a for a in args if a is not _NONE_TYPE][0]
            validator = self._as_type_validator(
                attribute, actual_type, parsing_typed_dict_attribute
            )

            def _validator(v: Any, path: AttributePath) -> None:
                if v is None:
                    return
                validator.ensure_type(v, path)

            return AttributeTypeHandler(
                validator.describe_type(),
                _validator,
                base_type=validator.base_type,
                mapper=type_normalizer,
            )

        if unmapped_type in BASIC_SIMPLE_TYPES:
            type_name = BASIC_SIMPLE_TYPES[unmapped_type]

            type_mapping = self._registered_types.get(provided_type)
            if type_mapping is not None:
                # Describe as "TargetType (simple type)" when a mapping is registered.
                simple_type = f" ({type_name})"
                type_name = type_mapping.target_type.__name__
            else:
                simple_type = ""

            def _validator(v: Any, path: AttributePath) -> None:
                if not isinstance(v, unmapped_type):
                    _validation_type_error(
                        path, f"The attribute must be a {type_name}{simple_type}"
                    )

            return AttributeTypeHandler(
                type_name,
                _validator,
                base_type=unmapped_type,
                mapper=type_normalizer,
            )
        if t_orig == list:
            if not t_args:
                raise ValueError(
                    f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])'
                )
            # NOTE(review): t_provided_orig/t_provided_args appear unused below — confirm
            # before removing. "genetic_type" is presumably a typo for "generic_type"
            # (local name only; left unchanged here).
            _, t_provided_orig, t_provided_args = unpack_type(
                provided_type,
                parsing_typed_dict_attribute,
            )
            genetic_type = t_args[0]
            key_mapper = self._as_type_validator(
                attribute,
                genetic_type,
                parsing_typed_dict_attribute,
            )

            def _validator(v: Any, path: AttributePath) -> None:
                if not isinstance(v, list):
                    _validation_type_error(path, "The attribute must be a list")
                for i, v in enumerate(v):
                    key_mapper.ensure_type(v, path[i])

            # Apply the element mapper to each list member (if the element type has one).
            list_mapper = (
                map_each_element(key_mapper.mapper)
                if key_mapper.mapper is not None
                else None
            )

            return AttributeTypeHandler(
                f"List of {key_mapper.describe_type()}",
                _validator,
                base_type=list,
                mapper=type_normalizer,
            ).combine_mapper(list_mapper)
        if is_typeddict(provided_type):
            # Nested TypedDict: delegate validation/mapping to a recursively-built parser.
            subparser = self.parser_from_typed_dict(cast("Type[TD]", provided_type))
            return AttributeTypeHandler(
                description=f"{provided_type.__name__} (Typed Mapping)",
                ensure_type=lambda v, ap: None,
                base_type=dict,
                mapper=lambda v, ap, cv: subparser.parse_input(
                    v, ap, parser_context=cv
                ),
            )
        if t_orig == dict:
            if not t_args or len(t_args) != 2:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])'
                )
            if t_args[0] != str:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict and has a non-str type as key.'
                    " Currently, only `str` is supported (Dict[str, Y])"
                )
            key_mapper = self._as_type_validator(
                attribute,
                t_args[0],
                parsing_typed_dict_attribute,
            )
            value_mapper = self._as_type_validator(
                attribute,
                t_args[1],
                parsing_typed_dict_attribute,
            )

            if key_mapper.base_type is None:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types'
                    f" without trivial base types (such as `str`) are not supported at the moment."
                )

            if value_mapper.mapper is not None:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict and the value requires mapping.'
                    " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])."
                    " Better typing may come later"
                )

            def _validator(uv: Any, path: AttributePath) -> None:
                if not isinstance(uv, dict):
                    _validation_type_error(path, "The attribute must be a mapping")
                # Track a human-readable "position" for key errors (keys have no index path).
                key_name = "the first key in the mapping"
                for i, (k, v) in enumerate(uv.items()):
                    if not key_mapper.base_type_match(k):
                        kp = path.copy_with_path_hint(key_name)
                        _validation_type_error(
                            kp,
                            f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}',
                        )
                    key_name = f"the key after {k}"
                    value_mapper.ensure_type(v, path[k])

            return AttributeTypeHandler(
                f"Mapping of {value_mapper.describe_type()}",
                _validator,
                base_type=dict,
                mapper=type_normalizer,
            ).combine_mapper(key_mapper.mapper)
        if t_orig == Union:
            if _is_two_arg_x_list_x(t_args):
                # Force the order to be "X, List[X]" as it simplifies the code
                x_list_x = (
                    t_args if get_origin(t_args[1]) == list else (t_args[1], t_args[0])
                )

                # X, List[X] could match if X was List[Y]. However, our code below assumes
                # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this
                # case to avoid this assert and fall into the "generic case".
                assert get_origin(x_list_x[0]) != list
                x_subtype_checker = self._as_type_validator(
                    attribute,
                    x_list_x[0],
                    parsing_typed_dict_attribute,
                )
                list_x_subtype_checker = self._as_type_validator(
                    attribute,
                    x_list_x[1],
                    parsing_typed_dict_attribute,
                )
                type_description = x_subtype_checker.describe_type()
                type_description = f"{type_description} or a list of {type_description}"

                def _validator(v: Any, path: AttributePath) -> None:
                    if isinstance(v, list):
                        list_x_subtype_checker.ensure_type(v, path)
                    else:
                        x_subtype_checker.ensure_type(v, path)

                return AttributeTypeHandler(
                    type_description,
                    _validator,
                    mapper=type_normalizer,
                )
            else:
                # Generic union: all members must agree on the mapper, otherwise we
                # cannot normalize the value consistently.
                subtype_checker = [
                    self._as_type_validator(attribute, a, parsing_typed_dict_attribute)
                    for a in t_args
                ]
                type_description = "one-of: " + ", ".join(
                    f"{sc.describe_type()}" for sc in subtype_checker
                )
                mapper = subtype_checker[0].mapper
                if any(mapper != sc.mapper for sc in subtype_checker):
                    raise ValueError(
                        f'Cannot handle the union "{provided_type}" as the target types need different'
                        " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]"
                        " where X is a non-collection type."
                    )

                def _validator(v: Any, path: AttributePath) -> None:
                    # Try each union member; re-raise the error of the single member whose
                    # base type matched (most relevant), otherwise emit a generic error.
                    partial_matches = []
                    for sc in subtype_checker:
                        try:
                            sc.ensure_type(v, path)
                            return
                        except ManifestParseException as e:
                            if sc.base_type_match(v):
                                partial_matches.append((sc, e))

                    if len(partial_matches) == 1:
                        raise partial_matches[0][1]
                    _validation_type_error(
                        path, f"Could not match against: {type_description}"
                    )

                return AttributeTypeHandler(
                    type_description,
                    _validator,
                    mapper=type_normalizer,
                )
        if t_orig == Literal:
            # We want "x" for string values; repr provides 'x'
            pretty = ", ".join(
                f'"{v}"' if isinstance(v, str) else str(v) for v in t_args
            )

            def _validator(v: Any, path: AttributePath) -> None:
                if v not in t_args:
                    value_hint = ""
                    if isinstance(v, str):
                        value_hint = f"({v}) "
                    _validation_type_error(
                        path,
                        f"Value {value_hint}must be one of the following literal values: {pretty}",
                    )

            # NOTE(review): no base_type/mapper is set here — Literal values pass
            # through unmapped.
            return AttributeTypeHandler(
                f"One of the following literal values: {pretty}",
                _validator,
            )

        if provided_type == Any:
            # Any: accept everything, validate nothing.
            return AttributeTypeHandler(
                "any (unvalidated)",
                lambda *a: None,
            )
        raise ValueError(
            f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported'
        )

    def _parse_types(
        self,
        spec: Type[TypedDict],
        allow_target_attribute_annotation: bool = False,
        allow_source_attribute_annotations: bool = False,
        forbid_optional: bool = True,
    ) -> Dict[str, AttributeDescription]:
        """Derive an AttributeDescription for every attribute declared on a TypedDict.

        :param spec: The TypedDict whose annotations (including `Annotated` extras) are parsed.
        :param allow_target_attribute_annotation: Permit DebputyParseHint.target_attribute.
        :param allow_source_attribute_annotations: Permit source-side parse hints.
        :param forbid_optional: Reject Optional[X] attributes (NotRequired should be used).
        """
        annotations = get_type_hints(spec, include_extras=True)
        return {
            k: self._attribute_description(
                k,
                t,
                k in spec.__required_keys__,
                allow_target_attribute_annotation=allow_target_attribute_annotation,
                allow_source_attribute_annotations=allow_source_attribute_annotations,
                forbid_optional=forbid_optional,
            )
            for k, t in annotations.items()
        }

    def _attribute_description(
        self,
        attribute: str,
        orig_td: Any,
        is_required: bool,
        forbid_optional: bool = True,
        allow_target_attribute_annotation: bool = False,
        allow_source_attribute_annotations: bool = False,
    ) -> AttributeDescription:
        # Split the annotation into base type + Annotated extras, then derive the
        # validator and parse hints for a single attribute.
        td, anno, is_optional = _parse_type(
            attribute, orig_td, forbid_optional=forbid_optional
        )
        type_validator = self._as_type_validator(attribute, td, True)
        parsed_annotations = DetectedDebputyParseHint.parse_annotations(
            anno,
            f' Seen with attribute "{attribute}".',
            attribute,
            is_required,
            allow_target_attribute_annotation=allow_target_attribute_annotation,
            allow_source_attribute_annotations=allow_source_attribute_annotations,
        )
        return AttributeDescription(
            target_attribute=parsed_annotations.target_attribute,
            attribute_type=td,
            type_validator=type_validator,
            annotations=anno,
            is_optional=is_optional,
            conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
            conditional_required=parsed_annotations.conditional_required,
            source_attribute_name=assume_not_none(
                parsed_annotations.source_manifest_attribute
            ),
            parse_hints=parsed_annotations,
        )

    def _parse_alt_form(
        self,
        alt_form,
        default_target_attribute: Optional[str],
    ) -> AttributeDescription:
        """Build the AttributeDescription for the non-mapping (alternative) source form.

        :param alt_form: The (non-TypedDict) type of the alternative source format.
        :param default_target_attribute: Target attribute to use when the alt form has
          no explicit DebputyParseHint.target_attribute annotation.
        """
        td, anno, is_optional = _parse_type(
            "source_format alternative form",
            alt_form,
            forbid_optional=True,
            parsing_typed_dict_attribute=False,
        )
        type_validator = self._as_type_validator(
            "source_format alternative form",
            td,
            True,
        )
        parsed_annotations = DetectedDebputyParseHint.parse_annotations(
            anno,
            f" The alternative for source_format.",
            None,
            False,
            default_target_attribute=default_target_attribute,
            allow_target_attribute_annotation=True,
            allow_source_attribute_annotations=False,
        )
        return AttributeDescription(
            target_attribute=parsed_annotations.target_attribute,
            attribute_type=td,
            type_validator=type_validator,
            annotations=anno,
            is_optional=is_optional,
            conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
conditional_required=parsed_annotations.conditional_required, + source_attribute_name="Alt form of the source_format", + ) + + def _union_narrowing( + self, + input_type: Any, + target_type: Any, + parsing_typed_dict_attribute: bool, + ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]: + _, input_orig, input_args = unpack_type( + input_type, parsing_typed_dict_attribute + ) + _, target_orig, target_args = unpack_type( + target_type, parsing_typed_dict_attribute + ) + + if input_orig != Union or not input_args: + raise ValueError("input_type must be a Union[...] with non-empty args") + + # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, Union[Y]] + # - Where X = Y or there is a simple standard transformation from X to Y. + + if target_orig not in (Union, list) or not target_args: + # Not supported + return None + + if target_orig == Union and set(input_args) == set(target_args): + # Not needed (identity mapping) + return None + + if target_orig == list and not any(get_origin(a) == list for a in input_args): + # Not supported + return None + + target_arg = target_args[0] + simplified_type = self._strip_mapped_types( + target_arg, parsing_typed_dict_attribute + ) + acceptable_types = { + target_arg, + List[target_arg], # type: ignore + simplified_type, + List[simplified_type], # type: ignore + } + target_format = ( + target_arg, + List[target_arg], # type: ignore + ) + in_target_format = 0 + in_simple_format = 0 + for input_arg in input_args: + if input_arg not in acceptable_types: + # Not supported + return None + if input_arg in target_format: + in_target_format += 1 + else: + in_simple_format += 1 + + assert in_simple_format or in_target_format + + if in_target_format and not in_simple_format: + # Union[X, List[X]] -> List[X] + return normalize_into_list + mapped = self._registered_types[target_arg] + if not in_target_format and in_simple_format: + # Union[X, List[X]] -> List[Y] + + 
def _mapper_x_list_y( + x: Union[Any, List[Any]], + ap: AttributePath, + pc: Optional["ParserContextData"], + ) -> List[Any]: + in_list_form: List[Any] = normalize_into_list(x, ap, pc) + + return [mapped.mapper(x, ap, pc) for x in in_list_form] + + return _mapper_x_list_y + + # Union[Y, List[X]] -> List[Y] + if not isinstance(target_arg, type): + raise ValueError( + f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does" + f" not support mixed types. Please use either {simplified_type} or {target_arg}" + f" in the source content (but both a mix of both)" + ) + + def _mapper_mixed_list_y( + x: Union[Any, List[Any]], + ap: AttributePath, + pc: Optional["ParserContextData"], + ) -> List[Any]: + in_list_form: List[Any] = normalize_into_list(x, ap, pc) + + return [ + x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc) + for x in in_list_form + ] + + return _mapper_mixed_list_y + + def _type_normalize( + self, + attribute: str, + input_type: Any, + target_type: Any, + parsing_typed_dict_attribute: bool, + ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]: + if input_type == target_type: + return None + _, input_orig, input_args = unpack_type( + input_type, parsing_typed_dict_attribute + ) + _, target_orig, target_args = unpack_type( + target_type, + parsing_typed_dict_attribute, + ) + if input_orig == Union: + result = self._union_narrowing( + input_type, target_type, parsing_typed_dict_attribute + ) + if result: + return result + elif target_orig == list and target_args[0] == input_type: + return wrap_into_list + + mapped = self._registered_types.get(target_type) + if mapped is not None and input_type == mapped.source_type: + # Source -> Target + return mapped.mapper + if target_orig == list and target_args: + mapped = self._registered_types.get(target_args[0]) + if mapped is not None: + # mypy is dense and forgots `mapped` cannot be optional in the comprehensions. 
+ mapped_type: TypeMapping = mapped + if input_type == mapped.source_type: + # Source -> List[Target] + return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)] + if ( + input_orig == list + and input_args + and input_args[0] == mapped_type.source_type + ): + # List[Source] -> List[Target] + return lambda xs, ap, pc: [ + mapped_type.mapper(x, ap, pc) for x in xs + ] + + raise ValueError( + f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow' + f" {input_type} to {target_type}" + ) + + def _strip_mapped_types( + self, orig_td: Any, parsing_typed_dict_attribute: bool + ) -> Any: + m = self._registered_types.get(orig_td) + if m is not None: + return m.source_type + _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) + if v == list: + arg = args[0] + m = self._registered_types.get(arg) + if m: + return List[m.source_type] # type: ignore + if v == Union: + stripped_args = tuple( + self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args + ) + if stripped_args != args: + return Union[stripped_args] + return orig_td + + +def _verify_inline_reference_documentation( + source_content_attributes: Mapping[str, AttributeDescription], + inline_reference_documentation: Optional[ParserDocumentation], + has_alt_form: bool, +) -> None: + if inline_reference_documentation is None: + return + attribute_doc = inline_reference_documentation.attribute_doc + if attribute_doc: + seen = set() + for attr_doc in attribute_doc: + for attr_name in attr_doc.attributes: + attr = source_content_attributes.get(attr_name) + if attr is None: + raise ValueError( + f'The inline_reference_documentation references an attribute "{attr_name}", which does not' + f" exist in the source format." + ) + if attr_name in seen: + raise ValueError( + f'The inline_reference_documentation has documentation for "{attr_name}" twice,' + f" which is not supported. 
Please document it at most once" + ) + seen.add(attr_name) + + undocumented = source_content_attributes.keys() - seen + if undocumented: + undocumented_attrs = ", ".join(undocumented) + raise ValueError( + "The following attributes were not documented. If this is deliberate, then please" + ' declare each them as undocumented (via undocumented_attr("foo")):' + f" {undocumented_attrs}" + ) + + if inline_reference_documentation.alt_parser_description and not has_alt_form: + raise ValueError( + "The inline_reference_documentation had documentation for an non-mapping format," + " but the source format does not have a non-mapping format." + ) + + +def _check_conflicts( + input_content_attributes: Dict[str, AttributeDescription], + required_attributes: FrozenSet[str], + all_attributes: FrozenSet[str], +) -> None: + for attr_name, attr in input_content_attributes.items(): + if attr_name in required_attributes and attr.conflicting_attributes: + c = ", ".join(repr(a) for a in attr.conflicting_attributes) + raise ValueError( + f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.' + " This makes it impossible to use these attributes. Either remove the attributes" + f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"' + " optional (NotRequired)" + ) + else: + required_conflicts = attr.conflicting_attributes & required_attributes + if required_conflicts: + c = ", ".join(repr(a) for a in required_conflicts) + raise ValueError( + f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.' + f' This makes it impossible to use the "{attr_name}" attribute. 
Either remove it,' + f" adjust the conflicts or make the listed attributes optional (NotRequired)" + ) + unknown_attributes = attr.conflicting_attributes - all_attributes + if unknown_attributes: + c = ", ".join(repr(a) for a in unknown_attributes) + raise ValueError( + f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.' + f" None of these attributes were declared in the input." + ) + + +def _check_attributes( + content: Type[TypedDict], + input_content: Type[TypedDict], + input_content_attributes: Dict[str, AttributeDescription], + sources: Mapping[str, Collection[str]], +) -> None: + target_required_keys = content.__required_keys__ + input_required_keys = input_content.__required_keys__ + all_input_keys = input_required_keys | input_content.__optional_keys__ + + for input_name in all_input_keys: + attr = input_content_attributes[input_name] + target_name = attr.target_attribute + source_names = sources[target_name] + input_is_required = input_name in input_required_keys + target_is_required = target_name in target_required_keys + + assert source_names + + if input_is_required and len(source_names) > 1: + raise ValueError( + f'The source attribute "{input_name}" is required, but it maps to "{target_name}",' + f' which has multiple sources "{source_names}". If "{input_name}" should be required,' + f' then there is no need for additional sources for "{target_name}". Alternatively,' + f' "{input_name}" might be missing a NotRequired type' + f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")' + ) + if not input_is_required and target_is_required and len(source_names) == 1: + raise ValueError( + f'The source attribute "{input_name}" is not marked as required and maps to' + f' "{target_name}", which is marked as required. As there are no other attributes' + f' mapping to "{target_name}", then "{input_name}" must be required as well' + f' ("{input_name}: Required[<Type>]"). 
Alternatively, "{target_name}" should be optional' + f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.' + ) + + +def _validation_type_error(path: AttributePath, message: str) -> None: + raise ManifestParseException( + f'The attribute "{path.path}" did not have a valid structure/type: {message}' + ) + + +def _is_two_arg_x_list_x(t_args: Tuple[Any, ...]) -> bool: + if len(t_args) != 2: + return False + lhs, rhs = t_args + if get_origin(lhs) == list: + if get_origin(rhs) == list: + # It could still match X, List[X] - but we do not allow this case for now as the caller + # does not support it. + return False + l_args = get_args(lhs) + return bool(l_args and l_args[0] == rhs) + if get_origin(rhs) == list: + r_args = get_args(rhs) + return bool(r_args and r_args[0] == lhs) + return False + + +def _extract_typed_dict( + base_type, + default_target_attribute: Optional[str], +) -> Tuple[Optional[Type[TypedDict]], Any]: + if is_typeddict(base_type): + return base_type, None + _, origin, args = unpack_type(base_type, False) + if origin != Union: + if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)): + raise ValueError( + "The source_format cannot be nor contain a (non-TypedDict) dict" + ) + return None, base_type + typed_dicts = [x for x in args if is_typeddict(x)] + if len(typed_dicts) > 1: + raise ValueError( + "When source_format is a Union, it must contain at most one TypedDict" + ) + typed_dict = typed_dicts[0] if typed_dicts else None + + if any(x is None or x is _NONE_TYPE for x in args): + raise ValueError( + "The source_format cannot be nor contain Optional[X] or Union[X, None]" + ) + + if any( + isinstance(x, type) and issubclass(x, (dict, Mapping)) + for x in args + if x is not typed_dict + ): + raise ValueError( + "The source_format cannot be nor contain a (non-TypedDict) dict" + ) + remaining = [x for x in args if x is not typed_dict] + has_target_attribute = False + anno = None + if len(remaining) == 1: 
        base_type, anno, _ = _parse_type(
            "source_format alternative form",
            remaining[0],
            forbid_optional=True,
            parsing_typed_dict_attribute=False,
        )
        has_target_attribute = bool(anno) and any(
            isinstance(x, TargetAttribute) for x in anno
        )
        target_type = base_type
    else:
        target_type = Union[tuple(remaining)]

    if default_target_attribute is None and not has_target_attribute:
        raise ValueError(
            'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]'
            " OR the parsed_content format must have exactly one attribute that is required."
        )
    if anno:
        # Re-wrap the stripped annotations around the resolved target type.
        final_anno = [target_type]
        final_anno.extend(anno)
        return typed_dict, Annotated[tuple(final_anno)]
    return typed_dict, target_type


def _dispatch_parse_generator(
    dispatch_type: Type[DebputyDispatchableType],
) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]:
    """Create a mapper that defers parsing to the dispatch table for `dispatch_type`.

    The returned callable requires a parser context at parse time (asserted), since
    the dispatch table is looked up via the context.
    """
    def _dispatch_parse(
        value: Any,
        attribute_path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ):
        assert parser_context is not None
        dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type)
        return dispatching_parser.parse(
            value, attribute_path, parser_context=parser_context
        )

    return _dispatch_parse


def _dispatch_parser(
    dispatch_type: Type[DebputyDispatchableType],
) -> AttributeTypeHandler:
    """Build a handler that validates nothing and maps via the dispatch parser."""
    return AttributeTypeHandler(
        dispatch_type.__name__,
        lambda *a: None,
        mapper=_dispatch_parse_generator(dispatch_type),
    )


def _parse_type(
    attribute: str,
    orig_td: Any,
    forbid_optional: bool = True,
    parsing_typed_dict_attribute: bool = True,
) -> Tuple[Any, Tuple[Any, ...], bool]:
    """Split an annotation into (base type, Annotated extras, is_optional).

    :param attribute: Attribute name; used only in error messages.
    :param orig_td: The annotation to dissect (possibly Annotated[...]).
    :param forbid_optional: Reject Optional[X]/Union[X, None] annotations.
    :raises ValueError: If the type resolves to None, or Optional is used while forbidden.

    NOTE(review): `optional` is initialized False and never reassigned, so the third
    element of the result is currently always False — confirm whether downstream
    `is_optional` consumers rely on that.
    """
    td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
    md: Tuple[Any, ...] = tuple()
    optional = False
    if v is not None:
        if v == Annotated:
            # Unwrap Annotated[X, hints...]: keep the hints and re-unpack X.
            anno = get_args(td)
            md = anno[1:]
            td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute)

    if td is _NONE_TYPE:
        raise ValueError(
            f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the'
            " debputy manifest, so this attribute does not make sense in its current form."
        )
    if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args):
        raise ValueError(
            f'Detected use of Optional in "{attribute}", which is not allowed here.'
            " Please use NotRequired for optional fields"
        )

    return td, md, optional


def _normalize_attribute_name(attribute: str) -> str:
    """Map a Python identifier to its manifest spelling (strip trailing "_", "_" -> "-")."""
    if attribute.endswith("_"):
        attribute = attribute[:-1]
    return attribute.replace("_", "-")


@dataclasses.dataclass
class DetectedDebputyParseHint:
    # Target attribute the source value is stored under after parsing.
    target_attribute: str
    # Manifest (source) spelling of the attribute; None for alt forms without one.
    source_manifest_attribute: Optional[str]
    # Source attributes that must not be given together with this one.
    conflict_with_source_attributes: FrozenSet[str]
    # Conditional requirement rule, if any.
    conditional_required: Optional[ConditionalRequired]
    # Whether this attribute may be used as a path hint (NotPathHint absent).
    applicable_as_path_hint: bool

    @classmethod
    def parse_annotations(
        cls,
        anno: Tuple[Any, ...],
        error_context: str,
        default_attribute_name: Optional[str],
        is_required: bool,
        default_target_attribute: Optional[str] = None,
        allow_target_attribute_annotation: bool = False,
        allow_source_attribute_annotations: bool = False,
    ) -> "DetectedDebputyParseHint":
        """Interpret DebputyParseHint annotations for one attribute.

        :param anno: The Annotated extras attached to the attribute's type.
        :param error_context: Suffix appended to error messages for context.
        :param default_attribute_name: Attribute's own name; used as fallback target
          and (normalized) source name. None for alternative source forms.
        :param is_required: Whether the attribute is required in its TypedDict.
        :param default_target_attribute: Explicit fallback target attribute.
        :param allow_target_attribute_annotation: Permit target_attribute hints.
        :param allow_source_attribute_annotations: Permit source-side hints.
        :raises ValueError: On disallowed or inconsistent annotations.
        """
        target_attr_anno = find_annotation(anno, TargetAttribute)
        if target_attr_anno:
            if not allow_target_attribute_annotation:
                raise ValueError(
                    f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}"
                )
            target_attribute = target_attr_anno.attribute
        elif default_target_attribute is not None:
            target_attribute = default_target_attribute
        elif default_attribute_name is not None:
            target_attribute = default_attribute_name
        else:
            if default_attribute_name is None:
                raise ValueError(
                    "allow_target_attribute_annotation must be True OR "
                    "default_attribute_name/default_target_attribute must be not None"
                )
            raise ValueError(
                f"Missing DebputyParseHint.target_attribute annotation.{error_context}"
            )
        source_attribute_anno = find_annotation(anno, ManifestAttribute)
        _source_attribute_allowed(
            allow_source_attribute_annotations, error_context, source_attribute_anno
        )
        if source_attribute_anno:
            # Explicit manifest spelling wins over the normalized attribute name.
            source_attribute_name = source_attribute_anno.attribute
        elif default_attribute_name is not None:
            source_attribute_name = _normalize_attribute_name(default_attribute_name)
        else:
            source_attribute_name = None
        mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute)
        if mutual_exclusive_with_anno:
            _source_attribute_allowed(
                allow_source_attribute_annotations,
                error_context,
                mutual_exclusive_with_anno,
            )
            conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes
        else:
            conflicting_attributes = frozenset()
        conditional_required = find_annotation(anno, ConditionalRequired)

        # A required attribute cannot also be conditionally required.
        if conditional_required and is_required:
            if default_attribute_name is None:
                raise ValueError(
                    f"is_required cannot be True without default_attribute_name being not None"
                )
            raise ValueError(
                f'The attribute "{default_attribute_name}" is Required while also being conditionally required.'
                ' Please make the attribute "NotRequired" or remove the conditional requirement.'
            )

        not_path_hint_anno = find_annotation(anno, NotPathHint)
        applicable_as_path_hint = not_path_hint_anno is None

        return DetectedDebputyParseHint(
            target_attribute=target_attribute,
            source_manifest_attribute=source_attribute_name,
            conflict_with_source_attributes=conflicting_attributes,
            conditional_required=conditional_required,
            applicable_as_path_hint=applicable_as_path_hint,
        )


def _source_attribute_allowed(
    source_attribute_allowed: bool,
    error_context: str,
    annotation: Optional[DebputyParseHint],
) -> None:
    """Raise ValueError when a source-side annotation is present but not allowed here."""
    if source_attribute_allowed or annotation is None:
        return
    raise ValueError(
        f'The annotation "{annotation}" cannot be used here. {error_context}'
    )