Coverage for src/debputy/substitution.py: 85%

153 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

1import dataclasses 

2import os 

3import re 

4from enum import IntEnum 

5from typing import FrozenSet, NoReturn, Optional, Set, Mapping, TYPE_CHECKING, Self 

6 

7from debputy.architecture_support import ( 

8 dpkg_architecture_table, 

9 DpkgArchitectureBuildProcessValuesTable, 

10) 

11from debputy.exceptions import DebputySubstitutionError 

12from debputy.util import glob_escape 

13 

14if TYPE_CHECKING: 

15 from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

16 from debputy.plugin.api import VirtualPath 

17 

18 

# Matches a single variable reference of the form ``{{ NAME }}``.
#
# The pattern exposes exactly three groups:
#   1. the opening ``{{`` including any spaces that follow it,
#   2. the variable name: an optional leading ``_``, then one or more runs of
#      ASCII letters/digits joined by a single ``-``, ``_`` or ``:``,
#   3. any spaces before the closing ``}}`` plus the ``}}`` itself.
#
# It is compiled in verbose mode, so the layout whitespace inside the raw
# string is ignored; the literal spaces that matter are written as ``[ ]``.
SUBST_VAR_RE = re.compile(
    r"""
    ([{][{][ ]*)

    (
        _?[A-Za-z0-9]+
        (?:[-_:][A-Za-z0-9]+)*
    )

    ([ ]*[}][}])
""",
    flags=re.VERBOSE,
)

32 

33 

class VariableNameState(IntEnum):
    """Classification of a variable name as reported by ``variable_state``."""

    # Nothing provides or reserves the name.
    UNDEFINED = 1
    # The name is set aside (e.g. an unknown ``DEB_*`` key, an ``env:`` key
    # missing from the environment, or a variable declared unresolvable),
    # but no value is available for it.
    RESERVED = 2
    # A value is available for the name.
    DEFINED = 3

38 

39 

40@dataclasses.dataclass(slots=True, frozen=True) 

41class VariableContext: 

42 debian_dir: "VirtualPath" 

43 

44 

