summaryrefslogtreecommitdiffstats
path: root/src/debputy/manifest_parser/declarative_parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/manifest_parser/declarative_parser.py')
-rw-r--r--src/debputy/manifest_parser/declarative_parser.py1893
1 files changed, 1893 insertions, 0 deletions
diff --git a/src/debputy/manifest_parser/declarative_parser.py b/src/debputy/manifest_parser/declarative_parser.py
new file mode 100644
index 0000000..32e93fe
--- /dev/null
+++ b/src/debputy/manifest_parser/declarative_parser.py
@@ -0,0 +1,1893 @@
+import collections
+import dataclasses
+import itertools
+from typing import (
+ Any,
+ Callable,
+ Tuple,
+ TypedDict,
+ Dict,
+ get_type_hints,
+ Annotated,
+ get_args,
+ get_origin,
+ TypeVar,
+ Generic,
+ FrozenSet,
+ Mapping,
+ Optional,
+ cast,
+ is_typeddict,
+ Type,
+ Union,
+ List,
+ Collection,
+ NotRequired,
+ Iterable,
+ Literal,
+ Sequence,
+)
+
+from debputy.manifest_parser.base_types import (
+ DebputyParsedContent,
+ StaticFileSystemOwner,
+ StaticFileSystemGroup,
+ FileSystemMode,
+ OctalMode,
+ SymlinkTarget,
+ FileSystemMatchRule,
+ FileSystemExactMatchRule,
+ FileSystemExactNonDirMatchRule,
+ DebputyDispatchableType,
+ TypeMapping,
+)
+from debputy.manifest_parser.exceptions import (
+ ManifestParseException,
+)
+from debputy.manifest_parser.mapper_code import (
+ type_mapper_str2package,
+ normalize_into_list,
+ wrap_into_list,
+ map_each_element,
+)
+from debputy.manifest_parser.parser_data import ParserContextData
+from debputy.manifest_parser.util import AttributePath, unpack_type, find_annotation
+from debputy.packages import BinaryPackage
+from debputy.plugin.api.impl_types import (
+ DeclarativeInputParser,
+ TD,
+ _ALL_PACKAGE_TYPES,
+ resolve_package_type_selectors,
+)
+from debputy.plugin.api.spec import ParserDocumentation, PackageTypeSelector
+from debputy.util import _info, _warn, assume_not_none
+
+try:
+ from Levenshtein import distance
+except ImportError:
+ _WARN_ONCE = False
+
+ def _detect_possible_typo(
+ _key: str,
+ _value: object,
+ _manifest_attributes: Mapping[str, "AttributeDescription"],
+ _path: "AttributePath",
+ ) -> None:
+ global _WARN_ONCE
+ if not _WARN_ONCE:
+ _WARN_ONCE = True
+ _info(
+ "Install python3-levenshtein to have debputy try to detect typos in the manifest."
+ )
+
+else:
+
+ def _detect_possible_typo(
+ key: str,
+ value: object,
+ manifest_attributes: Mapping[str, "AttributeDescription"],
+ path: "AttributePath",
+ ) -> None:
+ k_len = len(key)
+ key_path = path[key]
+ matches: List[str] = []
+ current_match_strength = 0
+ for acceptable_key, attr in manifest_attributes.items():
+ if abs(k_len - len(acceptable_key)) > 2:
+ continue
+ d = distance(key, acceptable_key)
+ if d > 2:
+ continue
+ try:
+ attr.type_validator.ensure_type(value, key_path)
+ except ManifestParseException:
+ if attr.type_validator.base_type_match(value):
+ match_strength = 1
+ else:
+ match_strength = 0
+ else:
+ match_strength = 2
+
+ if match_strength < current_match_strength:
+ continue
+ if match_strength > current_match_strength:
+ current_match_strength = match_strength
+ matches.clear()
+ matches.append(acceptable_key)
+
+ if not matches:
+ return
+ ref = f'at "{path.path}"' if path else "at the manifest root level"
+ if len(matches) == 1:
+ possible_match = repr(matches[0])
+ _warn(
+ f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}'
+ )
+ else:
+ matches.sort()
+ possible_matches = ", ".join(repr(a) for a in matches)
+ _warn(
+ f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}'
+ )
+
+
+SF = TypeVar("SF")
+T = TypeVar("T")
+S = TypeVar("S")
+
+
+_NONE_TYPE = type(None)
+
+
+# These must be able to appear in an "isinstance" check and must be builtin types.
+BASIC_SIMPLE_TYPES = {
+ str: "string",
+ int: "integer",
+ bool: "boolean",
+}
+
+
+class AttributeTypeHandler:
+ __slots__ = ("_description", "_ensure_type", "base_type", "mapper")
+
+ def __init__(
+ self,
+ description: str,
+ ensure_type: Callable[[Any, AttributePath], None],
+ *,
+ base_type: Optional[Type[Any]] = None,
+ mapper: Optional[
+ Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
+ ] = None,
+ ) -> None:
+ self._description = description
+ self._ensure_type = ensure_type
+ self.base_type = base_type
+ self.mapper = mapper
+
+ def describe_type(self) -> str:
+ return self._description
+
+ def ensure_type(self, obj: object, path: AttributePath) -> None:
+ self._ensure_type(obj, path)
+
+ def base_type_match(self, obj: object) -> bool:
+ base_type = self.base_type
+ return base_type is not None and isinstance(obj, base_type)
+
+ def map_type(
+ self,
+ value: Any,
+ path: AttributePath,
+ parser_context: Optional["ParserContextData"],
+ ) -> Any:
+ mapper = self.mapper
+ if mapper is not None:
+ return mapper(value, path, parser_context)
+ return value
+
+ def combine_mapper(
+ self,
+ mapper: Optional[
+ Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
+ ],
+ ) -> "AttributeTypeHandler":
+ if mapper is None:
+ return self
+ if self.mapper is not None:
+ m = self.mapper
+
+ def _combined_mapper(
+ value: Any,
+ path: AttributePath,
+ parser_context: Optional["ParserContextData"],
+ ) -> Any:
+ return mapper(m(value, path, parser_context), path, parser_context)
+
+ else:
+ _combined_mapper = mapper
+
+ return AttributeTypeHandler(
+ self._description,
+ self._ensure_type,
+ base_type=self.base_type,
+ mapper=_combined_mapper,
+ )
+
+
+@dataclasses.dataclass(slots=True)
+class AttributeDescription:
+ source_attribute_name: str
+ target_attribute: str
+ attribute_type: Any
+ type_validator: AttributeTypeHandler
+ annotations: Tuple[Any, ...]
+ conflicting_attributes: FrozenSet[str]
+ conditional_required: Optional["ConditionalRequired"]
+ parse_hints: Optional["DetectedDebputyParseHint"] = None
+ is_optional: bool = False
+
+
+def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool:
+ if attribute_path.path_hint is not None:
+ return True
+ if isinstance(v, str):
+ attribute_path.path_hint = v
+ return True
+ elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str):
+ attribute_path.path_hint = v[0]
+ return True
+ return False
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
+ alt_form_parser: AttributeDescription
+ inline_reference_documentation: Optional[ParserDocumentation] = None
+
+ def parse_input(
+ self,
+ value: object,
+ path: AttributePath,
+ *,
+ parser_context: Optional["ParserContextData"] = None,
+ ) -> TD:
+ if self.reference_documentation_url is not None:
+ doc_ref = f" (Documentation: {self.reference_documentation_url})"
+ else:
+ doc_ref = ""
+
+ alt_form_parser = self.alt_form_parser
+ if value is None:
+ form_note = f" The value must have type: {alt_form_parser.type_validator.describe_type()}"
+ if self.reference_documentation_url is not None:
+ doc_ref = f" Please see {self.reference_documentation_url} for the documentation."
+ raise ManifestParseException(
+ f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
+ )
+ _extract_path_hint(value, path)
+ alt_form_parser.type_validator.ensure_type(value, path)
+ attribute = alt_form_parser.target_attribute
+ alias_mapping = {
+ attribute: ("", None),
+ }
+ v = alt_form_parser.type_validator.map_type(value, path, parser_context)
+ path.alias_mapping = alias_mapping
+ return cast("TD", {attribute: v})
+
+
+@dataclasses.dataclass(slots=True)
+class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
+ input_time_required_parameters: FrozenSet[str]
+ all_parameters: FrozenSet[str]
+ manifest_attributes: Mapping[str, "AttributeDescription"]
+ source_attributes: Mapping[str, "AttributeDescription"]
+ at_least_one_of: FrozenSet[FrozenSet[str]]
+ alt_form_parser: Optional[AttributeDescription]
+ mutually_exclusive_attributes: FrozenSet[FrozenSet[str]] = frozenset()
+ _per_attribute_conflicts_cache: Optional[Mapping[str, FrozenSet[str]]] = None
+ inline_reference_documentation: Optional[ParserDocumentation] = None
+ path_hint_source_attributes: Sequence[str] = tuple()
+
+ def parse_input(
+ self,
+ value: object,
+ path: AttributePath,
+ *,
+ parser_context: Optional["ParserContextData"] = None,
+ ) -> TD:
+ if self.reference_documentation_url is not None:
+ doc_ref = f" (Documentation: {self.reference_documentation_url})"
+ else:
+ doc_ref = ""
+ if value is None:
+ form_note = " The attribute must be a mapping."
+ if self.alt_form_parser is not None:
+ form_note = (
+ " The attribute can be a mapping or a non-mapping format"
+ ' (usually, "non-mapping format" means a string or a list of strings).'
+ )
+ if self.reference_documentation_url is not None:
+ doc_ref = f" Please see {self.reference_documentation_url} for the documentation."
+ raise ManifestParseException(
+ f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
+ )
+ if not isinstance(value, dict):
+ alt_form_parser = self.alt_form_parser
+ if alt_form_parser is None:
+ raise ManifestParseException(
+ f"The attribute {path.path} must be a mapping.{doc_ref}"
+ )
+ _extract_path_hint(value, path)
+ alt_form_parser.type_validator.ensure_type(value, path)
+ assert (
+ value is not None
+ ), "The alternative form was None, but the parser should have rejected None earlier."
+ attribute = alt_form_parser.target_attribute
+ alias_mapping = {
+ attribute: ("", None),
+ }
+ v = alt_form_parser.type_validator.map_type(value, path, parser_context)
+ path.alias_mapping = alias_mapping
+ return cast("TD", {attribute: v})
+
+ unknown_keys = value.keys() - self.all_parameters
+ if unknown_keys:
+ for k in unknown_keys:
+ if isinstance(k, str):
+ _detect_possible_typo(k, value[k], self.manifest_attributes, path)
+ unused_keys = self.all_parameters - value.keys()
+ if unused_keys:
+ k = ", ".join(unused_keys)
+ raise ManifestParseException(
+ f'Unknown keys "{unknown_keys}" at {path.path}". Keys that could be used here are: {k}.{doc_ref}'
+ )
+ raise ManifestParseException(
+ f'Unknown keys "{unknown_keys}" at {path.path}". Please remove them.{doc_ref}'
+ )
+ missing_keys = self.input_time_required_parameters - value.keys()
+ if missing_keys:
+ required = ", ".join(repr(k) for k in sorted(missing_keys))
+ raise ManifestParseException(
+ f"The following keys were required but not present at {path.path}: {required}{doc_ref}"
+ )
+ for maybe_required in self.all_parameters - value.keys():
+ attr = self.manifest_attributes[maybe_required]
+ assert attr.conditional_required is None or parser_context is not None
+ if (
+ attr.conditional_required is not None
+ and attr.conditional_required.condition_applies(
+ assume_not_none(parser_context)
+ )
+ ):
+ reason = attr.conditional_required.reason
+ raise ManifestParseException(
+ f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path}. {reason}{doc_ref}'
+ )
+ for keyset in self.at_least_one_of:
+ matched_keys = value.keys() & keyset
+ if not matched_keys:
+ conditionally_required = ", ".join(repr(k) for k in sorted(keyset))
+ raise ManifestParseException(
+ f"At least one of the following keys must be present at {path.path}:"
+ f" {conditionally_required}{doc_ref}"
+ )
+ for group in self.mutually_exclusive_attributes:
+ matched = value.keys() & group
+ if len(matched) > 1:
+ ck = ", ".join(repr(k) for k in sorted(matched))
+ raise ManifestParseException(
+ f"Could not parse {path.path}: The following attributes are"
+ f" mutually exclusive: {ck}{doc_ref}"
+ )
+ result = {}
+ per_attribute_conflicts = self._per_attribute_conflicts()
+ alias_mapping = {}
+ for path_hint_source_attributes in self.path_hint_source_attributes:
+ v = value.get(path_hint_source_attributes)
+ if v is not None and _extract_path_hint(v, path):
+ break
+ for k, v in value.items():
+ attr = self.manifest_attributes[k]
+ matched = value.keys() & per_attribute_conflicts[k]
+ if matched:
+ ck = ", ".join(repr(k) for k in sorted(matched))
+ raise ManifestParseException(
+ f'The attribute "{k}" at {path.path} cannot be used with the following'
+ f" attributes: {ck}{doc_ref}"
+ )
+ nk = attr.target_attribute
+ key_path = path[k]
+ attr.type_validator.ensure_type(v, key_path)
+ if v is None:
+ continue
+ if k != nk:
+ alias_mapping[nk] = k, None
+ v = attr.type_validator.map_type(v, key_path, parser_context)
+ result[nk] = v
+ if alias_mapping:
+ path.alias_mapping = alias_mapping
+ return cast("TD", result)
+
+ def _per_attribute_conflicts(self) -> Mapping[str, FrozenSet[str]]:
+ conflicts = self._per_attribute_conflicts_cache
+ if conflicts is not None:
+ return conflicts
+ attrs = self.source_attributes
+ conflicts = {
+ a.source_attribute_name: frozenset(
+ attrs[ca].source_attribute_name for ca in a.conflicting_attributes
+ )
+ for a in attrs.values()
+ }
+ self._per_attribute_conflicts_cache = conflicts
+ return self._per_attribute_conflicts_cache
+
+
+class DebputyParseHint:
+ @classmethod
+ def target_attribute(cls, target_attribute: str) -> "DebputyParseHint":
+ """Define this source attribute to have a different target attribute name
+
+ As an example:
+
+ >>> class SourceType(TypedDict):
+ ... source: Annotated[NotRequired[str], DebputyParseHint.target_attribute("sources")]
+ ... sources: NotRequired[List[str]]
+ >>> class TargetType(TypedDict):
+ ... sources: List[str]
+ >>> pg = ParserGenerator()
+ >>> parser = pg.parser_from_typed_dict(TargetType, source_content=SourceType)
+
+ In this example, the user can provide either `source` or `sources` and the parser will
+ map them to the `sources` attribute in the `TargetType`. Note this example relies on
+ the builtin mapping of `str` to `List[str]` to align the types between `source` (from
+ SourceType) and `sources` (from TargetType).
+
+ The following rules apply:
+
+ * All source attributes that map to the same target attribute will be mutually exclusive
+ (that is, the user cannot give `source` *and* `sources` as input).
+ * When the target attribute is required, the source attributes are conditionally
+ mandatory requiring the user to provide exactly one of them.
+ * When multiple source attributes point to a single target attribute, none of the source
+ attributes can be Required.
+ * The annotation can only be used for the source type specification and the source type
+ specification must be different from the target type specification.
+
+ The `target_attribute` annotation can be used without having multiple source attributes. This
+ can be useful if the source attribute name is not valid as a python variable identifier to
+ rename it to a valid python identifier.
+
+ :param target_attribute: The attribute name in the target content
+ :return: The annotation.
+ """
+ return TargetAttribute(target_attribute)
+
+ @classmethod
+ def conflicts_with_source_attributes(
+ cls,
+ *conflicting_source_attributes: str,
+ ) -> "DebputyParseHint":
+ """Declare a conflict with one or more source attributes
+
+ Example:
+
+ >>> class SourceType(TypedDict):
+ ... source: Annotated[NotRequired[str], DebputyParseHint.target_attribute("sources")]
+ ... sources: NotRequired[List[str]]
+ ... into_dir: NotRequired[str]
+ ... renamed_to: Annotated[
+ ... NotRequired[str],
+ ... DebputyParseHint.conflicts_with_source_attributes("sources", "into_dir")
+ ... ]
+ >>> class TargetType(TypedDict):
+ ... sources: List[str]
+ ... into_dir: NotRequired[str]
+ ... renamed_to: NotRequired[str]
+ >>> pg = ParserGenerator()
+ >>> parser = pg.parser_from_typed_dict(TargetType, source_content=SourceType)
+
+ In this example, if the user was to provide `renamed_to` with `sources` or `into_dir` the parser would report
+ an error. However, the parser will allow `renamed_to` with `source` as the conflict is considered only for
+ the input source. That is, it is irrelevant that `sources` and `sourceĀ“ happens to "map" to the same target
+ attribute.
+
+ The following rules apply:
+ * It is not possible for a target attribute to declare conflicts unless the target type spec is reused as
+ source type spec.
+ * All attributes involved in a conflict must be NotRequired. If any of the attributes are Required, then
+ the parser generator will reject the input.
+ * All attributes listed in the conflict must be valid attributes in the source type spec.
+
+ Note you do not have to specify conflicts between two attributes with the same target attribute name. The
+ `target_attribute` annotation will handle that for you.
+
+ :param conflicting_source_attributes: All source attributes that cannot be used with this attribute.
+ :return: The annotation.
+ """
+ if len(conflicting_source_attributes) < 1:
+ raise ValueError(
+ "DebputyParseHint.conflicts_with_source_attributes requires at least one attribute as input"
+ )
+ return ConflictWithSourceAttribute(frozenset(conflicting_source_attributes))
+
+ @classmethod
+ def required_when_single_binary(
+ cls,
+ *,
+ package_type: PackageTypeSelector = _ALL_PACKAGE_TYPES,
+ ) -> "DebputyParseHint":
+ """Declare a source attribute as required when the source package produces exactly one binary package
+
+ The attribute in question must always be declared as `NotRequired` in the TypedDict and this condition
+ can only be used for source attributes.
+ """
+ resolved_package_types = resolve_package_type_selectors(package_type)
+ reason = "The field is required for source packages producing exactly one binary package"
+ if resolved_package_types != _ALL_PACKAGE_TYPES:
+ types = ", ".join(sorted(resolved_package_types))
+ reason += f" of type {types}"
+ return ConditionalRequired(
+ reason,
+ lambda c: len(
+ [
+ p
+ for p in c.binary_packages.values()
+ if p.package_type in package_type
+ ]
+ )
+ == 1,
+ )
+ return ConditionalRequired(
+ reason,
+ lambda c: c.is_single_binary_package,
+ )
+
+ @classmethod
+ def required_when_multi_binary(
+ cls,
+ *,
+ package_type: PackageTypeSelector = _ALL_PACKAGE_TYPES,
+ ) -> "DebputyParseHint":
+ """Declare a source attribute as required when the source package produces two or more binary package
+
+ The attribute in question must always be declared as `NotRequired` in the TypedDict and this condition
+ can only be used for source attributes.
+ """
+ resolved_package_types = resolve_package_type_selectors(package_type)
+ reason = "The field is required for source packages producing two or more binary packages"
+ if resolved_package_types != _ALL_PACKAGE_TYPES:
+ types = ", ".join(sorted(resolved_package_types))
+ reason = (
+ "The field is required for source packages producing not producing exactly one binary packages"
+ f" of type {types}"
+ )
+ return ConditionalRequired(
+ reason,
+ lambda c: len(
+ [
+ p
+ for p in c.binary_packages.values()
+ if p.package_type in package_type
+ ]
+ )
+ != 1,
+ )
+ return ConditionalRequired(
+ reason,
+ lambda c: not c.is_single_binary_package,
+ )
+
+ @classmethod
+ def manifest_attribute(cls, attribute: str) -> "DebputyParseHint":
+ """Declare what the attribute name (as written in the manifest) should be
+
+ By default, debputy will do an attribute normalizing that will take valid python identifiers such
+ as `dest_dir` and remap it to the manifest variant (such as `dest-dir`) automatically. If you have
+ a special case, where this built-in normalization is insufficient or the python name is considerably
+ different from what the user would write in the manifest, you can use this parse hint to set the
+ name that the user would have to write in the manifest for this attribute.
+
+ >>> class SourceType(TypedDict):
+ ... source: List[FileSystemMatchRule]
+ ... # Use "as" in the manifest because "as_" was not pretty enough
+ ... install_as: Annotated[NotRequired[FileSystemExactMatchRule], DebputyParseHint.manifest_attribute("as")]
+
+ In this example, we use the parse hint to use "as" as the name in the manifest, because we cannot
+ use "as" a valid python identifier (it is a keyword). While debputy would map `as_` to `as` for us,
+ we have chosen to use `install_as` as a python identifier.
+ """
+ return ManifestAttribute(attribute)
+
+ @classmethod
+ def not_path_error_hint(cls) -> "DebputyParseHint":
+ """Mark this attribute as not a "path hint" when it comes to reporting errors
+
+ By default, `debputy` will pick up attributes that uses path names (FileSystemMatchRule) as
+ candidates for parse error hints (the little "<Search for: VALUE>" in error messages).
+
+ Most rules only have one active path-based attribute and paths tends to be unique enough
+ that it helps people spot the issue faster. However, in rare cases, you can have multiple
+ attributes that fit the bill. In this case, this hint can be used to "hide" the suboptimal
+ choice. As an example:
+
+ >>> class SourceType(TypedDict):
+ ... source: List[FileSystemMatchRule]
+ ... install_as: Annotated[NotRequired[FileSystemExactMatchRule], DebputyParseHint.not_path_error_hint()]
+
+ In this case, without the hint, `debputy` might pick up `install_as` as the attribute to
+ use as hint for error reporting. However, here we have decided that we never want `install_as`
+ leaving `source` as the only option.
+
+ Generally, this type hint must be placed on the **source** format. Any source attribute matching
+ the parsed format will be ignored.
+
+ Mind the assymmetry: The annotation is placed in the **source** format while `debputy` looks at
+ the type of the target attribute to determine if it counts as path.
+ """
+ return NOT_PATH_HINT
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class TargetAttribute(DebputyParseHint):
+ attribute: str
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class ConflictWithSourceAttribute(DebputyParseHint):
+ conflicting_attributes: FrozenSet[str]
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class ConditionalRequired(DebputyParseHint):
+ reason: str
+ condition: Callable[["ParserContextData"], bool]
+
+ def condition_applies(self, context: "ParserContextData") -> bool:
+ return self.condition(context)
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class ManifestAttribute(DebputyParseHint):
+ attribute: str
+
+
+class NotPathHint(DebputyParseHint):
+ pass
+
+
+NOT_PATH_HINT = NotPathHint()
+
+
+def _is_path_attribute_candidate(
+ source_attribute: AttributeDescription, target_attribute: AttributeDescription
+) -> bool:
+ if (
+ source_attribute.parse_hints
+ and not source_attribute.parse_hints.applicable_as_path_hint
+ ):
+ return False
+ target_type = target_attribute.attribute_type
+ _, origin, args = unpack_type(target_type, False)
+ match_type = target_type
+ if origin == list:
+ match_type = args[0]
+ return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule)
+
+
+class ParserGenerator:
+ def __init__(self) -> None:
+ self._registered_types: Dict[Any, TypeMapping[Any, Any]] = {}
+
+ def register_mapped_type(self, mapped_type: TypeMapping) -> None:
+ existing = self._registered_types.get(mapped_type.target_type)
+ if existing is not None:
+ raise ValueError(f"The type {existing} is already registered")
+ self._registered_types[mapped_type.target_type] = mapped_type
+
+ def discard_mapped_type(self, mapped_type: Type[T]) -> None:
+ del self._registered_types[mapped_type]
+
+ def parser_from_typed_dict(
+ self,
+ parsed_content: Type[TD],
+ *,
+ source_content: Optional[SF] = None,
+ allow_optional: bool = False,
+ inline_reference_documentation: Optional[ParserDocumentation] = None,
+ ) -> DeclarativeInputParser[TD]:
+ """Derive a parser from a TypedDict
+
+ Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict
+ or two that are used as a description.
+
+ In its most simple use-case, the caller provides a TypedDict of the expected attributed along with
+ their types. As an example:
+
+ >>> class InstallDocsRule(DebputyParsedContent):
+ ... sources: List[str]
+ ... into: List[str]
+ >>> pg = ParserGenerator()
+ >>> simple_parser = pg.parser_from_typed_dict(InstallDocsRule)
+
+ This will create a parser that would be able to interpret something like:
+
+ ```yaml
+ install-docs:
+ sources: ["docs/*"]
+ into: ["my-pkg"]
+ ```
+
+ While this is sufficient for programmers, it is a bit ridig for the packager writing the manifest. Therefore,
+ you can also provide a TypedDict descriping the input, enabling more flexibility:
+
+ >>> class InstallDocsRule(DebputyParsedContent):
+ ... sources: List[str]
+ ... into: List[str]
+ >>> class InputDocsRuleInputFormat(TypedDict):
+ ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]]
+ ... sources: NotRequired[List[str]]
+ ... into: Union[str, List[str]]
+ >>> pg = ParserGenerator()
+ >>> flexible_parser = pg.parser_from_typed_dict(
+ ... InstallDocsRule,
+ ... source_content=InputDocsRuleInputFormat,
+ ... )
+
+ In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string)
+ or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources`
+ is used to ensure the input is not ambigious. For the `into` parameter, the parser will accept it being a str
+ or a list of strings. Regardless of how the input was provided, the parser will normalize the input such that
+ both `sources` and `into` in the result is a list of strings. As an example, this parser can accept
+ both the previous input but also the following input:
+
+ ```yaml
+ install-docs:
+ source: "docs/*"
+ into: "my-pkg"
+ ```
+
+ The `source` and `into` attributes are then normalized to lists as if the user had written them as lists
+ with a single string in them. As noted above, the name of the `source` attribute will also be normalized
+ while parsing.
+
+ In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict
+ as part of the input. Example:
+
+ >>> class DiscardRule(DebputyParsedContent):
+ ... paths: List[str]
+ >>> class DiscardRuleInputDictFormat(TypedDict):
+ ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]]
+ ... paths: NotRequired[List[str]]
+ >>> # This format relies on DiscardRule having exactly one Required attribute
+ >>> DiscardRuleInputWithAltFormat = Union[
+ ... DiscardRuleInputDictFormat,
+ ... str,
+ ... List[str],
+ ... ]
+ >>> pg = ParserGenerator()
+ >>> flexible_parser = pg.parser_from_typed_dict(
+ ... DiscardRule,
+ ... source_content=DiscardRuleInputWithAltFormat,
+ ... )
+
+
+ Supported types:
+ * `List` - must have a fixed type argument (such as `List[str]`)
+ * `str`
+ * `int`
+ * `BinaryPackage` - When provided (or required), the user must provide a package name listed
+ in the debian/control file. The code receives the BinaryPackage instance
+ matching that input.
+ * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any
+ format that `debputy' provides (such as `0644` or `a=rw,go=rw`).
+ * `FileSystemOwner` - When provided (or required), the user must a file system owner that is
+ available statically on all Debian systems (must be in `base-passwd`).
+ The user has multiple options for how to specify it (either via name or id).
+ * `FileSystemGroup` - When provided (or required), the user must a file system group that is
+ available statically on all Debian systems (must be in `base-passwd`).
+ The user has multiple options for how to specify it (either via name or id).
+ * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply.
+ Usually, it is better to extend `DebputyParsedContentStandardConditional`, which
+ provides the `debputy' default `when` parameter for conditionals.
+
+ Supported special type-like parameters:
+
+ * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the
+ outermost level. Cannot vary between `parsed_content` and `source_content`.
+ * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment.
+ * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present).
+ Automapping (see below) is restricted to two members in the Union.
+
+ Notable non-supported types:
+ * `Mapping` and all variants therefore (such as `dict`). In the future, nested `TypedDict`s may be allowed.
+ * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields.
+
+ Automatic mapping rules from `source_content` to `parsed_content`:
+ - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically:
+ `lambda value: value if isinstance(value, list) else [value]`
+ - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]`
+
+ Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod
+ for concrete features that may be useful to you.
+
+ :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed.
+ (DebputyParsedContent is a TypedDict subclass that work around some inadequate type checkers)
+ :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful
+ to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted,
+ the parsed_content is also considered the source_content (which affects what annotations are allowed in it).
+ Note you should never pass the parsed_content as source_content directly.
+ :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you
+ should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and
+ can keep this False).
+ :param inline_reference_documentation: Optionally, programmatic documentation
+ :return: An input parser capable of reading input matching the TypedDict(s) used as reference.
+ """
+ if not is_typeddict(parsed_content):
+ raise ValueError(
+ f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
+ ' Only "TypedDict"-based types supported.'
+ )
+ if source_content is parsed_content:
+ raise ValueError(
+ "Do not provide source_content if it is the same as parsed_content"
+ )
+
+ target_attributes = self._parse_types(
+ parsed_content,
+ allow_source_attribute_annotations=source_content is None,
+ forbid_optional=not allow_optional,
+ )
+ required_target_parameters = frozenset(parsed_content.__required_keys__)
+ parsed_alt_form = None
+ non_mapping_source_only = False
+
+ if source_content is not None:
+ default_target_attribute = None
+ if len(required_target_parameters) == 1:
+ default_target_attribute = next(iter(required_target_parameters))
+
+ source_typed_dict, alt_source_forms = _extract_typed_dict(
+ source_content,
+ default_target_attribute,
+ )
+ if alt_source_forms:
+ parsed_alt_form = self._parse_alt_form(
+ alt_source_forms,
+ default_target_attribute,
+ )
+ if source_typed_dict is not None:
+ source_content_attributes = self._parse_types(
+ source_typed_dict,
+ allow_target_attribute_annotation=True,
+ allow_source_attribute_annotations=True,
+ forbid_optional=not allow_optional,
+ )
+ source_content_parameter = "source_content"
+ source_and_parsed_differs = True
+ else:
+ source_typed_dict = parsed_content
+ source_content_attributes = target_attributes
+ source_content_parameter = "parsed_content"
+ source_and_parsed_differs = True
+ non_mapping_source_only = True
+ else:
+ source_typed_dict = parsed_content
+ source_content_attributes = target_attributes
+ source_content_parameter = "parsed_content"
+ source_and_parsed_differs = False
+
+ sources = collections.defaultdict(set)
+ seen_targets = set()
+ seen_source_names: Dict[str, str] = {}
+ source_attributes: Dict[str, AttributeDescription] = {}
+ path_hint_source_attributes = []
+
+ for k in source_content_attributes:
+ ia = source_content_attributes[k]
+
+ ta = (
+ target_attributes.get(ia.target_attribute)
+ if source_and_parsed_differs
+ else ia
+ )
+ if ta is None:
+ # Error message would be wrong if this assertion is false.
+ assert source_and_parsed_differs
+ raise ValueError(
+ f'The attribute "{k}" from the "source_content" parameter should have mapped'
+ f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"'
+ )
+ if _is_path_attribute_candidate(ia, ta):
+ path_hint_source_attributes.append(ia.source_attribute_name)
+ existing_source_name = seen_source_names.get(ia.source_attribute_name)
+ if existing_source_name:
+ raise ValueError(
+ f'The attribute "{k}" and "{existing_source_name}" both share the source name'
+ f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,'
+ f' so only one attribute use "{ia.source_attribute_name}".'
+ )
+ seen_source_names[ia.source_attribute_name] = k
+ seen_targets.add(ta.target_attribute)
+ sources[ia.target_attribute].add(k)
+ if source_and_parsed_differs:
+ bridge_mapper = self._type_normalize(
+ k, ia.attribute_type, ta.attribute_type, False
+ )
+ ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper)
+ source_attributes[k] = ia
+
+ def _as_attr_names(td_name: Iterable[str]) -> FrozenSet[str]:
+ return frozenset(
+ source_content_attributes[a].source_attribute_name for a in td_name
+ )
+
+ _check_attributes(
+ parsed_content,
+ source_typed_dict,
+ source_content_attributes,
+ sources,
+ )
+
+ at_least_one_of = frozenset(
+ _as_attr_names(g)
+ for k, g in sources.items()
+ if len(g) > 1 and k in required_target_parameters
+ )
+
+ if source_and_parsed_differs and seen_targets != target_attributes.keys():
+ missing = ", ".join(
+ repr(k) for k in (target_attributes.keys() - seen_targets)
+ )
+ raise ValueError(
+ 'The following attributes in "parsed_content" did not have a source field in "source_content":'
+ f" {missing}"
+ )
+ all_mutually_exclusive_fields = frozenset(
+ _as_attr_names(g) for g in sources.values() if len(g) > 1
+ )
+
+ all_parameters = (
+ source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__
+ )
+ _check_conflicts(
+ source_content_attributes,
+ source_typed_dict.__required_keys__,
+ all_parameters,
+ )
+
+ manifest_attributes = {
+ a.source_attribute_name: a for a in source_content_attributes.values()
+ }
+
+ if parsed_alt_form is not None:
+ target_attribute = parsed_alt_form.target_attribute
+ if (
+ target_attribute not in required_target_parameters
+ and required_target_parameters
+ or len(required_target_parameters) > 1
+ ):
+ raise NotImplementedError(
+ "When using alternative source formats (Union[TypedDict, ...]), then the"
+ " target must have at most one require parameter"
+ )
+ bridge_mapper = self._type_normalize(
+ target_attribute,
+ parsed_alt_form.attribute_type,
+ target_attributes[target_attribute].attribute_type,
+ False,
+ )
+ parsed_alt_form.type_validator = (
+ parsed_alt_form.type_validator.combine_mapper(bridge_mapper)
+ )
+
+ _verify_inline_reference_documentation(
+ source_content_attributes,
+ inline_reference_documentation,
+ parsed_alt_form is not None,
+ )
+ if non_mapping_source_only:
+ return DeclarativeNonMappingInputParser(
+ assume_not_none(parsed_alt_form),
+ inline_reference_documentation=inline_reference_documentation,
+ )
+ else:
+ return DeclarativeMappingInputParser(
+ _as_attr_names(source_typed_dict.__required_keys__),
+ _as_attr_names(all_parameters),
+ manifest_attributes,
+ source_attributes,
+ mutually_exclusive_attributes=all_mutually_exclusive_fields,
+ alt_form_parser=parsed_alt_form,
+ at_least_one_of=at_least_one_of,
+ inline_reference_documentation=inline_reference_documentation,
+ path_hint_source_attributes=tuple(path_hint_source_attributes),
+ )
+
+ def _as_type_validator(
+ self,
+ attribute: str,
+ provided_type: Any,
+ parsing_typed_dict_attribute: bool,
+ ) -> AttributeTypeHandler:
+ assert not isinstance(provided_type, tuple)
+
+ if isinstance(provided_type, type) and issubclass(
+ provided_type, DebputyDispatchableType
+ ):
+ return _dispatch_parser(provided_type)
+
+ unmapped_type = self._strip_mapped_types(
+ provided_type,
+ parsing_typed_dict_attribute,
+ )
+ type_normalizer = self._type_normalize(
+ attribute,
+ unmapped_type,
+ provided_type,
+ parsing_typed_dict_attribute,
+ )
+ t_unmapped, t_orig, t_args = unpack_type(
+ unmapped_type,
+ parsing_typed_dict_attribute,
+ )
+
+ if (
+ t_orig == Union
+ and t_args
+ and len(t_args) == 2
+ and any(v is _NONE_TYPE for v in t_args)
+ ):
+ _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute)
+ actual_type = [a for a in args if a is not _NONE_TYPE][0]
+ validator = self._as_type_validator(
+ attribute, actual_type, parsing_typed_dict_attribute
+ )
+
+ def _validator(v: Any, path: AttributePath) -> None:
+ if v is None:
+ return
+ validator.ensure_type(v, path)
+
+ return AttributeTypeHandler(
+ validator.describe_type(),
+ _validator,
+ base_type=validator.base_type,
+ mapper=type_normalizer,
+ )
+
+ if unmapped_type in BASIC_SIMPLE_TYPES:
+ type_name = BASIC_SIMPLE_TYPES[unmapped_type]
+
+ type_mapping = self._registered_types.get(provided_type)
+ if type_mapping is not None:
+ simple_type = f" ({type_name})"
+ type_name = type_mapping.target_type.__name__
+ else:
+ simple_type = ""
+
+ def _validator(v: Any, path: AttributePath) -> None:
+ if not isinstance(v, unmapped_type):
+ _validation_type_error(
+ path, f"The attribute must be a {type_name}{simple_type}"
+ )
+
+ return AttributeTypeHandler(
+ type_name,
+ _validator,
+ base_type=unmapped_type,
+ mapper=type_normalizer,
+ )
+ if t_orig == list:
+ if not t_args:
+ raise ValueError(
+ f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])'
+ )
+ _, t_provided_orig, t_provided_args = unpack_type(
+ provided_type,
+ parsing_typed_dict_attribute,
+ )
+ genetic_type = t_args[0]
+ key_mapper = self._as_type_validator(
+ attribute,
+ genetic_type,
+ parsing_typed_dict_attribute,
+ )
+
+ def _validator(v: Any, path: AttributePath) -> None:
+ if not isinstance(v, list):
+ _validation_type_error(path, "The attribute must be a list")
+ for i, v in enumerate(v):
+ key_mapper.ensure_type(v, path[i])
+
+ list_mapper = (
+ map_each_element(key_mapper.mapper)
+ if key_mapper.mapper is not None
+ else None
+ )
+
+ return AttributeTypeHandler(
+ f"List of {key_mapper.describe_type()}",
+ _validator,
+ base_type=list,
+ mapper=type_normalizer,
+ ).combine_mapper(list_mapper)
+ if is_typeddict(provided_type):
+ subparser = self.parser_from_typed_dict(cast("Type[TD]", provided_type))
+ return AttributeTypeHandler(
+ description=f"{provided_type.__name__} (Typed Mapping)",
+ ensure_type=lambda v, ap: None,
+ base_type=dict,
+ mapper=lambda v, ap, cv: subparser.parse_input(
+ v, ap, parser_context=cv
+ ),
+ )
+ if t_orig == dict:
+ if not t_args or len(t_args) != 2:
+ raise ValueError(
+ f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])'
+ )
+ if t_args[0] != str:
+ raise ValueError(
+ f'The attribute "{attribute}" is Dict and has a non-str type as key.'
+ " Currently, only `str` is supported (Dict[str, Y])"
+ )
+ key_mapper = self._as_type_validator(
+ attribute,
+ t_args[0],
+ parsing_typed_dict_attribute,
+ )
+ value_mapper = self._as_type_validator(
+ attribute,
+ t_args[1],
+ parsing_typed_dict_attribute,
+ )
+
+ if key_mapper.base_type is None:
+ raise ValueError(
+ f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types'
+ f" without trivial base types (such as `str`) are not supported at the moment."
+ )
+
+ if value_mapper.mapper is not None:
+ raise ValueError(
+ f'The attribute "{attribute}" is Dict and the value requires mapping.'
+ " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])."
+ " Better typing may come later"
+ )
+
+ def _validator(uv: Any, path: AttributePath) -> None:
+ if not isinstance(uv, dict):
+ _validation_type_error(path, "The attribute must be a mapping")
+ key_name = "the first key in the mapping"
+ for i, (k, v) in enumerate(uv.items()):
+ if not key_mapper.base_type_match(k):
+ kp = path.copy_with_path_hint(key_name)
+ _validation_type_error(
+ kp,
+ f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}',
+ )
+ key_name = f"the key after {k}"
+ value_mapper.ensure_type(v, path[k])
+
+ return AttributeTypeHandler(
+ f"Mapping of {value_mapper.describe_type()}",
+ _validator,
+ base_type=dict,
+ mapper=type_normalizer,
+ ).combine_mapper(key_mapper.mapper)
+ if t_orig == Union:
+ if _is_two_arg_x_list_x(t_args):
+ # Force the order to be "X, List[X]" as it simplifies the code
+ x_list_x = (
+ t_args if get_origin(t_args[1]) == list else (t_args[1], t_args[0])
+ )
+
+ # X, List[X] could match if X was List[Y]. However, our code below assumes
+ # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this
+ # case to avoid this assert and fall into the "generic case".
+ assert get_origin(x_list_x[0]) != list
+ x_subtype_checker = self._as_type_validator(
+ attribute,
+ x_list_x[0],
+ parsing_typed_dict_attribute,
+ )
+ list_x_subtype_checker = self._as_type_validator(
+ attribute,
+ x_list_x[1],
+ parsing_typed_dict_attribute,
+ )
+ type_description = x_subtype_checker.describe_type()
+ type_description = f"{type_description} or a list of {type_description}"
+
+ def _validator(v: Any, path: AttributePath) -> None:
+ if isinstance(v, list):
+ list_x_subtype_checker.ensure_type(v, path)
+ else:
+ x_subtype_checker.ensure_type(v, path)
+
+ return AttributeTypeHandler(
+ type_description,
+ _validator,
+ mapper=type_normalizer,
+ )
+ else:
+ subtype_checker = [
+ self._as_type_validator(attribute, a, parsing_typed_dict_attribute)
+ for a in t_args
+ ]
+ type_description = "one-of: " + ", ".join(
+ f"{sc.describe_type()}" for sc in subtype_checker
+ )
+ mapper = subtype_checker[0].mapper
+ if any(mapper != sc.mapper for sc in subtype_checker):
+ raise ValueError(
+ f'Cannot handle the union "{provided_type}" as the target types need different'
+ " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]"
+ " where X is a non-collection type."
+ )
+
+ def _validator(v: Any, path: AttributePath) -> None:
+ partial_matches = []
+ for sc in subtype_checker:
+ try:
+ sc.ensure_type(v, path)
+ return
+ except ManifestParseException as e:
+ if sc.base_type_match(v):
+ partial_matches.append((sc, e))
+
+ if len(partial_matches) == 1:
+ raise partial_matches[0][1]
+ _validation_type_error(
+ path, f"Could not match against: {type_description}"
+ )
+
+ return AttributeTypeHandler(
+ type_description,
+ _validator,
+ mapper=type_normalizer,
+ )
+ if t_orig == Literal:
+ # We want "x" for string values; repr provides 'x'
+ pretty = ", ".join(
+ f'"{v}"' if isinstance(v, str) else str(v) for v in t_args
+ )
+
+ def _validator(v: Any, path: AttributePath) -> None:
+ if v not in t_args:
+ value_hint = ""
+ if isinstance(v, str):
+ value_hint = f"({v}) "
+ _validation_type_error(
+ path,
+ f"Value {value_hint}must be one of the following literal values: {pretty}",
+ )
+
+ return AttributeTypeHandler(
+ f"One of the following literal values: {pretty}",
+ _validator,
+ )
+
+ if provided_type == Any:
+ return AttributeTypeHandler(
+ "any (unvalidated)",
+ lambda *a: None,
+ )
+ raise ValueError(
+ f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported'
+ )
+
+ def _parse_types(
+ self,
+ spec: Type[TypedDict],
+ allow_target_attribute_annotation: bool = False,
+ allow_source_attribute_annotations: bool = False,
+ forbid_optional: bool = True,
+ ) -> Dict[str, AttributeDescription]:
+ annotations = get_type_hints(spec, include_extras=True)
+ return {
+ k: self._attribute_description(
+ k,
+ t,
+ k in spec.__required_keys__,
+ allow_target_attribute_annotation=allow_target_attribute_annotation,
+ allow_source_attribute_annotations=allow_source_attribute_annotations,
+ forbid_optional=forbid_optional,
+ )
+ for k, t in annotations.items()
+ }
+
+ def _attribute_description(
+ self,
+ attribute: str,
+ orig_td: Any,
+ is_required: bool,
+ forbid_optional: bool = True,
+ allow_target_attribute_annotation: bool = False,
+ allow_source_attribute_annotations: bool = False,
+ ) -> AttributeDescription:
+ td, anno, is_optional = _parse_type(
+ attribute, orig_td, forbid_optional=forbid_optional
+ )
+ type_validator = self._as_type_validator(attribute, td, True)
+ parsed_annotations = DetectedDebputyParseHint.parse_annotations(
+ anno,
+ f' Seen with attribute "{attribute}".',
+ attribute,
+ is_required,
+ allow_target_attribute_annotation=allow_target_attribute_annotation,
+ allow_source_attribute_annotations=allow_source_attribute_annotations,
+ )
+ return AttributeDescription(
+ target_attribute=parsed_annotations.target_attribute,
+ attribute_type=td,
+ type_validator=type_validator,
+ annotations=anno,
+ is_optional=is_optional,
+ conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
+ conditional_required=parsed_annotations.conditional_required,
+ source_attribute_name=assume_not_none(
+ parsed_annotations.source_manifest_attribute
+ ),
+ parse_hints=parsed_annotations,
+ )
+
+ def _parse_alt_form(
+ self,
+ alt_form,
+ default_target_attribute: Optional[str],
+ ) -> AttributeDescription:
+ td, anno, is_optional = _parse_type(
+ "source_format alternative form",
+ alt_form,
+ forbid_optional=True,
+ parsing_typed_dict_attribute=False,
+ )
+ type_validator = self._as_type_validator(
+ "source_format alternative form",
+ td,
+ True,
+ )
+ parsed_annotations = DetectedDebputyParseHint.parse_annotations(
+ anno,
+ f" The alternative for source_format.",
+ None,
+ False,
+ default_target_attribute=default_target_attribute,
+ allow_target_attribute_annotation=True,
+ allow_source_attribute_annotations=False,
+ )
+ return AttributeDescription(
+ target_attribute=parsed_annotations.target_attribute,
+ attribute_type=td,
+ type_validator=type_validator,
+ annotations=anno,
+ is_optional=is_optional,
+ conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
+ conditional_required=parsed_annotations.conditional_required,
+ source_attribute_name="Alt form of the source_format",
+ )
+
+ def _union_narrowing(
+ self,
+ input_type: Any,
+ target_type: Any,
+ parsing_typed_dict_attribute: bool,
+ ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]:
+ _, input_orig, input_args = unpack_type(
+ input_type, parsing_typed_dict_attribute
+ )
+ _, target_orig, target_args = unpack_type(
+ target_type, parsing_typed_dict_attribute
+ )
+
+ if input_orig != Union or not input_args:
+ raise ValueError("input_type must be a Union[...] with non-empty args")
+
+ # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, Union[Y]]
+ # - Where X = Y or there is a simple standard transformation from X to Y.
+
+ if target_orig not in (Union, list) or not target_args:
+ # Not supported
+ return None
+
+ if target_orig == Union and set(input_args) == set(target_args):
+ # Not needed (identity mapping)
+ return None
+
+ if target_orig == list and not any(get_origin(a) == list for a in input_args):
+ # Not supported
+ return None
+
+ target_arg = target_args[0]
+ simplified_type = self._strip_mapped_types(
+ target_arg, parsing_typed_dict_attribute
+ )
+ acceptable_types = {
+ target_arg,
+ List[target_arg], # type: ignore
+ simplified_type,
+ List[simplified_type], # type: ignore
+ }
+ target_format = (
+ target_arg,
+ List[target_arg], # type: ignore
+ )
+ in_target_format = 0
+ in_simple_format = 0
+ for input_arg in input_args:
+ if input_arg not in acceptable_types:
+ # Not supported
+ return None
+ if input_arg in target_format:
+ in_target_format += 1
+ else:
+ in_simple_format += 1
+
+ assert in_simple_format or in_target_format
+
+ if in_target_format and not in_simple_format:
+ # Union[X, List[X]] -> List[X]
+ return normalize_into_list
+ mapped = self._registered_types[target_arg]
+ if not in_target_format and in_simple_format:
+ # Union[X, List[X]] -> List[Y]
+
+ def _mapper_x_list_y(
+ x: Union[Any, List[Any]],
+ ap: AttributePath,
+ pc: Optional["ParserContextData"],
+ ) -> List[Any]:
+ in_list_form: List[Any] = normalize_into_list(x, ap, pc)
+
+ return [mapped.mapper(x, ap, pc) for x in in_list_form]
+
+ return _mapper_x_list_y
+
+ # Union[Y, List[X]] -> List[Y]
+ if not isinstance(target_arg, type):
+ raise ValueError(
+ f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does"
+ f" not support mixed types. Please use either {simplified_type} or {target_arg}"
+ f" in the source content (but both a mix of both)"
+ )
+
+ def _mapper_mixed_list_y(
+ x: Union[Any, List[Any]],
+ ap: AttributePath,
+ pc: Optional["ParserContextData"],
+ ) -> List[Any]:
+ in_list_form: List[Any] = normalize_into_list(x, ap, pc)
+
+ return [
+ x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc)
+ for x in in_list_form
+ ]
+
+ return _mapper_mixed_list_y
+
+ def _type_normalize(
+ self,
+ attribute: str,
+ input_type: Any,
+ target_type: Any,
+ parsing_typed_dict_attribute: bool,
+ ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]:
+ if input_type == target_type:
+ return None
+ _, input_orig, input_args = unpack_type(
+ input_type, parsing_typed_dict_attribute
+ )
+ _, target_orig, target_args = unpack_type(
+ target_type,
+ parsing_typed_dict_attribute,
+ )
+ if input_orig == Union:
+ result = self._union_narrowing(
+ input_type, target_type, parsing_typed_dict_attribute
+ )
+ if result:
+ return result
+ elif target_orig == list and target_args[0] == input_type:
+ return wrap_into_list
+
+ mapped = self._registered_types.get(target_type)
+ if mapped is not None and input_type == mapped.source_type:
+ # Source -> Target
+ return mapped.mapper
+ if target_orig == list and target_args:
+ mapped = self._registered_types.get(target_args[0])
+ if mapped is not None:
+ # mypy is dense and forgots `mapped` cannot be optional in the comprehensions.
+ mapped_type: TypeMapping = mapped
+ if input_type == mapped.source_type:
+ # Source -> List[Target]
+ return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)]
+ if (
+ input_orig == list
+ and input_args
+ and input_args[0] == mapped_type.source_type
+ ):
+ # List[Source] -> List[Target]
+ return lambda xs, ap, pc: [
+ mapped_type.mapper(x, ap, pc) for x in xs
+ ]
+
+ raise ValueError(
+ f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow'
+ f" {input_type} to {target_type}"
+ )
+
+ def _strip_mapped_types(
+ self, orig_td: Any, parsing_typed_dict_attribute: bool
+ ) -> Any:
+ m = self._registered_types.get(orig_td)
+ if m is not None:
+ return m.source_type
+ _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
+ if v == list:
+ arg = args[0]
+ m = self._registered_types.get(arg)
+ if m:
+ return List[m.source_type] # type: ignore
+ if v == Union:
+ stripped_args = tuple(
+ self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args
+ )
+ if stripped_args != args:
+ return Union[stripped_args]
+ return orig_td
+
+
+def _verify_inline_reference_documentation(
+ source_content_attributes: Mapping[str, AttributeDescription],
+ inline_reference_documentation: Optional[ParserDocumentation],
+ has_alt_form: bool,
+) -> None:
+ if inline_reference_documentation is None:
+ return
+ attribute_doc = inline_reference_documentation.attribute_doc
+ if attribute_doc:
+ seen = set()
+ for attr_doc in attribute_doc:
+ for attr_name in attr_doc.attributes:
+ attr = source_content_attributes.get(attr_name)
+ if attr is None:
+ raise ValueError(
+ f'The inline_reference_documentation references an attribute "{attr_name}", which does not'
+ f" exist in the source format."
+ )
+ if attr_name in seen:
+ raise ValueError(
+ f'The inline_reference_documentation has documentation for "{attr_name}" twice,'
+ f" which is not supported. Please document it at most once"
+ )
+ seen.add(attr_name)
+
+ undocumented = source_content_attributes.keys() - seen
+ if undocumented:
+ undocumented_attrs = ", ".join(undocumented)
+ raise ValueError(
+ "The following attributes were not documented. If this is deliberate, then please"
+ ' declare each them as undocumented (via undocumented_attr("foo")):'
+ f" {undocumented_attrs}"
+ )
+
+ if inline_reference_documentation.alt_parser_description and not has_alt_form:
+ raise ValueError(
+ "The inline_reference_documentation had documentation for an non-mapping format,"
+ " but the source format does not have a non-mapping format."
+ )
+
+
+def _check_conflicts(
+ input_content_attributes: Dict[str, AttributeDescription],
+ required_attributes: FrozenSet[str],
+ all_attributes: FrozenSet[str],
+) -> None:
+ for attr_name, attr in input_content_attributes.items():
+ if attr_name in required_attributes and attr.conflicting_attributes:
+ c = ", ".join(repr(a) for a in attr.conflicting_attributes)
+ raise ValueError(
+ f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.'
+ " This makes it impossible to use these attributes. Either remove the attributes"
+ f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"'
+ " optional (NotRequired)"
+ )
+ else:
+ required_conflicts = attr.conflicting_attributes & required_attributes
+ if required_conflicts:
+ c = ", ".join(repr(a) for a in required_conflicts)
+ raise ValueError(
+ f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.'
+ f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,'
+ f" adjust the conflicts or make the listed attributes optional (NotRequired)"
+ )
+ unknown_attributes = attr.conflicting_attributes - all_attributes
+ if unknown_attributes:
+ c = ", ".join(repr(a) for a in unknown_attributes)
+ raise ValueError(
+ f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.'
+ f" None of these attributes were declared in the input."
+ )
+
+
+def _check_attributes(
+ content: Type[TypedDict],
+ input_content: Type[TypedDict],
+ input_content_attributes: Dict[str, AttributeDescription],
+ sources: Mapping[str, Collection[str]],
+) -> None:
+ target_required_keys = content.__required_keys__
+ input_required_keys = input_content.__required_keys__
+ all_input_keys = input_required_keys | input_content.__optional_keys__
+
+ for input_name in all_input_keys:
+ attr = input_content_attributes[input_name]
+ target_name = attr.target_attribute
+ source_names = sources[target_name]
+ input_is_required = input_name in input_required_keys
+ target_is_required = target_name in target_required_keys
+
+ assert source_names
+
+ if input_is_required and len(source_names) > 1:
+ raise ValueError(
+ f'The source attribute "{input_name}" is required, but it maps to "{target_name}",'
+ f' which has multiple sources "{source_names}". If "{input_name}" should be required,'
+ f' then there is no need for additional sources for "{target_name}". Alternatively,'
+ f' "{input_name}" might be missing a NotRequired type'
+ f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")'
+ )
+ if not input_is_required and target_is_required and len(source_names) == 1:
+ raise ValueError(
+ f'The source attribute "{input_name}" is not marked as required and maps to'
+ f' "{target_name}", which is marked as required. As there are no other attributes'
+ f' mapping to "{target_name}", then "{input_name}" must be required as well'
+ f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional'
+ f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.'
+ )
+
+
+def _validation_type_error(path: AttributePath, message: str) -> None:
+ raise ManifestParseException(
+ f'The attribute "{path.path}" did not have a valid structure/type: {message}'
+ )
+
+
+def _is_two_arg_x_list_x(t_args: Tuple[Any, ...]) -> bool:
+ if len(t_args) != 2:
+ return False
+ lhs, rhs = t_args
+ if get_origin(lhs) == list:
+ if get_origin(rhs) == list:
+ # It could still match X, List[X] - but we do not allow this case for now as the caller
+ # does not support it.
+ return False
+ l_args = get_args(lhs)
+ return bool(l_args and l_args[0] == rhs)
+ if get_origin(rhs) == list:
+ r_args = get_args(rhs)
+ return bool(r_args and r_args[0] == lhs)
+ return False
+
+
+def _extract_typed_dict(
+ base_type,
+ default_target_attribute: Optional[str],
+) -> Tuple[Optional[Type[TypedDict]], Any]:
+ if is_typeddict(base_type):
+ return base_type, None
+ _, origin, args = unpack_type(base_type, False)
+ if origin != Union:
+ if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)):
+ raise ValueError(
+ "The source_format cannot be nor contain a (non-TypedDict) dict"
+ )
+ return None, base_type
+ typed_dicts = [x for x in args if is_typeddict(x)]
+ if len(typed_dicts) > 1:
+ raise ValueError(
+ "When source_format is a Union, it must contain at most one TypedDict"
+ )
+ typed_dict = typed_dicts[0] if typed_dicts else None
+
+ if any(x is None or x is _NONE_TYPE for x in args):
+ raise ValueError(
+ "The source_format cannot be nor contain Optional[X] or Union[X, None]"
+ )
+
+ if any(
+ isinstance(x, type) and issubclass(x, (dict, Mapping))
+ for x in args
+ if x is not typed_dict
+ ):
+ raise ValueError(
+ "The source_format cannot be nor contain a (non-TypedDict) dict"
+ )
+ remaining = [x for x in args if x is not typed_dict]
+ has_target_attribute = False
+ anno = None
+ if len(remaining) == 1:
+ base_type, anno, _ = _parse_type(
+ "source_format alternative form",
+ remaining[0],
+ forbid_optional=True,
+ parsing_typed_dict_attribute=False,
+ )
+ has_target_attribute = bool(anno) and any(
+ isinstance(x, TargetAttribute) for x in anno
+ )
+ target_type = base_type
+ else:
+ target_type = Union[tuple(remaining)]
+
+ if default_target_attribute is None and not has_target_attribute:
+ raise ValueError(
+ 'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]'
+ " OR the parsed_content format must have exactly one attribute that is required."
+ )
+ if anno:
+ final_anno = [target_type]
+ final_anno.extend(anno)
+ return typed_dict, Annotated[tuple(final_anno)]
+ return typed_dict, target_type
+
+
+def _dispatch_parse_generator(
+ dispatch_type: Type[DebputyDispatchableType],
+) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]:
+ def _dispatch_parse(
+ value: Any,
+ attribute_path: AttributePath,
+ parser_context: Optional["ParserContextData"],
+ ):
+ assert parser_context is not None
+ dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type)
+ return dispatching_parser.parse(
+ value, attribute_path, parser_context=parser_context
+ )
+
+ return _dispatch_parse
+
+
+def _dispatch_parser(
+ dispatch_type: Type[DebputyDispatchableType],
+) -> AttributeTypeHandler:
+ return AttributeTypeHandler(
+ dispatch_type.__name__,
+ lambda *a: None,
+ mapper=_dispatch_parse_generator(dispatch_type),
+ )
+
+
+def _parse_type(
+ attribute: str,
+ orig_td: Any,
+ forbid_optional: bool = True,
+ parsing_typed_dict_attribute: bool = True,
+) -> Tuple[Any, Tuple[Any, ...], bool]:
+ td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
+ md: Tuple[Any, ...] = tuple()
+ optional = False
+ if v is not None:
+ if v == Annotated:
+ anno = get_args(td)
+ md = anno[1:]
+ td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute)
+
+ if td is _NONE_TYPE:
+ raise ValueError(
+ f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the'
+ " debputy manifest, so this attribute does not make sense in its current form."
+ )
+ if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args):
+ raise ValueError(
+ f'Detected use of Optional in "{attribute}", which is not allowed here.'
+ " Please use NotRequired for optional fields"
+ )
+
+ return td, md, optional
+
+
+def _normalize_attribute_name(attribute: str) -> str:
+ if attribute.endswith("_"):
+ attribute = attribute[:-1]
+ return attribute.replace("_", "-")
+
+
+@dataclasses.dataclass
+class DetectedDebputyParseHint:
+ target_attribute: str
+ source_manifest_attribute: Optional[str]
+ conflict_with_source_attributes: FrozenSet[str]
+ conditional_required: Optional[ConditionalRequired]
+ applicable_as_path_hint: bool
+
+ @classmethod
+ def parse_annotations(
+ cls,
+ anno: Tuple[Any, ...],
+ error_context: str,
+ default_attribute_name: Optional[str],
+ is_required: bool,
+ default_target_attribute: Optional[str] = None,
+ allow_target_attribute_annotation: bool = False,
+ allow_source_attribute_annotations: bool = False,
+ ) -> "DetectedDebputyParseHint":
+ target_attr_anno = find_annotation(anno, TargetAttribute)
+ if target_attr_anno:
+ if not allow_target_attribute_annotation:
+ raise ValueError(
+ f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}"
+ )
+ target_attribute = target_attr_anno.attribute
+ elif default_target_attribute is not None:
+ target_attribute = default_target_attribute
+ elif default_attribute_name is not None:
+ target_attribute = default_attribute_name
+ else:
+ if default_attribute_name is None:
+ raise ValueError(
+ "allow_target_attribute_annotation must be True OR "
+ "default_attribute_name/default_target_attribute must be not None"
+ )
+ raise ValueError(
+ f"Missing DebputyParseHint.target_attribute annotation.{error_context}"
+ )
+ source_attribute_anno = find_annotation(anno, ManifestAttribute)
+ _source_attribute_allowed(
+ allow_source_attribute_annotations, error_context, source_attribute_anno
+ )
+ if source_attribute_anno:
+ source_attribute_name = source_attribute_anno.attribute
+ elif default_attribute_name is not None:
+ source_attribute_name = _normalize_attribute_name(default_attribute_name)
+ else:
+ source_attribute_name = None
+ mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute)
+ if mutual_exclusive_with_anno:
+ _source_attribute_allowed(
+ allow_source_attribute_annotations,
+ error_context,
+ mutual_exclusive_with_anno,
+ )
+ conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes
+ else:
+ conflicting_attributes = frozenset()
+ conditional_required = find_annotation(anno, ConditionalRequired)
+
+ if conditional_required and is_required:
+ if default_attribute_name is None:
+ raise ValueError(
+ f"is_required cannot be True without default_attribute_name being not None"
+ )
+ raise ValueError(
+ f'The attribute "{default_attribute_name}" is Required while also being conditionally required.'
+ ' Please make the attribute "NotRequired" or remove the conditional requirement.'
+ )
+
+ not_path_hint_anno = find_annotation(anno, NotPathHint)
+ applicable_as_path_hint = not_path_hint_anno is None
+
+ return DetectedDebputyParseHint(
+ target_attribute=target_attribute,
+ source_manifest_attribute=source_attribute_name,
+ conflict_with_source_attributes=conflicting_attributes,
+ conditional_required=conditional_required,
+ applicable_as_path_hint=applicable_as_path_hint,
+ )
+
+
+def _source_attribute_allowed(
+ source_attribute_allowed: bool,
+ error_context: str,
+ annotation: Optional[DebputyParseHint],
+) -> None:
+ if source_attribute_allowed or annotation is None:
+ return
+ raise ValueError(
+ f'The annotation "{annotation}" cannot be used here. {error_context}'
+ )