Coverage for src/debputy/substitution.py: 85%
153 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import dataclasses
2import os
3import re
4from enum import IntEnum
5from typing import FrozenSet, NoReturn, Optional, Set, Mapping, TYPE_CHECKING, Self
7from debputy.architecture_support import (
8 dpkg_architecture_table,
9 DpkgArchitectureBuildProcessValuesTable,
10)
11from debputy.exceptions import DebputySubstitutionError
12from debputy.util import glob_escape
14if TYPE_CHECKING:
15 from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
16 from debputy.plugin.api import VirtualPath
# Matches a single ``{{ NAME }}`` substitution token and captures three groups:
#   1. the opening ``{{`` together with any padding spaces,
#   2. the variable name,
#   3. any padding spaces together with the closing ``}}``.
# A name is one or more alphanumeric segments joined by a single ``-``, ``_``
# or ``:`` and may start with one optional underscore.
SUBST_VAR_RE = re.compile(
    r"""
    ([{][{][ ]*)
    (
        _?[A-Za-z0-9]+
        (?:[-_:][A-Za-z0-9]+)*
    )
    ([ ]*[}][}])
""",
    flags=re.VERBOSE,
)
class VariableNameState(IntEnum):
    """Classification of a substitution variable name.

    This is the value returned by the ``variable_state`` methods of the
    substitution classes in this module.
    """

    # No layer knows the name at all.
    UNDEFINED = 1
    # The name is recognised but has no value available, e.g. a ``DEB_*`` or
    # ``env:*`` name without a value, or a name declared as unresolvable.
    RESERVED = 2
    # A value for the name is available.
    DEFINED = 3
