diff options
Diffstat (limited to 'src/debputy/highlevel_manifest_parser.py')
-rw-r--r-- | src/debputy/highlevel_manifest_parser.py | 546 |
1 files changed, 546 insertions, 0 deletions
diff --git a/src/debputy/highlevel_manifest_parser.py b/src/debputy/highlevel_manifest_parser.py new file mode 100644 index 0000000..6181603 --- /dev/null +++ b/src/debputy/highlevel_manifest_parser.py @@ -0,0 +1,546 @@ +import collections +import contextlib +from typing import ( + Optional, + Dict, + Callable, + List, + Any, + Union, + Mapping, + IO, + Iterator, + cast, + Tuple, +) + +from debian.debian_support import DpkgArchTable +from ruamel.yaml import YAMLError + +from debputy.highlevel_manifest import ( + HighLevelManifest, + PackageTransformationDefinition, + MutableYAMLManifest, + MANIFEST_YAML, +) +from debputy.maintscript_snippet import ( + MaintscriptSnippet, + STD_CONTROL_SCRIPTS, + MaintscriptSnippetContainer, +) +from debputy.packages import BinaryPackage, SourcePackage +from debputy.path_matcher import ( + MatchRuleType, + ExactFileSystemPath, + MatchRule, +) +from debputy.substitution import Substitution +from debputy.util import ( + _normalize_path, + escape_shell, + assume_not_none, +) +from debputy.util import _warn, _info +from ._deb_options_profiles import DebBuildOptionsAndProfiles +from .architecture_support import DpkgArchitectureBuildProcessValuesTable +from .filesystem_scan import FSROOverlay +from .installations import InstallRule, PPFInstallRule +from .manifest_parser.exceptions import ManifestParseException +from .manifest_parser.parser_data import ParserContextData +from .manifest_parser.util import AttributePath +from .packager_provided_files import detect_all_packager_provided_files +from .plugin.api import VirtualPath +from .plugin.api.impl_types import ( + TP, + TTP, + DispatchingTableParser, + OPARSER_PACKAGES, + OPARSER_MANIFEST_ROOT, +) +from .plugin.api.feature_set import PluginProvidedFeatureSet + +try: + from Levenshtein import distance +except ImportError: + + def _detect_possible_typo( + _d, + _key, + _attribute_parent_path: AttributePath, + required: bool, + ) -> None: + if required: + _info( + "Install python3-levenshtein to have debputy try to detect typos in the manifest." + ) + +else: + + def _detect_possible_typo( + d, + key, + _attribute_parent_path: AttributePath, + _required: bool, + ) -> None: + k_len = len(key) + for actual_key in d: + if abs(k_len - len(actual_key)) > 2: + continue + d = distance(key, actual_key) + if d > 2: + continue + path = _attribute_parent_path.path + ref = f'at "{path}"' if path else "at the manifest root level" + _warn( + f'Possible typo: The key "{actual_key}" should probably have been "{key}" {ref}' + ) + + +def _per_package_subst_variables( + p: BinaryPackage, + *, + name: Optional[str] = None, +) -> Dict[str, str]: + return { + "PACKAGE": name if name is not None else p.name, + } + + +class HighLevelManifestParser(ParserContextData): + def __init__( + self, + manifest_path: str, + source_package: SourcePackage, + binary_packages: Mapping[str, BinaryPackage], + substitution: Substitution, + dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable, + dpkg_arch_query_table: DpkgArchTable, + build_env: DebBuildOptionsAndProfiles, + plugin_provided_feature_set: PluginProvidedFeatureSet, + *, + # Available for testing purposes only + debian_dir: Union[str, VirtualPath] = "./debian", + ): + self.manifest_path = manifest_path + self._source_package = source_package + self._binary_packages = binary_packages + self._mutable_yaml_manifest: Optional[MutableYAMLManifest] = None + # In source context, some variables are known to be unresolvable. Record this, so + # we can give better error messages. + self._substitution = substitution + self._dpkg_architecture_variables = dpkg_architecture_variables + self._dpkg_arch_query_table = dpkg_arch_query_table + self._build_env = build_env + self._package_state_stack: List[PackageTransformationDefinition] = [] + self._plugin_provided_feature_set = plugin_provided_feature_set + self._declared_variables = {} + + if isinstance(debian_dir, str): + debian_dir = FSROOverlay.create_root_dir("debian", debian_dir) + + self._debian_dir = debian_dir + + # Delayed initialized; we rely on this delay to parse the variables. + self._all_package_states = None + + self._install_rules: Optional[List[InstallRule]] = None + self._ownership_caches_loaded = False + self._used = False + + def _ensure_package_states_is_initialized(self) -> None: + if self._all_package_states is not None: + return + substitution = self._substitution + binary_packages = self._binary_packages + assert self._all_package_states is None + + self._all_package_states = { + n: PackageTransformationDefinition( + binary_package=p, + substitution=substitution.with_extra_substitutions( + **_per_package_subst_variables(p) + ), + is_auto_generated_package=False, + maintscript_snippets=collections.defaultdict( + MaintscriptSnippetContainer + ), + ) + for n, p in binary_packages.items() + } + for n, p in binary_packages.items(): + dbgsym_name = f"{n}-dbgsym" + if dbgsym_name in self._all_package_states: + continue + self._all_package_states[dbgsym_name] = PackageTransformationDefinition( + binary_package=p, + substitution=substitution.with_extra_substitutions( + **_per_package_subst_variables(p, name=dbgsym_name) + ), + is_auto_generated_package=True, + maintscript_snippets=collections.defaultdict( + MaintscriptSnippetContainer + ), + ) + + @property + def binary_packages(self) -> Mapping[str, BinaryPackage]: + return self._binary_packages + + @property + def _package_states(self) -> Mapping[str, PackageTransformationDefinition]: + assert self._all_package_states is not None + return self._all_package_states + + @property + def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable: + return self._dpkg_architecture_variables + + @property + def dpkg_arch_query_table(self) -> DpkgArchTable: + return self._dpkg_arch_query_table + + @property + def build_env(self) -> DebBuildOptionsAndProfiles: + return self._build_env + + def build_manifest(self) -> HighLevelManifest: + if self._used: + raise TypeError("build_manifest can only be called once!") + self._used = True + self._ensure_package_states_is_initialized() + for var, attribute_path in self._declared_variables.items(): + if not self.substitution.is_used(var): + raise ManifestParseException( + f'The variable "{var}" is unused. Either use it or remove it.' + f" The variable was declared at {attribute_path.path}." + ) + if isinstance(self, YAMLManifestParser) and self._mutable_yaml_manifest is None: + self._mutable_yaml_manifest = MutableYAMLManifest.empty_manifest() + all_packager_provided_files = detect_all_packager_provided_files( + self._plugin_provided_feature_set.packager_provided_files, + self._debian_dir, + self.binary_packages, + ) + + for package in self._package_states: + with self.binary_package_context(package) as context: + if not context.is_auto_generated_package: + ppf_result = all_packager_provided_files[package] + if ppf_result.auto_installable: + context.install_rules.append( + PPFInstallRule( + context.binary_package, + context.substitution, + ppf_result.auto_installable, + ) + ) + context.reserved_packager_provided_files.update( + ppf_result.reserved_only + ) + self._transform_dpkg_maintscript_helpers_to_snippets() + + return HighLevelManifest( + self.manifest_path, + self._mutable_yaml_manifest, + self._install_rules, + self._source_package, + self.binary_packages, + self.substitution, + self._package_states, + self._dpkg_architecture_variables, + self._dpkg_arch_query_table, + self._build_env, + self._plugin_provided_feature_set, + self._debian_dir, + ) + + @contextlib.contextmanager + def binary_package_context( + self, package_name: str + ) -> Iterator[PackageTransformationDefinition]: + if package_name not in self._package_states: + self._error( + f'The package "{package_name}" is not present in the debian/control file (could not find' + f' "Package: {package_name}" in a binary stanza) nor is it a -dbgsym package for one' + " for a package in debian/control." + ) + package_state = self._package_states[package_name] + self._package_state_stack.append(package_state) + ps_len = len(self._package_state_stack) + yield package_state + if ps_len != len(self._package_state_stack): + raise RuntimeError("Internal error: Unbalanced stack manipulation detected") + self._package_state_stack.pop() + + def dispatch_parser_table_for(self, rule_type: TTP) -> DispatchingTableParser[TP]: + t = self._plugin_provided_feature_set.dispatchable_table_parsers.get(rule_type) + if t is None: + raise AssertionError( + f"Internal error: No dispatching parser for {rule_type.__name__}" + ) + return t + + @property + def substitution(self) -> Substitution: + if self._package_state_stack: + return self._package_state_stack[-1].substitution + return self._substitution + + def add_extra_substitution_variables( + self, + **extra_substitutions: Tuple[str, AttributePath], + ) -> Substitution: + if self._package_state_stack or self._all_package_states is not None: + # For one, it would not "bubble up" correctly when added to the lowest stack. + # And if it is not added to the lowest stack, then you get errors about it being + # unknown as soon as you leave the stack (which is weird for the user when + # the variable is something known, sometimes not) + raise RuntimeError("Cannot use add_extra_substitution from this state") + for key, (_, path) in extra_substitutions.items(): + self._declared_variables[key] = path + self._substitution = self._substitution.with_extra_substitutions( + **{k: v[0] for k, v in extra_substitutions.items()} + ) + return self._substitution + + @property + def current_binary_package_state(self) -> PackageTransformationDefinition: + if not self._package_state_stack: + raise RuntimeError("Invalid state: Not in a binary package context") + return self._package_state_stack[-1] + + @property + def is_in_binary_package_state(self) -> bool: + return bool(self._package_state_stack) + + def _transform_dpkg_maintscript_helpers_to_snippets(self) -> None: + package_state = self.current_binary_package_state + for dmh in package_state.dpkg_maintscript_helper_snippets: + snippet = MaintscriptSnippet( + definition_source=dmh.definition_source, + snippet=f'dpkg-maintscript-helper {escape_shell(*dmh.cmdline)} -- "$@"\n', + ) + for script in STD_CONTROL_SCRIPTS: + package_state.maintscript_snippets[script].append(snippet) + + def normalize_path( + self, + path: str, + definition_source: AttributePath, + *, + allow_root_dir_match: bool = False, + ) -> ExactFileSystemPath: + try: + normalized = _normalize_path(path) + except ValueError: + self._error( + f'The path "{path}" provided in {definition_source.path} should be relative to the root of the' + ' package and not use any ".." or "." segments.' + ) + if normalized == "." and not allow_root_dir_match: + self._error( + "Manifests must not change the root directory of the deb file. Please correct" + f' "{definition_source.path}" (path: "{path}) in {self.manifest_path}' + ) + return ExactFileSystemPath( + self.substitution.substitute(normalized, definition_source.path) + ) + + def parse_path_or_glob( + self, + path_or_glob: str, + definition_source: AttributePath, + ) -> MatchRule: + match_rule = MatchRule.from_path_or_glob( + path_or_glob, definition_source.path, substitution=self.substitution + ) + # NB: "." and "/" will be translated to MATCH_ANYTHING by MatchRule.from_path_or_glob, + # so there is no need to check for an exact match on "." like in normalize_path. + if match_rule.rule_type == MatchRuleType.MATCH_ANYTHING: + self._error( + f'The chosen match rule "{path_or_glob}" matches everything (including the deb root directory).' + f' Please correct "{definition_source.path}" (path: "{path_or_glob}) in {self.manifest_path} to' + f' something that matches "less" than everything.' + ) + return match_rule + + def parse_manifest(self) -> HighLevelManifest: + raise NotImplementedError + + +class YAMLManifestParser(HighLevelManifestParser): + def _optional_key( + self, + d: Mapping[str, Any], + key: str, + attribute_parent_path: AttributePath, + expected_type=None, + default_value=None, + ): + v = d.get(key) + if v is None: + _detect_possible_typo(d, key, attribute_parent_path, False) + return default_value + if expected_type is not None: + return self._ensure_value_is_type( + v, expected_type, key, attribute_parent_path + ) + return v + + def _required_key( + self, + d: Mapping[str, Any], + key: str, + attribute_parent_path: AttributePath, + expected_type=None, + extra: Optional[Union[str, Callable[[], str]]] = None, + ): + v = d.get(key) + if v is None: + _detect_possible_typo(d, key, attribute_parent_path, True) + if extra is not None: + msg = extra if isinstance(extra, str) else extra() + extra_info = " " + msg + else: + extra_info = "" + self._error( + f'Missing required key {key} at {attribute_parent_path.path} in manifest "{self.manifest_path}.' + f"{extra_info}" + ) + + if expected_type is not None: + return self._ensure_value_is_type( + v, expected_type, key, attribute_parent_path + ) + return v + + def _ensure_value_is_type( + self, + v, + t, + key: Union[str, int, AttributePath], + attribute_parent_path: Optional[AttributePath], + ): + if v is None: + return None + if not isinstance(v, t): + if isinstance(t, tuple): + t_msg = "one of: " + ", ".join(x.__name__ for x in t) + else: + t_msg = f"a {t.__name__}" + key_path = ( + key.path + if isinstance(key, AttributePath) + else assume_not_none(attribute_parent_path)[key].path + ) + self._error( + f'The key {key_path} must be {t_msg} in manifest "{self.manifest_path}"' + ) + return v + + def from_yaml_dict(self, yaml_data: object) -> "HighLevelManifest": + attribute_path = AttributePath.root_path() + manifest_root_parser = ( + self._plugin_provided_feature_set.dispatchable_object_parsers[ + OPARSER_MANIFEST_ROOT + ] + ) + parsed_data = cast( + "ManifestRootRule", + manifest_root_parser.parse( + yaml_data, + attribute_path, + parser_context=self, + ), + ) + + packages_dict = parsed_data.get("packages", {}) + install_rules = parsed_data.get("installations") + if install_rules: + self._install_rules = install_rules + packages_parent_path = attribute_path["packages"] + for package_name_raw, v in packages_dict.items(): + definition_source = packages_parent_path[package_name_raw] + package_name = package_name_raw + if "{{" in package_name: + package_name = self.substitution.substitute( + package_name_raw, + definition_source.path, + ) + + with self.binary_package_context(package_name) as package_state: + if package_state.is_auto_generated_package: + # Maybe lift (part) of this restriction. + self._error( + f'Cannot define rules for package "{package_name}" (at {definition_source.path}). It is an' + " auto-generated package." + ) + package_rule_parser = ( + self._plugin_provided_feature_set.dispatchable_object_parsers[ + OPARSER_PACKAGES + ] + ) + parsed = cast( + "BinaryPackageRule", + package_rule_parser.parse( + v, definition_source, parser_context=self + ), + ) + binary_version = parsed.get("binary-version") + if binary_version is not None: + package_state.binary_version = ( + package_state.substitution.substitute( + binary_version, + definition_source["binary-version"].path, + ) + ) + search_dirs = parsed.get("installation_search_dirs") + if search_dirs is not None: + package_state.search_dirs = search_dirs + transformations = parsed.get("transformations") + conffile_management = parsed.get("conffile_management") + if transformations: + package_state.transformations.extend(transformations) + if conffile_management: + package_state.dpkg_maintscript_helper_snippets.extend( + conffile_management + ) + return self.build_manifest() + + def _parse_manifest(self, fd: Union[IO[bytes], str]) -> HighLevelManifest: + try: + data = MANIFEST_YAML.load(fd) + except YAMLError as e: + msg = str(e) + lines = msg.splitlines(keepends=True) + i = -1 + for i, line in enumerate(lines): + # Avoid an irrelevant "how do configure the YAML parser" message, which the + # user cannot use. + if line.startswith("To suppress this check"): + break + if i > -1 and len(lines) > i + 1: + lines = lines[:i] + msg = "".join(lines) + msg = msg.rstrip() + msg += ( + f"\n\nYou can use `yamllint -d relaxed {escape_shell(self.manifest_path)}` to validate" + " the YAML syntax. The yamllint tool also supports style rules for YAML documents" + " (such as indentation rules) in case that is of interest." + ) + raise ManifestParseException( + f"Could not parse {self.manifest_path} as a YAML document: {msg}" + ) from e + self._mutable_yaml_manifest = MutableYAMLManifest(data) + return self.from_yaml_dict(data) + + def parse_manifest( + self, + *, + fd: Optional[Union[IO[bytes], str]] = None, + ) -> HighLevelManifest: + if fd is None: + with open(self.manifest_path, "rb") as fd: + return self._parse_manifest(fd) + else: + return self._parse_manifest(fd) |