class Substitution:
    """Base interface for expanding ``{{variable}}`` references in strings.

    The public entry points raise ``NotImplementedError`` here and must be
    provided by subclasses.  The base class supplies neutral defaults for the
    bookkeeping hooks (no variable is defined or used, nothing can be
    resolved) plus the shared expansion loop in ``_apply_substitution``.
    """

    def substitute(
        self,
        value: str,
        definition_source: str,
        /,
        escape_glob_characters: bool = False,
    ) -> str:
        """Return ``value`` with its variable references expanded.

        :param value: The text to expand.
        :param definition_source: Description of where ``value`` came from;
          used in error messages.
        :param escape_glob_characters: Whether substituted values should be
          glob-escaped before insertion.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
        """Return a substitution that additionally knows ``extra_substitutions``.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def with_unresolvable_substitutions(
        self, *extra_substitutions: str
    ) -> "Substitution":
        """Return a substitution in which the named variables cannot be resolved.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def variable_state(self, variable_name: str) -> VariableNameState:
        """Report the state of ``variable_name``; the base class knows no names."""
        return VariableNameState.UNDEFINED

    def is_used(self, variable_name: str) -> bool:
        """Report whether ``variable_name`` has been substituted; never, here."""
        return False

    def _mark_used(self, variable_name: str) -> None:
        """Hook invoked after a variable was expanded; a no-op in the base class."""
        pass

    def _replacement(self, matched_key: str, definition_source: str) -> str:
        """Return the value for ``matched_key``.

        The base class cannot resolve anything, so this always raises via
        ``_error``.
        """
        self._error(
            "Cannot resolve {{" + matched_key + "}}."
            f" The error occurred while trying to process {definition_source}"
        )

    def _error(
        self,
        msg: str,
        *,
        caused_by: Optional[BaseException] = None,
    ) -> NoReturn:
        """Raise ``DebputySubstitutionError(msg)``, chained from ``caused_by``."""
        raise DebputySubstitutionError(msg) from caused_by

    def _apply_substitution(
        self,
        pattern: re.Pattern[str],
        value: str,
        definition_source: str,
        /,
        escape_glob_characters: bool = False,
    ) -> str:
        """Expand every match of ``pattern`` in ``value`` in one left-to-right pass.

        :param pattern: Compiled pattern with exactly three groups: prefix,
          variable name and suffix.  The whole match is replaced.
        :param value: The text to expand.
        :param definition_source: Forwarded to ``_replacement`` for error
          messages.
        :param escape_glob_characters: When true, each resolved value is
          passed through ``glob_escape`` before being inserted.
        :return: ``value`` with every match replaced.  Inserted values are
          taken literally and are not scanned for further references.
        """

        def _expand(match: re.Match[str]) -> str:
            # Only the variable name is needed; the prefix and suffix are
            # dropped because the entire match gets replaced.
            _prefix, matched_key, _suffix = match.groups()
            replacement_value = self._replacement(matched_key, definition_source)
            # Mark as used only once the lookup succeeded.
            self._mark_used(matched_key)
            if escape_glob_characters:
                replacement_value = glob_escape(replacement_value)
            return replacement_value

        # A callable replacement is inserted verbatim (no backslash/group
        # template processing), and ``sub`` assembles the result in a single
        # pass instead of re-slicing the whole string for every match.
        return pattern.sub(_expand, value)

109 

110 

class NullSubstitution(Substitution):
    """Substitution that performs no expansion at all.

    ``substitute`` hands the input back untouched, and the ``with_*``
    derivation methods return the same instance since there is nothing to
    extend or restrict.
    """

    def substitute(
        self,
        value: str,
        definition_source: str,
        /,
        escape_glob_characters: bool = False,
    ) -> str:
        """Return ``value`` unchanged; the other arguments are ignored."""
        return value

    def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
        """Return this instance; extra substitutions are ignored."""
        return self

    def with_unresolvable_substitutions(
        self, *extra_substitutions: str
    ) -> "Substitution":
        """Return this instance; the given names are ignored."""
        return self


# Shared instance of the no-op substitution.  The class name is removed from
# the module namespace right after, leaving this instance as the way to reach
# it.
NULL_SUBSTITUTION = NullSubstitution()
del NullSubstitution

132 

133 

class SubstitutionImpl(Substitution):
    """Substitution that resolves ``{{variable}}`` references from several sources.

    A variable is looked up, in order, in the dpkg architecture table (for
    ``DEB_*`` keys), the environment (for ``env:*`` keys), the static
    variables, and the plugin-provided manifest variables.  Names listed as
    unresolvable are rejected, and anything else is delegated to the optional
    parent instance.
    """

    __slots__ = (
        "_used",
        "_env",
        "_plugin_feature_set",
        "_static_variables",
        "_unresolvable_substitutions",
        "_dpkg_arch_table",
        "_parent",
        "_variable_context",
    )

    def __init__(
        self,
        /,
        plugin_feature_set: Optional["PluginProvidedFeatureSet"] = None,
        static_variables: Optional[Mapping[str, str]] = None,
        unresolvable_substitutions: FrozenSet[str] = frozenset(),
        dpkg_arch_table: Optional[DpkgArchitectureBuildProcessValuesTable] = None,
        environment: Optional[Mapping[str, str]] = None,
        parent: Optional["SubstitutionImpl"] = None,
        variable_context: Optional[VariableContext] = None,
    ) -> None:
        """Set up the lookup sources.

        :param plugin_feature_set: Provider of plugin-defined manifest
          variables, if any.
        :param static_variables: Fixed name/value pairs; copied on entry.
        :param unresolvable_substitutions: Names that must not be resolved by
          this instance.
        :param dpkg_arch_table: Source of ``DEB_*`` values; defaults to
          ``dpkg_architecture_table()``.
        :param environment: Source of ``env:*`` values; defaults to
          ``os.environ``.
        :param parent: Instance to fall back to for unknown names.
        :param variable_context: Context handed to plugin-provided variables;
          inherited from ``parent`` when omitted.
        :raises ValueError: If neither ``variable_context`` nor ``parent`` is
          given.
        """
        self._used: Set[str] = set()
        self._plugin_feature_set = plugin_feature_set
        # Private copy so later caching never touches the caller's mapping.
        self._static_variables = (
            None if static_variables is None else dict(static_variables)
        )
        self._unresolvable_substitutions = unresolvable_substitutions
        self._dpkg_arch_table = (
            dpkg_architecture_table() if dpkg_arch_table is None else dpkg_arch_table
        )
        self._env = os.environ if environment is None else environment
        self._parent = parent
        if variable_context is None:
            if parent is None:
                raise ValueError(
                    "variable_context is required either directly or via the parent"
                )
            variable_context = parent._variable_context
        self._variable_context = variable_context

    def copy_for_subst_test(
        self,
        plugin_feature_set: "PluginProvidedFeatureSet",
        variable_context: VariableContext,
        *,
        extra_substitutions: Optional[Mapping[str, str]] = None,
        environment: Optional[Mapping[str, str]] = None,
    ) -> "Self":
        """Create a parent-less copy with a different feature set and context.

        The copy starts from this instance's static variables, overlaid with
        ``extra_substitutions``, and reuses its unresolvable names and dpkg
        architecture table.  When ``environment`` is omitted the copy gets an
        empty environment.
        """
        merged_variables = (
            dict(self._static_variables) if self._static_variables else {}
        )
        if extra_substitutions:
            merged_variables.update(extra_substitutions)
        return type(self)(
            plugin_feature_set=plugin_feature_set,
            variable_context=variable_context,
            static_variables=merged_variables,
            unresolvable_substitutions=self._unresolvable_substitutions,
            dpkg_arch_table=self._dpkg_arch_table,
            environment={} if environment is None else environment,
        )

    def variable_state(self, key: str) -> VariableNameState:
        """Classify ``key`` as defined, reserved or undefined.

        The ``DEB_`` and ``env:`` prefixes are always at least reserved;
        unknown names are passed on to the parent, if there is one.
        """
        if key.startswith("DEB_"):
            if key in self._dpkg_arch_table:
                return VariableNameState.DEFINED
            return VariableNameState.RESERVED

        features = self._plugin_feature_set
        if features is not None and key in features.manifest_variables:
            return VariableNameState.DEFINED

        if key.startswith("env:"):
            if key.removeprefix("env:") in self._env:
                return VariableNameState.DEFINED
            return VariableNameState.RESERVED

        statics = self._static_variables
        if statics is not None and key in statics:
            return VariableNameState.DEFINED
        if key in self._unresolvable_substitutions:
            return VariableNameState.RESERVED

        parent = self._parent
        if parent is None:
            return VariableNameState.UNDEFINED
        return parent.variable_state(key)

    def is_used(self, variable_name: str) -> bool:
        """Report whether this instance or any ancestor recorded a use."""
        if variable_name in self._used:
            return True
        parent = self._parent
        return parent is not None and parent.is_used(variable_name)

    def _mark_used(self, variable_name: str) -> None:
        """Record ``variable_name`` as used here and on the ancestor owning it."""
        ancestor = self._parent
        while ancestor is not None:
            # Find the nearest ancestor that has the variable, if any, so
            # that is_used also works when asked on that ancestor.
            statics = ancestor._static_variables
            features = ancestor._plugin_feature_set
            owns_variable = (
                statics is not None and variable_name in statics
            ) or (
                features is not None
                and variable_name in features.manifest_variables
                and not features.manifest_variables[
                    variable_name
                ].is_documentation_placeholder
            )
            if owns_variable:
                ancestor._mark_used(variable_name)
                break
            ancestor = ancestor._parent
        self._used.add(variable_name)

    def _replacement(self, key: str, definition_source: str) -> str:
        """Resolve ``key`` to its value or raise via ``_error``.

        :param key: Variable name without the surrounding braces.
        :param definition_source: Description of the text being processed;
          used in error messages.
        """
        if key.startswith("DEB_") and key in self._dpkg_arch_table:
            return self._dpkg_arch_table[key]

        if key.startswith("env:"):
            env_name = key[4:]
            if env_name in self._env:
                return self._env[env_name]
            self._error(
                f'The environment does not contain the variable "{key}" '
                f"(error occurred while trying to process {definition_source})"
            )

        # The order between the static variables and plugin_feature_set is
        # leveraged by the tests to implement mocking variables.  If the order
        # needs tweaking, you will need a custom resolver for the tests to
        # support mocking.
        statics = self._static_variables
        if statics and key in statics:
            return statics[key]

        features = self._plugin_feature_set
        if features is not None:
            provided = features.manifest_variables.get(key)
            if provided is not None and not provided.is_documentation_placeholder:
                resolved = provided.resolve(self._variable_context)
                # Cache it for next time.
                if statics is None:
                    statics = {}
                    self._static_variables = statics
                statics[key] = resolved
                return resolved

        if key in self._unresolvable_substitutions:
            self._error(
                "The variable {{" + key + "}}"
                f" is not available while processing {definition_source}."
            )

        parent = self._parent
        if parent is not None:
            return parent._replacement(key, definition_source)
        self._error(
            "Cannot resolve {{" + key + "}}: it is not a known key."
            f" The error occurred while trying to process {definition_source}"
        )

    def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
        """Return a child instance that also knows ``extra_substitutions``.

        Without any extra substitutions the instance itself is returned.
        """
        if extra_substitutions:
            return SubstitutionImpl(
                dpkg_arch_table=self._dpkg_arch_table,
                environment=self._env,
                static_variables=extra_substitutions,
                parent=self,
            )
        return self

    def with_unresolvable_substitutions(
        self,
        *extra_substitutions: str,
    ) -> "Substitution":
        """Return a child instance that refuses to resolve the given names.

        Without any names the instance itself is returned.
        """
        if extra_substitutions:
            return SubstitutionImpl(
                dpkg_arch_table=self._dpkg_arch_table,
                environment=self._env,
                unresolvable_substitutions=frozenset(extra_substitutions),
                parent=self,
            )
        return self

    def substitute(
        self,
        value: str,
        definition_source: str,
        /,
        escape_glob_characters: bool = False,
    ) -> str:
        """Expand all ``{{variable}}`` references in ``value``.

        :param value: The text to expand.
        :param definition_source: Description of where ``value`` came from;
          used in error messages.
        :param escape_glob_characters: Whether substituted values should be
          glob-escaped before insertion.
        :return: The expanded text; ``value`` itself when it has no ``{{``.
        """
        # Cheap pre-check: without "{{" there cannot be any reference.
        if "{{" in value:
            return self._apply_substitution(
                SUBST_VAR_RE,
                value,
                definition_source,
                escape_glob_characters=escape_glob_characters,
            )
        return value