From 26a029d407be480d791972afb5975cf62c9360a6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 02:47:55 +0200 Subject: Adding upstream version 124.0.1. Signed-off-by: Daniel Baumann --- .../fluent.migrate/fluent/migrate/validator.py | 323 +++++++++++++++++++++ 1 file changed, 323 insertions(+) create mode 100644 third_party/python/fluent.migrate/fluent/migrate/validator.py (limited to 'third_party/python/fluent.migrate/fluent/migrate/validator.py') diff --git a/third_party/python/fluent.migrate/fluent/migrate/validator.py b/third_party/python/fluent.migrate/fluent/migrate/validator.py new file mode 100644 index 0000000000..4e05865434 --- /dev/null +++ b/third_party/python/fluent.migrate/fluent/migrate/validator.py @@ -0,0 +1,323 @@ +import argparse +import ast +from itertools import zip_longest + +from fluent.migrate import transforms +from fluent.migrate.errors import MigrationError +from fluent.migrate.helpers import transforms_from +from fluent.syntax import ast as FTL +from fluent.syntax.visitor import Visitor +from compare_locales import mozpath + + +class MigrateNotFoundException(Exception): + pass + + +class BadContextAPIException(Exception): + pass + + +def process_assign(node, context): + if isinstance(node.value, ast.Constant): + val = node.value.value + elif isinstance(node.value, ast.Name): + val = context.get(node.value.id) + elif isinstance(node.value, ast.Call): + val = node.value + else: + val = None + if val is None: + return + for target in node.targets: + if isinstance(target, ast.Name): + context[target.id] = val + + +class Validator: + """Validate a migration recipe + + Extract information from the migration recipe about which files to + migrate from, and which files to migrate to. + Also check for errors in the recipe, or bad API usage. + """ + + @classmethod + def validate(cls, path, code=None): + if code is None: + with open(path) as fh: + code = fh.read() + validator = cls(code, path) + return validator.inspect() + + def __init__(self, code, path): + self.ast = ast.parse(code, path) + + def inspect(self): + migrate_func = None + global_assigns = {} + for top_level in ast.iter_child_nodes(self.ast): + if isinstance(top_level, ast.FunctionDef) and top_level.name == "migrate": + if migrate_func: + raise MigrateNotFoundException("Duplicate definition of migrate") + migrate_func = top_level + details = self.inspect_migrate(migrate_func, global_assigns) + if isinstance(top_level, ast.Assign): + process_assign(top_level, global_assigns) + if isinstance(top_level, (ast.Import, ast.ImportFrom)): + if "module" in top_level._fields: + module = top_level.module + else: + module = None + for alias in top_level.names: + asname = alias.asname or alias.name + dotted = alias.name + if module: + dotted = f"{module}.{dotted}" + global_assigns[asname] = dotted + if not migrate_func: + raise MigrateNotFoundException("migrate function not found") + return details + + def inspect_migrate(self, migrate_func, global_assigns): + if len(migrate_func.args.args) != 1 or any( + getattr(migrate_func.args, arg_field) + for arg_field in migrate_func.args._fields + if arg_field != "args" + ): + raise MigrateNotFoundException("migrate takes only one positional argument") + arg = migrate_func.args.args[0] + if isinstance(arg, ast.Name): + ctx_var = arg.id # python 2 + else: + ctx_var = arg.arg # python 3 + visitor = MigrateAnalyzer(ctx_var, global_assigns) + visitor.visit(migrate_func) + return { + "references": visitor.references, + "issues": visitor.issues, + } + + +def full_name(node, global_assigns): + leafs = [] + while isinstance(node, ast.Attribute): + leafs.append(node.attr) + node = node.value + if isinstance(node, ast.Name): + leafs.append(global_assigns.get(node.id, node.id)) + return ".".join(reversed(leafs)) + + +PATH_TYPES = (str,) + (ast.Call,) + + +class MigrateAnalyzer(ast.NodeVisitor): + def __init__(self, ctx_var, global_assigns): + super().__init__() + self.ctx_var = ctx_var + self.global_assigns = global_assigns + self.depth = 0 + self.issues = [] + self.references = set() + + def generic_visit(self, node): + self.depth += 1 + super().generic_visit(node) + self.depth -= 1 + + def visit_Assign(self, node): + if self.depth == 1: + process_assign(node, self.global_assigns) + self.generic_visit(node) + + def visit_Attribute(self, node): + if isinstance(node.value, ast.Name) and node.value.id == self.ctx_var: + if node.attr not in ( + "add_transforms", + "locale", + ): + raise BadContextAPIException( + "Unexpected attribute access on {}.{}".format( + self.ctx_var, node.attr + ) + ) + self.generic_visit(node) + + def visit_Call(self, node): + if ( + isinstance(node.func, ast.Attribute) + and isinstance(node.func.value, ast.Name) + and node.func.value.id == self.ctx_var + ): + return self.call_ctx(node) + dotted = full_name(node.func, self.global_assigns) + if dotted == "fluent.migrate.helpers.transforms_from": + return self.call_helpers_transforms_from(node) + if dotted.startswith("fluent.migrate."): + return self.call_transform(node, dotted) + self.generic_visit(node) + + def call_ctx(self, node): + if node.func.attr == "add_transforms": + return self.call_add_transforms(node) + raise BadContextAPIException( + "Unexpected call on {}.{}".format(self.ctx_var, node.func.attr) + ) + + def call_add_transforms(self, node): + args_msg = ( + "Expected arguments to {}.add_transforms: " + "target_ftl_path, reference_ftl_path, list_of_transforms" + ).format(self.ctx_var) + ref_msg = ( + "Expected second argument to {}.add_transforms: " + "reference should be string or variable with string value" + ).format(self.ctx_var) + # Just check call signature here, check actual types below + if not self.check_arguments(node, (ast.AST, ast.AST, ast.AST)): + self.issues.append( + { + "msg": args_msg, + "line": node.lineno, + } + ) + return + in_reference = node.args[1] + if isinstance(in_reference, ast.Name): + in_reference = self.global_assigns.get(in_reference.id) + if isinstance(in_reference, ast.Constant): + in_reference = in_reference.value + if not isinstance(in_reference, str): + self.issues.append( + { + "msg": ref_msg, + "line": node.args[1].lineno, + } + ) + return + self.references.add(in_reference) + # Checked node.args[1]. + # There's not a lot we can say about our target path, + # ignoring that. + # For our transforms, we want more checks. + self.generic_visit(node.args[2]) + + def call_transform(self, node, dotted): + module, called = dotted.rsplit(".", 1) + if module not in ("fluent.migrate", "fluent.migrate.transforms"): + return + transform = getattr(transforms, called) + if not issubclass(transform, transforms.Source): + return + bad_args = f"{called} takes path and key as first two params" + if not self.check_arguments( + node, + ( + (ast.Constant, ast.Name), + (ast.Constant, ast.Name), + ), + allow_more=True, + check_kwargs=False, + ): + self.issues.append({"msg": bad_args, "line": node.lineno}) + return + path = node.args[0] + if isinstance(path, ast.Constant): + path = path.value + if isinstance(path, ast.Name): + path = self.global_assigns.get(path.id) + if not isinstance(path, PATH_TYPES): + self.issues.append({"msg": bad_args, "line": node.lineno}) + + def call_helpers_transforms_from(self, node): + args_msg = "Expected arguments to transforms_from: " "str, **substitions" + if not self.check_arguments(node, (ast.Constant,), check_kwargs=False): + self.issues.append( + { + "msg": args_msg, + "line": node.lineno, + } + ) + return + kwargs = {} + found_bad_keywords = False + for keyword in node.keywords: + v = keyword.value + if isinstance(v, ast.Constant): + v = v.value + if isinstance(v, ast.Name): + v = self.global_assigns.get(v.id) + if isinstance(v, ast.Call): + v = "determined at runtime" + if not isinstance(v, PATH_TYPES): + msg = "Bad keyword arg {} to transforms_from".format(keyword.arg) + self.issues.append( + { + "msg": msg, + "line": node.lineno, + } + ) + found_bad_keywords = True + else: + kwargs[keyword.arg] = v + if found_bad_keywords: + return + try: + transforms = transforms_from(node.args[0].value, **kwargs) + except MigrationError as e: + self.issues.append( + { + "msg": str(e), + "line": node.lineno, + } + ) + return + ti = TransformsInspector() + ti.visit(transforms) + self.issues.extend( + { + "msg": issue, + "line": node.lineno, + } + for issue in set(ti.issues) + ) + + def check_arguments(self, node, argspec, check_kwargs=True, allow_more=False): + if check_kwargs and ( + node.keywords or (hasattr(node, "kwargs") and node.kwargs) + ): + return False + if hasattr(node, "starargs") and node.starargs: + return False + for arg, NODE_TYPE in zip_longest(node.args, argspec): + if NODE_TYPE is None: + return True if allow_more else False + if not (isinstance(arg, NODE_TYPE)): + return False + return True + + +class TransformsInspector(Visitor): + def __init__(self): + super().__init__() + self.issues = [] + + def generic_visit(self, node): + if isinstance(node, transforms.Source): + src = node.path + # Source needs paths to be normalized + # https://bugzilla.mozilla.org/show_bug.cgi?id=1568199 + if src != mozpath.normpath(src): + self.issues.append(f'Source "{src}" needs to be a normalized path') + super().generic_visit(node) + + +def cli(): + parser = argparse.ArgumentParser() + parser.add_argument("migration") + args = parser.parse_args() + issues = Validator.validate(args.migration)["issues"] + for issue in issues: + print(issue["msg"], "at line", issue["line"]) + return 1 if issues else 0 -- cgit v1.2.3