40@dataclasses.dataclass(slots=True, frozen=True)
41class VariableContext:
42 debian_dir: "VirtualPath"
class Substitution:
    """Base interface for expanding ``{{VARIABLE}}`` tokens in strings.

    The public entry points (``substitute``, ``with_extra_substitutions`` and
    ``with_unresolvable_substitutions``) raise ``NotImplementedError`` here and
    are expected to be provided by subclasses.  The remaining methods are
    conservative defaults: every variable is reported as undefined and unused,
    and every lookup fails with ``DebputySubstitutionError``.
    """

    def substitute(
        self,
        value: str,
        definition_source: str,
        /,
        escape_glob_characters: bool = False,
    ) -> str:
        """Return *value* with its substitution tokens expanded.

        :param value: The text to expand.
        :param definition_source: Description of where *value* came from; used
          in error messages.
        :param escape_glob_characters: Whether replacement values should be
          passed through ``glob_escape`` before being inserted.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
        """Return a substitution that additionally knows *extra_substitutions*.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def with_unresolvable_substitutions(
        self, *extra_substitutions: str
    ) -> "Substitution":
        """Return a substitution in which the given names cannot be resolved.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def variable_state(self, variable_name: str) -> "VariableNameState":
        """Classify *variable_name*; the base class knows no variables."""
        return VariableNameState.UNDEFINED

    def is_used(self, variable_name: str) -> bool:
        """Tell whether *variable_name* has been substituted; never, by default."""
        return False

    def _mark_used(self, variable_name: str) -> None:
        """Record that *variable_name* was substituted; a no-op by default."""
        pass

    def _replacement(self, matched_key: str, definition_source: str) -> str:
        """Return the value for *matched_key*.

        The base class has no variables, so this always fails.

        :raises DebputySubstitutionError: Always, in this base class.
        """
        self._error(
            "Cannot resolve {{" + matched_key + "}}."
            f" The error occurred while trying to process {definition_source}"
        )

    def _error(
        self,
        msg: str,
        *,
        caused_by: Optional[BaseException] = None,
    ) -> NoReturn:
        """Raise ``DebputySubstitutionError(msg)``, chained to *caused_by* if given."""
        raise DebputySubstitutionError(msg) from caused_by

    def _apply_substitution(
        self,
        pattern: re.Pattern[str],
        value: str,
        definition_source: str,
        /,
        escape_glob_characters: bool = False,
    ) -> str:
        """Replace every match of *pattern* in *value* with its resolved value.

        Matches are handled left to right.  For each one the variable name is
        resolved via ``_replacement`` and then reported to ``_mark_used``.

        :param pattern: Pattern with exactly three capture groups per match:
          leading delimiter, variable name and trailing delimiter.  The whole
          match is replaced, not just the three groups.
        :param value: The text to expand.
        :param definition_source: Description of where *value* came from;
          forwarded to ``_replacement`` for its error messages.
        :param escape_glob_characters: Whether each replacement value is passed
          through ``glob_escape`` before it is inserted.
        :return: The expanded text.
        """

        def _expand(match: re.Match[str]) -> str:
            _prefix, matched_key, _suffix = match.groups()
            replacement_value = self._replacement(matched_key, definition_source)
            self._mark_used(matched_key)
            if escape_glob_characters:
                return glob_escape(replacement_value)
            return replacement_value

        # A callable replacement is inserted verbatim (no backslash/group
        # template processing) and ``sub`` splices on the actual match spans,
        # so no manual offset bookkeeping is needed and the text is built in
        # a single pass.
        return pattern.sub(_expand, value)
class NullSubstitution(Substitution):
    """Substitution that leaves every string untouched.

    Deriving a new substitution from it, with extra or unresolvable variables,
    has no effect: the same instance is handed back.
    """

    def substitute(
        self,
        value: str,
        definition_source: str,
        /,
        escape_glob_characters: bool = False,
    ) -> str:
        """Return *value* unchanged; the other arguments are ignored."""
        return value

    def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
        """Ignore *extra_substitutions* and return this same instance."""
        return self

    def with_unresolvable_substitutions(
        self, *extra_substitutions: str
    ) -> "Substitution":
        """Ignore *extra_substitutions* and return this same instance."""
        return self


# Module-level instance of the no-op substitution.  The class name is removed
# from the module namespace right after, so this instance is the only handle
# the module exposes (presumably to keep it a singleton).
NULL_SUBSTITUTION = NullSubstitution()
del NullSubstitution
class SubstitutionImpl(Substitution):
    """Layered resolver for ``{{VARIABLE}}`` tokens.

    ``_replacement`` consults the following sources, in this order:

    1. the dpkg architecture table, for ``DEB_*`` names present in it;
    2. the environment mapping, for ``env:NAME`` names;
    3. the static variables of this layer;
    4. plugin-provided manifest variables (documentation placeholders are
       skipped); a resolved value is cached among the static variables;
    5. the parent layer, unless the name is listed as unresolvable in this
       layer, in which case an error is raised instead.

    ``with_extra_substitutions`` and ``with_unresolvable_substitutions`` create
    a child layer whose parent is this instance.
    """

    # NOTE(review): the base class ``Substitution`` declares no ``__slots__``,
    # so instances still carry a ``__dict__`` despite this declaration.
    __slots__ = (
        "_used",
        "_env",
        "_plugin_feature_set",
        "_static_variables",
        "_unresolvable_substitutions",
        "_dpkg_arch_table",
        "_parent",
        "_variable_context",
    )

    def __init__(
        self,
        /,
        plugin_feature_set: Optional["PluginProvidedFeatureSet"] = None,
        static_variables: Optional[Mapping[str, str]] = None,
        unresolvable_substitutions: FrozenSet[str] = frozenset(),
        dpkg_arch_table: Optional[DpkgArchitectureBuildProcessValuesTable] = None,
        environment: Optional[Mapping[str, str]] = None,
        parent: Optional["SubstitutionImpl"] = None,
        variable_context: Optional[VariableContext] = None,
    ) -> None:
        """Create one resolver layer.

        :param plugin_feature_set: Source of plugin-provided manifest variables.
        :param static_variables: Fixed name/value pairs; copied into a private
          dict.
        :param unresolvable_substitutions: Names that must fail to resolve in
          this layer.
        :param dpkg_arch_table: Table for ``DEB_*`` names; defaults to
          ``dpkg_architecture_table()``.
        :param environment: Mapping for ``env:*`` names; defaults to
          ``os.environ``.
        :param parent: Layer consulted when this one cannot resolve a name.
        :param variable_context: Context passed to plugin-provided variables;
          inherited from *parent* when omitted.
        :raises ValueError: If neither *variable_context* nor *parent* is given.
        """
        self._used: Set[str] = set()
        self._plugin_feature_set = plugin_feature_set
        if static_variables is None:
            self._static_variables = None
        else:
            self._static_variables = dict(static_variables)
        self._unresolvable_substitutions = unresolvable_substitutions
        if dpkg_arch_table is None:
            dpkg_arch_table = dpkg_architecture_table()
        self._dpkg_arch_table = dpkg_arch_table
        if environment is None:
            environment = os.environ
        self._env = environment
        self._parent = parent
        if variable_context is None:
            if parent is None:
                raise ValueError(
                    "variable_context is required either directly or via the parent"
                )
            variable_context = parent._variable_context
        self._variable_context = variable_context

    def copy_for_subst_test(
        self,
        plugin_feature_set: "PluginProvidedFeatureSet",
        variable_context: VariableContext,
        *,
        extra_substitutions: Optional[Mapping[str, str]] = None,
        environment: Optional[Mapping[str, str]] = None,
    ) -> "Self":
        """Build a parent-less copy of this layer for use in tests.

        The copy keeps this layer's static variables (overlaid with
        *extra_substitutions*), unresolvable names and architecture table.
        Its environment is *environment*, or an empty mapping when omitted.
        """
        merged_variables = (
            dict(self._static_variables) if self._static_variables else {}
        )
        if extra_substitutions:
            merged_variables.update(extra_substitutions)
        if environment is None:
            environment = {}
        return self.__class__(
            plugin_feature_set=plugin_feature_set,
            variable_context=variable_context,
            static_variables=merged_variables,
            unresolvable_substitutions=self._unresolvable_substitutions,
            dpkg_arch_table=self._dpkg_arch_table,
            environment=environment,
        )

    def variable_state(self, key: str) -> VariableNameState:
        """Classify *key* as defined, reserved or undefined.

        ``DEB_*`` and ``env:*`` names are never undefined: they are reserved
        when no value is available.  Names unknown to this layer are
        delegated to the parent, if there is one.
        """
        if key.startswith("DEB_"):
            if key in self._dpkg_arch_table:
                return VariableNameState.DEFINED
            return VariableNameState.RESERVED
        features = self._plugin_feature_set
        if features is not None and key in features.manifest_variables:
            return VariableNameState.DEFINED
        if key.startswith("env:"):
            if key[4:] in self._env:
                return VariableNameState.DEFINED
            return VariableNameState.RESERVED
        statics = self._static_variables
        if statics is not None and key in statics:
            return VariableNameState.DEFINED
        if key in self._unresolvable_substitutions:
            return VariableNameState.RESERVED
        parent = self._parent
        if parent is None:
            return VariableNameState.UNDEFINED
        return parent.variable_state(key)

    def is_used(self, variable_name: str) -> bool:
        """Tell whether *variable_name* was substituted by this layer or an ancestor."""
        if variable_name in self._used:
            return True
        parent = self._parent
        return parent is not None and parent.is_used(variable_name)

    def _mark_used(self, variable_name: str) -> None:
        """Record *variable_name* as used in this layer and in the ancestor owning it."""
        ancestor = self._parent
        while ancestor:
            # Find the parent that has the variable if possible.  This ensures
            # that is_used works correctly.
            statics = ancestor._static_variables
            owns_variable = statics is not None and variable_name in statics
            if not owns_variable:
                features = ancestor._plugin_feature_set
                owns_variable = (
                    features is not None
                    and variable_name in features.manifest_variables
                    and not features.manifest_variables[
                        variable_name
                    ].is_documentation_placeholder
                )
            if owns_variable:
                ancestor._mark_used(variable_name)
                break
            ancestor = ancestor._parent
        self._used.add(variable_name)

    def _replacement(self, key: str, definition_source: str) -> str:
        """Resolve *key* to its value.

        :param key: Variable name as written inside the ``{{...}}`` token.
        :param definition_source: Description of the text being processed; used
          in error messages.
        :raises DebputySubstitutionError: If an ``env:`` variable is missing
          from the environment, the name is unresolvable in this layer, or no
          layer knows the name.
        """
        arch_table = self._dpkg_arch_table
        if key.startswith("DEB_") and key in arch_table:
            return arch_table[key]
        if key.startswith("env:"):
            env_name = key[4:]
            if env_name in self._env:
                return self._env[env_name]
            self._error(
                f'The environment does not contain the variable "{key}" '
                f"(error occurred while trying to process {definition_source})"
            )

        # The order between extra_substitution and plugin_feature_set is leveraged by
        # the tests to implement mocking variables. If the order needs tweaking,
        # you will need a custom resolver for the tests to support mocking.
        known_values = self._static_variables
        if known_values and key in known_values:
            return known_values[key]
        features = self._plugin_feature_set
        if features is not None:
            provided = features.manifest_variables.get(key)
            if provided is not None and not provided.is_documentation_placeholder:
                resolved = provided.resolve(self._variable_context)
                # Cache it for next time.
                if known_values is None:
                    known_values = {}
                    self._static_variables = known_values
                known_values[key] = resolved
                return resolved
        if key in self._unresolvable_substitutions:
            self._error(
                "The variable {{" + key + "}}"
                f" is not available while processing {definition_source}."
            )
        parent = self._parent
        if parent is not None:
            return parent._replacement(key, definition_source)
        self._error(
            "Cannot resolve {{" + key + "}}: it is not a known key."
            f" The error occurred while trying to process {definition_source}"
        )

    def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
        """Return a child layer defining *extra_substitutions*, or ``self`` if none are given."""
        if extra_substitutions:
            return SubstitutionImpl(
                dpkg_arch_table=self._dpkg_arch_table,
                environment=self._env,
                static_variables=extra_substitutions,
                parent=self,
            )
        return self

    def with_unresolvable_substitutions(
        self,
        *extra_substitutions: str,
    ) -> "Substitution":
        """Return a child layer in which the given names fail to resolve, or ``self`` if none are given."""
        if extra_substitutions:
            return SubstitutionImpl(
                dpkg_arch_table=self._dpkg_arch_table,
                environment=self._env,
                unresolvable_substitutions=frozenset(extra_substitutions),
                parent=self,
            )
        return self

    def substitute(
        self,
        value: str,
        definition_source: str,
        /,
        escape_glob_characters: bool = False,
    ) -> str:
        """Expand every ``{{VARIABLE}}`` token in *value*.

        :param value: The text to expand; returned as-is when it contains no
          ``{{``.
        :param definition_source: Description of where *value* came from; used
          in error messages.
        :param escape_glob_characters: Whether replacement values are passed
          through ``glob_escape`` before being inserted.
        """
        if "{{" in value:
            return self._apply_substitution(
                SUBST_VAR_RE,
                value,
                definition_source,
                escape_glob_characters=escape_glob_characters,
            )
        return value