summaryrefslogtreecommitdiffstats
path: root/src/debputy/highlevel_manifest_parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/highlevel_manifest_parser.py')
-rw-r--r--src/debputy/highlevel_manifest_parser.py546
1 files changed, 546 insertions, 0 deletions
diff --git a/src/debputy/highlevel_manifest_parser.py b/src/debputy/highlevel_manifest_parser.py
new file mode 100644
index 0000000..6181603
--- /dev/null
+++ b/src/debputy/highlevel_manifest_parser.py
@@ -0,0 +1,546 @@
+import collections
+import contextlib
+from typing import (
+ Optional,
+ Dict,
+ Callable,
+ List,
+ Any,
+ Union,
+ Mapping,
+ IO,
+ Iterator,
+ cast,
+ Tuple,
+)
+
+from debian.debian_support import DpkgArchTable
+from ruamel.yaml import YAMLError
+
+from debputy.highlevel_manifest import (
+ HighLevelManifest,
+ PackageTransformationDefinition,
+ MutableYAMLManifest,
+ MANIFEST_YAML,
+)
+from debputy.maintscript_snippet import (
+ MaintscriptSnippet,
+ STD_CONTROL_SCRIPTS,
+ MaintscriptSnippetContainer,
+)
+from debputy.packages import BinaryPackage, SourcePackage
+from debputy.path_matcher import (
+ MatchRuleType,
+ ExactFileSystemPath,
+ MatchRule,
+)
+from debputy.substitution import Substitution
+from debputy.util import (
+ _normalize_path,
+ escape_shell,
+ assume_not_none,
+)
+from debputy.util import _warn, _info
+from ._deb_options_profiles import DebBuildOptionsAndProfiles
+from .architecture_support import DpkgArchitectureBuildProcessValuesTable
+from .filesystem_scan import FSROOverlay
+from .installations import InstallRule, PPFInstallRule
+from .manifest_parser.exceptions import ManifestParseException
+from .manifest_parser.parser_data import ParserContextData
+from .manifest_parser.util import AttributePath
+from .packager_provided_files import detect_all_packager_provided_files
+from .plugin.api import VirtualPath
+from .plugin.api.impl_types import (
+ TP,
+ TTP,
+ DispatchingTableParser,
+ OPARSER_PACKAGES,
+ OPARSER_MANIFEST_ROOT,
+)
+from .plugin.api.feature_set import PluginProvidedFeatureSet
+
+try:
+ from Levenshtein import distance
+except ImportError:
+
+ def _detect_possible_typo(
+ _d,
+ _key,
+ _attribute_parent_path: AttributePath,
+ required: bool,
+ ) -> None:
+ if required:
+ _info(
+ "Install python3-levenshtein to have debputy try to detect typos in the manifest."
+ )
+
+else:
+
+ def _detect_possible_typo(
+ d,
+ key,
+ _attribute_parent_path: AttributePath,
+ _required: bool,
+ ) -> None:
+ k_len = len(key)
+ for actual_key in d:
+ if abs(k_len - len(actual_key)) > 2:
+ continue
+ d = distance(key, actual_key)
+ if d > 2:
+ continue
+ path = _attribute_parent_path.path
+ ref = f'at "{path}"' if path else "at the manifest root level"
+ _warn(
+ f'Possible typo: The key "{actual_key}" should probably have been "{key}" {ref}'
+ )
+
+
+def _per_package_subst_variables(
+ p: BinaryPackage,
+ *,
+ name: Optional[str] = None,
+) -> Dict[str, str]:
+ return {
+ "PACKAGE": name if name is not None else p.name,
+ }
+
+
+class HighLevelManifestParser(ParserContextData):
+ def __init__(
+ self,
+ manifest_path: str,
+ source_package: SourcePackage,
+ binary_packages: Mapping[str, BinaryPackage],
+ substitution: Substitution,
+ dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
+ dpkg_arch_query_table: DpkgArchTable,
+ build_env: DebBuildOptionsAndProfiles,
+ plugin_provided_feature_set: PluginProvidedFeatureSet,
+ *,
+ # Available for testing purposes only
+ debian_dir: Union[str, VirtualPath] = "./debian",
+ ):
+ self.manifest_path = manifest_path
+ self._source_package = source_package
+ self._binary_packages = binary_packages
+ self._mutable_yaml_manifest: Optional[MutableYAMLManifest] = None
+ # In source context, some variables are known to be unresolvable. Record this, so
+ # we can give better error messages.
+ self._substitution = substitution
+ self._dpkg_architecture_variables = dpkg_architecture_variables
+ self._dpkg_arch_query_table = dpkg_arch_query_table
+ self._build_env = build_env
+ self._package_state_stack: List[PackageTransformationDefinition] = []
+ self._plugin_provided_feature_set = plugin_provided_feature_set
+ self._declared_variables = {}
+
+ if isinstance(debian_dir, str):
+ debian_dir = FSROOverlay.create_root_dir("debian", debian_dir)
+
+ self._debian_dir = debian_dir
+
+ # Delayed initialized; we rely on this delay to parse the variables.
+ self._all_package_states = None
+
+ self._install_rules: Optional[List[InstallRule]] = None
+ self._ownership_caches_loaded = False
+ self._used = False
+
+ def _ensure_package_states_is_initialized(self) -> None:
+ if self._all_package_states is not None:
+ return
+ substitution = self._substitution
+ binary_packages = self._binary_packages
+ assert self._all_package_states is None
+
+ self._all_package_states = {
+ n: PackageTransformationDefinition(
+ binary_package=p,
+ substitution=substitution.with_extra_substitutions(
+ **_per_package_subst_variables(p)
+ ),
+ is_auto_generated_package=False,
+ maintscript_snippets=collections.defaultdict(
+ MaintscriptSnippetContainer
+ ),
+ )
+ for n, p in binary_packages.items()
+ }
+ for n, p in binary_packages.items():
+ dbgsym_name = f"{n}-dbgsym"
+ if dbgsym_name in self._all_package_states:
+ continue
+ self._all_package_states[dbgsym_name] = PackageTransformationDefinition(
+ binary_package=p,
+ substitution=substitution.with_extra_substitutions(
+ **_per_package_subst_variables(p, name=dbgsym_name)
+ ),
+ is_auto_generated_package=True,
+ maintscript_snippets=collections.defaultdict(
+ MaintscriptSnippetContainer
+ ),
+ )
+
+ @property
+ def binary_packages(self) -> Mapping[str, BinaryPackage]:
+ return self._binary_packages
+
+ @property
+ def _package_states(self) -> Mapping[str, PackageTransformationDefinition]:
+ assert self._all_package_states is not None
+ return self._all_package_states
+
+ @property
+ def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
+ return self._dpkg_architecture_variables
+
+ @property
+ def dpkg_arch_query_table(self) -> DpkgArchTable:
+ return self._dpkg_arch_query_table
+
+ @property
+ def build_env(self) -> DebBuildOptionsAndProfiles:
+ return self._build_env
+
+ def build_manifest(self) -> HighLevelManifest:
+ if self._used:
+ raise TypeError("build_manifest can only be called once!")
+ self._used = True
+ self._ensure_package_states_is_initialized()
+ for var, attribute_path in self._declared_variables.items():
+ if not self.substitution.is_used(var):
+ raise ManifestParseException(
+ f'The variable "{var}" is unused. Either use it or remove it.'
+ f" The variable was declared at {attribute_path.path}."
+ )
+ if isinstance(self, YAMLManifestParser) and self._mutable_yaml_manifest is None:
+ self._mutable_yaml_manifest = MutableYAMLManifest.empty_manifest()
+ all_packager_provided_files = detect_all_packager_provided_files(
+ self._plugin_provided_feature_set.packager_provided_files,
+ self._debian_dir,
+ self.binary_packages,
+ )
+
+ for package in self._package_states:
+ with self.binary_package_context(package) as context:
+ if not context.is_auto_generated_package:
+ ppf_result = all_packager_provided_files[package]
+ if ppf_result.auto_installable:
+ context.install_rules.append(
+ PPFInstallRule(
+ context.binary_package,
+ context.substitution,
+ ppf_result.auto_installable,
+ )
+ )
+ context.reserved_packager_provided_files.update(
+ ppf_result.reserved_only
+ )
+ self._transform_dpkg_maintscript_helpers_to_snippets()
+
+ return HighLevelManifest(
+ self.manifest_path,
+ self._mutable_yaml_manifest,
+ self._install_rules,
+ self._source_package,
+ self.binary_packages,
+ self.substitution,
+ self._package_states,
+ self._dpkg_architecture_variables,
+ self._dpkg_arch_query_table,
+ self._build_env,
+ self._plugin_provided_feature_set,
+ self._debian_dir,
+ )
+
+ @contextlib.contextmanager
+ def binary_package_context(
+ self, package_name: str
+ ) -> Iterator[PackageTransformationDefinition]:
+ if package_name not in self._package_states:
+ self._error(
+ f'The package "{package_name}" is not present in the debian/control file (could not find'
+ f' "Package: {package_name}" in a binary stanza) nor is it a -dbgsym package for one'
+ " for a package in debian/control."
+ )
+ package_state = self._package_states[package_name]
+ self._package_state_stack.append(package_state)
+ ps_len = len(self._package_state_stack)
+ yield package_state
+ if ps_len != len(self._package_state_stack):
+ raise RuntimeError("Internal error: Unbalanced stack manipulation detected")
+ self._package_state_stack.pop()
+
+ def dispatch_parser_table_for(self, rule_type: TTP) -> DispatchingTableParser[TP]:
+ t = self._plugin_provided_feature_set.dispatchable_table_parsers.get(rule_type)
+ if t is None:
+ raise AssertionError(
+ f"Internal error: No dispatching parser for {rule_type.__name__}"
+ )
+ return t
+
+ @property
+ def substitution(self) -> Substitution:
+ if self._package_state_stack:
+ return self._package_state_stack[-1].substitution
+ return self._substitution
+
+ def add_extra_substitution_variables(
+ self,
+ **extra_substitutions: Tuple[str, AttributePath],
+ ) -> Substitution:
+ if self._package_state_stack or self._all_package_states is not None:
+ # For one, it would not "bubble up" correctly when added to the lowest stack.
+ # And if it is not added to the lowest stack, then you get errors about it being
+ # unknown as soon as you leave the stack (which is weird for the user when
+ # the variable is something known, sometimes not)
+ raise RuntimeError("Cannot use add_extra_substitution from this state")
+ for key, (_, path) in extra_substitutions.items():
+ self._declared_variables[key] = path
+ self._substitution = self._substitution.with_extra_substitutions(
+ **{k: v[0] for k, v in extra_substitutions.items()}
+ )
+ return self._substitution
+
+ @property
+ def current_binary_package_state(self) -> PackageTransformationDefinition:
+ if not self._package_state_stack:
+ raise RuntimeError("Invalid state: Not in a binary package context")
+ return self._package_state_stack[-1]
+
+ @property
+ def is_in_binary_package_state(self) -> bool:
+ return bool(self._package_state_stack)
+
+ def _transform_dpkg_maintscript_helpers_to_snippets(self) -> None:
+ package_state = self.current_binary_package_state
+ for dmh in package_state.dpkg_maintscript_helper_snippets:
+ snippet = MaintscriptSnippet(
+ definition_source=dmh.definition_source,
+ snippet=f'dpkg-maintscript-helper {escape_shell(*dmh.cmdline)} -- "$@"\n',
+ )
+ for script in STD_CONTROL_SCRIPTS:
+ package_state.maintscript_snippets[script].append(snippet)
+
+ def normalize_path(
+ self,
+ path: str,
+ definition_source: AttributePath,
+ *,
+ allow_root_dir_match: bool = False,
+ ) -> ExactFileSystemPath:
+ try:
+ normalized = _normalize_path(path)
+ except ValueError:
+ self._error(
+ f'The path "{path}" provided in {definition_source.path} should be relative to the root of the'
+ ' package and not use any ".." or "." segments.'
+ )
+ if normalized == "." and not allow_root_dir_match:
+ self._error(
+ "Manifests must not change the root directory of the deb file. Please correct"
+ f' "{definition_source.path}" (path: "{path}) in {self.manifest_path}'
+ )
+ return ExactFileSystemPath(
+ self.substitution.substitute(normalized, definition_source.path)
+ )
+
+ def parse_path_or_glob(
+ self,
+ path_or_glob: str,
+ definition_source: AttributePath,
+ ) -> MatchRule:
+ match_rule = MatchRule.from_path_or_glob(
+ path_or_glob, definition_source.path, substitution=self.substitution
+ )
+ # NB: "." and "/" will be translated to MATCH_ANYTHING by MatchRule.from_path_or_glob,
+ # so there is no need to check for an exact match on "." like in normalize_path.
+ if match_rule.rule_type == MatchRuleType.MATCH_ANYTHING:
+ self._error(
+ f'The chosen match rule "{path_or_glob}" matches everything (including the deb root directory).'
+ f' Please correct "{definition_source.path}" (path: "{path_or_glob}) in {self.manifest_path} to'
+ f' something that matches "less" than everything.'
+ )
+ return match_rule
+
+ def parse_manifest(self) -> HighLevelManifest:
+ raise NotImplementedError
+
+
+class YAMLManifestParser(HighLevelManifestParser):
+ def _optional_key(
+ self,
+ d: Mapping[str, Any],
+ key: str,
+ attribute_parent_path: AttributePath,
+ expected_type=None,
+ default_value=None,
+ ):
+ v = d.get(key)
+ if v is None:
+ _detect_possible_typo(d, key, attribute_parent_path, False)
+ return default_value
+ if expected_type is not None:
+ return self._ensure_value_is_type(
+ v, expected_type, key, attribute_parent_path
+ )
+ return v
+
+ def _required_key(
+ self,
+ d: Mapping[str, Any],
+ key: str,
+ attribute_parent_path: AttributePath,
+ expected_type=None,
+ extra: Optional[Union[str, Callable[[], str]]] = None,
+ ):
+ v = d.get(key)
+ if v is None:
+ _detect_possible_typo(d, key, attribute_parent_path, True)
+ if extra is not None:
+ msg = extra if isinstance(extra, str) else extra()
+ extra_info = " " + msg
+ else:
+ extra_info = ""
+ self._error(
+ f'Missing required key {key} at {attribute_parent_path.path} in manifest "{self.manifest_path}.'
+ f"{extra_info}"
+ )
+
+ if expected_type is not None:
+ return self._ensure_value_is_type(
+ v, expected_type, key, attribute_parent_path
+ )
+ return v
+
+ def _ensure_value_is_type(
+ self,
+ v,
+ t,
+ key: Union[str, int, AttributePath],
+ attribute_parent_path: Optional[AttributePath],
+ ):
+ if v is None:
+ return None
+ if not isinstance(v, t):
+ if isinstance(t, tuple):
+ t_msg = "one of: " + ", ".join(x.__name__ for x in t)
+ else:
+ t_msg = f"a {t.__name__}"
+ key_path = (
+ key.path
+ if isinstance(key, AttributePath)
+ else assume_not_none(attribute_parent_path)[key].path
+ )
+ self._error(
+ f'The key {key_path} must be {t_msg} in manifest "{self.manifest_path}"'
+ )
+ return v
+
+ def from_yaml_dict(self, yaml_data: object) -> "HighLevelManifest":
+ attribute_path = AttributePath.root_path()
+ manifest_root_parser = (
+ self._plugin_provided_feature_set.dispatchable_object_parsers[
+ OPARSER_MANIFEST_ROOT
+ ]
+ )
+ parsed_data = cast(
+ "ManifestRootRule",
+ manifest_root_parser.parse(
+ yaml_data,
+ attribute_path,
+ parser_context=self,
+ ),
+ )
+
+ packages_dict = parsed_data.get("packages", {})
+ install_rules = parsed_data.get("installations")
+ if install_rules:
+ self._install_rules = install_rules
+ packages_parent_path = attribute_path["packages"]
+ for package_name_raw, v in packages_dict.items():
+ definition_source = packages_parent_path[package_name_raw]
+ package_name = package_name_raw
+ if "{{" in package_name:
+ package_name = self.substitution.substitute(
+ package_name_raw,
+ definition_source.path,
+ )
+
+ with self.binary_package_context(package_name) as package_state:
+ if package_state.is_auto_generated_package:
+ # Maybe lift (part) of this restriction.
+ self._error(
+ f'Cannot define rules for package "{package_name}" (at {definition_source.path}). It is an'
+ " auto-generated package."
+ )
+ package_rule_parser = (
+ self._plugin_provided_feature_set.dispatchable_object_parsers[
+ OPARSER_PACKAGES
+ ]
+ )
+ parsed = cast(
+ "BinaryPackageRule",
+ package_rule_parser.parse(
+ v, definition_source, parser_context=self
+ ),
+ )
+ binary_version = parsed.get("binary-version")
+ if binary_version is not None:
+ package_state.binary_version = (
+ package_state.substitution.substitute(
+ binary_version,
+ definition_source["binary-version"].path,
+ )
+ )
+ search_dirs = parsed.get("installation_search_dirs")
+ if search_dirs is not None:
+ package_state.search_dirs = search_dirs
+ transformations = parsed.get("transformations")
+ conffile_management = parsed.get("conffile_management")
+ if transformations:
+ package_state.transformations.extend(transformations)
+ if conffile_management:
+ package_state.dpkg_maintscript_helper_snippets.extend(
+ conffile_management
+ )
+ return self.build_manifest()
+
+ def _parse_manifest(self, fd: Union[IO[bytes], str]) -> HighLevelManifest:
+ try:
+ data = MANIFEST_YAML.load(fd)
+ except YAMLError as e:
+ msg = str(e)
+ lines = msg.splitlines(keepends=True)
+ i = -1
+ for i, line in enumerate(lines):
+ # Avoid an irrelevant "how do configure the YAML parser" message, which the
+ # user cannot use.
+ if line.startswith("To suppress this check"):
+ break
+ if i > -1 and len(lines) > i + 1:
+ lines = lines[:i]
+ msg = "".join(lines)
+ msg = msg.rstrip()
+ msg += (
+ f"\n\nYou can use `yamllint -d relaxed {escape_shell(self.manifest_path)}` to validate"
+ " the YAML syntax. The yamllint tool also supports style rules for YAML documents"
+ " (such as indentation rules) in case that is of interest."
+ )
+ raise ManifestParseException(
+ f"Could not parse {self.manifest_path} as a YAML document: {msg}"
+ ) from e
+ self._mutable_yaml_manifest = MutableYAMLManifest(data)
+ return self.from_yaml_dict(data)
+
+ def parse_manifest(
+ self,
+ *,
+ fd: Optional[Union[IO[bytes], str]] = None,
+ ) -> HighLevelManifest:
+ if fd is None:
+ with open(self.manifest_path, "rb") as fd:
+ return self._parse_manifest(fd)
+ else:
+ return self._parse_manifest(fd)