summaryrefslogtreecommitdiffstats
path: root/src/debputy/substitution.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/substitution.py')
-rw-r--r--src/debputy/substitution.py336
1 files changed, 336 insertions, 0 deletions
diff --git a/src/debputy/substitution.py b/src/debputy/substitution.py
new file mode 100644
index 0000000..0923d8f
--- /dev/null
+++ b/src/debputy/substitution.py
@@ -0,0 +1,336 @@
+import dataclasses
+import os
+import re
+from enum import IntEnum
+from typing import FrozenSet, NoReturn, Optional, Set, Mapping, TYPE_CHECKING, Self
+
+from debputy.architecture_support import (
+ dpkg_architecture_table,
+ DpkgArchitectureBuildProcessValuesTable,
+)
+from debputy.exceptions import DebputySubstitutionError
+from debputy.util import glob_escape
+
+if TYPE_CHECKING:
+ from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
+ from debputy.plugin.api import VirtualPath
+
+
+SUBST_VAR_RE = re.compile(
+ r"""
+ ([{][{][ ]*)
+
+ (
+ _?[A-Za-z0-9]+
+ (?:[-_:][A-Za-z0-9]+)*
+ )
+
+ ([ ]*[}][}])
+""",
+ re.VERBOSE,
+)
+
+
+class VariableNameState(IntEnum):
+ UNDEFINED = 1
+ RESERVED = 2
+ DEFINED = 3
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class VariableContext:
+ debian_dir: "VirtualPath"
+
+
+class Substitution:
+ def substitute(
+ self,
+ value: str,
+ definition_source: str,
+ /,
+ escape_glob_characters: bool = False,
+ ) -> str:
+ raise NotImplementedError
+
+ def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
+ raise NotImplementedError
+
+ def with_unresolvable_substitutions(
+ self, *extra_substitutions: str
+ ) -> "Substitution":
+ raise NotImplementedError
+
+ def variable_state(self, variable_name: str) -> VariableNameState:
+ return VariableNameState.UNDEFINED
+
+ def is_used(self, variable_name: str) -> bool:
+ return False
+
+ def _mark_used(self, variable_name: str) -> None:
+ pass
+
+ def _replacement(self, matched_key: str, definition_source: str) -> str:
+ self._error(
+ "Cannot resolve {{" + matched_key + "}}."
+ f" The error occurred while trying to process {definition_source}"
+ )
+
+ def _error(
+ self,
+ msg: str,
+ *,
+ caused_by: Optional[BaseException] = None,
+ ) -> NoReturn:
+ raise DebputySubstitutionError(msg) from caused_by
+
+ def _apply_substitution(
+ self,
+ pattern: re.Pattern[str],
+ value: str,
+ definition_source: str,
+ /,
+ escape_glob_characters: bool = False,
+ ) -> str:
+ replacement = value
+ offset = 0
+ for match in pattern.finditer(value):
+ prefix, matched_key, suffix = match.groups()
+ replacement_value = self._replacement(matched_key, definition_source)
+ self._mark_used(matched_key)
+ if escape_glob_characters:
+ replacement_value = glob_escape(replacement_value)
+ s, e = match.span()
+ s += offset
+ e += offset
+ replacement = replacement[:s] + replacement_value + replacement[e:]
+ token_fluff_len = len(prefix) + len(suffix)
+ offset += len(replacement_value) - len(matched_key) - token_fluff_len
+ return replacement
+
+
+class NullSubstitution(Substitution):
+ def substitute(
+ self,
+ value: str,
+ definition_source: str,
+ /,
+ escape_glob_characters: bool = False,
+ ) -> str:
+ return value
+
+ def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
+ return self
+
+ def with_unresolvable_substitutions(
+ self, *extra_substitutions: str
+ ) -> "Substitution":
+ return self
+
+
+NULL_SUBSTITUTION = NullSubstitution()
+del NullSubstitution
+
+
+class SubstitutionImpl(Substitution):
+ __slots__ = (
+ "_used",
+ "_env",
+ "_plugin_feature_set",
+ "_static_variables",
+ "_unresolvable_substitutions",
+ "_dpkg_arch_table",
+ "_parent",
+ "_variable_context",
+ )
+
+ def __init__(
+ self,
+ /,
+ plugin_feature_set: Optional["PluginProvidedFeatureSet"] = None,
+ static_variables: Optional[Mapping[str, str]] = None,
+ unresolvable_substitutions: FrozenSet[str] = frozenset(),
+ dpkg_arch_table: Optional[DpkgArchitectureBuildProcessValuesTable] = None,
+ environment: Optional[Mapping[str, str]] = None,
+ parent: Optional["SubstitutionImpl"] = None,
+ variable_context: Optional[VariableContext] = None,
+ ) -> None:
+ self._used: Set[str] = set()
+ self._plugin_feature_set = plugin_feature_set
+ self._static_variables = (
+ dict(static_variables) if static_variables is not None else None
+ )
+ self._unresolvable_substitutions = unresolvable_substitutions
+ self._dpkg_arch_table = (
+ dpkg_arch_table
+ if dpkg_arch_table is not None
+ else dpkg_architecture_table()
+ )
+ self._env = environment if environment is not None else os.environ
+ self._parent = parent
+ if variable_context is not None:
+ self._variable_context = variable_context
+ elif self._parent is not None:
+ self._variable_context = self._parent._variable_context
+ else:
+ raise ValueError(
+ "variable_context is required either directly or via the parent"
+ )
+
+ def copy_for_subst_test(
+ self,
+ plugin_feature_set: "PluginProvidedFeatureSet",
+ variable_context: VariableContext,
+ *,
+ extra_substitutions: Optional[Mapping[str, str]] = None,
+ environment: Optional[Mapping[str, str]] = None,
+ ) -> "Self":
+ extra_substitutions_impl = (
+ dict(self._static_variables.items()) if self._static_variables else {}
+ )
+ if extra_substitutions:
+ extra_substitutions_impl.update(extra_substitutions)
+ return self.__class__(
+ plugin_feature_set=plugin_feature_set,
+ variable_context=variable_context,
+ static_variables=extra_substitutions_impl,
+ unresolvable_substitutions=self._unresolvable_substitutions,
+ dpkg_arch_table=self._dpkg_arch_table,
+ environment=environment if environment is not None else {},
+ )
+
+ def variable_state(self, key: str) -> VariableNameState:
+ if key.startswith("DEB_"):
+ if key in self._dpkg_arch_table:
+ return VariableNameState.DEFINED
+ return VariableNameState.RESERVED
+ plugin_feature_set = self._plugin_feature_set
+ if (
+ plugin_feature_set is not None
+ and key in plugin_feature_set.manifest_variables
+ ):
+ return VariableNameState.DEFINED
+ if key.startswith("env:"):
+ k = key[4:]
+ if k in self._env:
+ return VariableNameState.DEFINED
+ return VariableNameState.RESERVED
+ if self._static_variables is not None and key in self._static_variables:
+ return VariableNameState.DEFINED
+ if key in self._unresolvable_substitutions:
+ return VariableNameState.RESERVED
+ if self._parent is not None:
+ return self._parent.variable_state(key)
+ return VariableNameState.UNDEFINED
+
+ def is_used(self, variable_name: str) -> bool:
+ if variable_name in self._used:
+ return True
+ parent = self._parent
+ if parent is not None:
+ return parent.is_used(variable_name)
+ return False
+
+ def _mark_used(self, variable_name: str) -> None:
+ p = self._parent
+ while p:
+ # Find the parent that has the variable if possible. This ensures that is_used works
+ # correctly.
+ if p._static_variables is not None and variable_name in p._static_variables:
+ p._mark_used(variable_name)
+ break
+ plugin_feature_set = p._plugin_feature_set
+ if (
+ plugin_feature_set is not None
+ and variable_name in plugin_feature_set.manifest_variables
+ and not plugin_feature_set.manifest_variables[
+ variable_name
+ ].is_documentation_placeholder
+ ):
+ p._mark_used(variable_name)
+ break
+ p = p._parent
+ self._used.add(variable_name)
+
+ def _replacement(self, key: str, definition_source: str) -> str:
+ if key.startswith("DEB_") and key in self._dpkg_arch_table:
+ return self._dpkg_arch_table[key]
+ if key.startswith("env:"):
+ k = key[4:]
+ if k in self._env:
+ return self._env[k]
+ self._error(
+ f'The environment does not contain the variable "{key}" '
+ f"(error occurred while trying to process {definition_source})"
+ )
+
+ # The order between extra_substitution and plugin_feature_set is leveraged by
+ # the tests to implement mocking variables. If the order needs tweaking,
+ # you will need a custom resolver for the tests to support mocking.
+ static_variables = self._static_variables
+ if static_variables and key in static_variables:
+ return static_variables[key]
+ plugin_feature_set = self._plugin_feature_set
+ if plugin_feature_set is not None:
+ provided_var = plugin_feature_set.manifest_variables.get(key)
+ if (
+ provided_var is not None
+ and not provided_var.is_documentation_placeholder
+ ):
+ v = provided_var.resolve(self._variable_context)
+ # cache it for next time.
+ if static_variables is None:
+ static_variables = {}
+ self._static_variables = static_variables
+ static_variables[key] = v
+ return v
+ if key in self._unresolvable_substitutions:
+ self._error(
+ "The variable {{" + key + "}}"
+ f" is not available while processing {definition_source}."
+ )
+ parent = self._parent
+ if parent is not None:
+ return parent._replacement(key, definition_source)
+ self._error(
+ "Cannot resolve {{" + key + "}}: it is not a known key."
+ f" The error occurred while trying to process {definition_source}"
+ )
+
+ def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
+ if not extra_substitutions:
+ return self
+ return SubstitutionImpl(
+ dpkg_arch_table=self._dpkg_arch_table,
+ environment=self._env,
+ static_variables=extra_substitutions,
+ parent=self,
+ )
+
+ def with_unresolvable_substitutions(
+ self,
+ *extra_substitutions: str,
+ ) -> "Substitution":
+ if not extra_substitutions:
+ return self
+ return SubstitutionImpl(
+ dpkg_arch_table=self._dpkg_arch_table,
+ environment=self._env,
+ unresolvable_substitutions=frozenset(extra_substitutions),
+ parent=self,
+ )
+
+ def substitute(
+ self,
+ value: str,
+ definition_source: str,
+ /,
+ escape_glob_characters: bool = False,
+ ) -> str:
+ if "{{" not in value:
+ return value
+ return self._apply_substitution(
+ SUBST_VAR_RE,
+ value,
+ definition_source,
+ escape_glob_characters=escape_glob_characters,
+ )