diff options
Diffstat (limited to 'third_party/python/fluent.migrate/fluent/migrate/validator.py')
-rw-r--r-- | third_party/python/fluent.migrate/fluent/migrate/validator.py | 339 |
1 files changed, 339 insertions, 0 deletions
diff --git a/third_party/python/fluent.migrate/fluent/migrate/validator.py b/third_party/python/fluent.migrate/fluent/migrate/validator.py new file mode 100644 index 0000000000..8189613c31 --- /dev/null +++ b/third_party/python/fluent.migrate/fluent/migrate/validator.py @@ -0,0 +1,339 @@ +# coding=utf8 +from __future__ import absolute_import + +import argparse +import ast +import six +from six.moves import zip_longest + +from fluent.migrate import transforms +from fluent.migrate.errors import MigrationError +from fluent.migrate.helpers import transforms_from +from fluent.syntax import ast as FTL +from fluent.syntax.visitor import Visitor +from compare_locales import mozpath + + +class MigrateNotFoundException(Exception): + pass + + +class BadContextAPIException(Exception): + pass + + +def process_assign(node, context): + if isinstance(node.value, ast.Str): + val = node.value.s + elif isinstance(node.value, ast.Name): + val = context.get(node.value.id) + elif isinstance(node.value, ast.Call): + val = node.value + if val is None: + return + for target in node.targets: + if isinstance(target, ast.Name): + context[target.id] = val + + +class Validator(object): + """Validate a migration recipe + + Extract information from the migration recipe about which files to + migrate from, and which files to migrate to. + Also check for errors in the recipe, or bad API usage. + """ + + @classmethod + def validate(cls, path, code=None): + if code is None: + with open(path) as fh: + code = fh.read() + validator = cls(code, path) + return validator.inspect() + + def __init__(self, code, path): + self.ast = ast.parse(code, path) + + def inspect(self): + migrate_func = None + global_assigns = {} + for top_level in ast.iter_child_nodes(self.ast): + if ( + isinstance(top_level, ast.FunctionDef) + and top_level.name == 'migrate' + ): + if migrate_func: + raise MigrateNotFoundException( + 'Duplicate definition of migrate' + ) + migrate_func = top_level + details = self.inspect_migrate(migrate_func, global_assigns) + if isinstance(top_level, ast.Assign): + process_assign(top_level, global_assigns) + if isinstance(top_level, (ast.Import, ast.ImportFrom)): + if 'module' in top_level._fields: + module = top_level.module + else: + module = None + for alias in top_level.names: + asname = alias.asname or alias.name + dotted = alias.name + if module: + dotted = '{}.{}'.format(module, dotted) + global_assigns[asname] = dotted + if not migrate_func: + raise MigrateNotFoundException( + 'migrate function not found' + ) + return details + + def inspect_migrate(self, migrate_func, global_assigns): + if ( + len(migrate_func.args.args) != 1 or + any( + getattr(migrate_func.args, arg_field) + for arg_field in migrate_func.args._fields + if arg_field != 'args' + ) + ): + raise MigrateNotFoundException( + 'migrate takes only one positional argument' + ) + arg = migrate_func.args.args[0] + if isinstance(arg, ast.Name): + ctx_var = arg.id # python 2 + else: + ctx_var = arg.arg # python 3 + visitor = MigrateAnalyzer(ctx_var, global_assigns) + visitor.visit(migrate_func) + return { + 'references': visitor.references, + 'issues': visitor.issues, + } + + +def full_name(node, global_assigns): + leafs = [] + while isinstance(node, ast.Attribute): + leafs.append(node.attr) + node = node.value + if isinstance(node, ast.Name): + leafs.append(global_assigns.get(node.id, node.id)) + return '.'.join(reversed(leafs)) + + +PATH_TYPES = six.string_types + (ast.Call,) + + +class MigrateAnalyzer(ast.NodeVisitor): + def __init__(self, ctx_var, global_assigns): + super(MigrateAnalyzer, self).__init__() + self.ctx_var = ctx_var + self.global_assigns = global_assigns + self.depth = 0 + self.issues = [] + self.references = set() + + def generic_visit(self, node): + self.depth += 1 + super(MigrateAnalyzer, self).generic_visit(node) + self.depth -= 1 + + def visit_Assign(self, node): + if self.depth == 1: + process_assign(node, self.global_assigns) + self.generic_visit(node) + + def visit_Attribute(self, node): + if isinstance(node.value, ast.Name) and node.value.id == self.ctx_var: + if node.attr not in ( + 'add_transforms', + 'locale', + ): + raise BadContextAPIException( + 'Unexpected attribute access on {}.{}'.format( + self.ctx_var, node.attr + ) + ) + self.generic_visit(node) + + def visit_Call(self, node): + if ( + isinstance(node.func, ast.Attribute) and + isinstance(node.func.value, ast.Name) and + node.func.value.id == self.ctx_var + ): + return self.call_ctx(node) + dotted = full_name(node.func, self.global_assigns) + if dotted == 'fluent.migrate.helpers.transforms_from': + return self.call_helpers_transforms_from(node) + if dotted.startswith('fluent.migrate.'): + return self.call_transform(node, dotted) + self.generic_visit(node) + + def call_ctx(self, node): + if node.func.attr == 'add_transforms': + return self.call_add_transforms(node) + raise BadContextAPIException( + 'Unexpected call on {}.{}'.format( + self.ctx_var, node.func.attr + ) + ) + + def call_add_transforms(self, node): + args_msg = ( + 'Expected arguments to {}.add_transforms: ' + 'target_ftl_path, reference_ftl_path, list_of_transforms' + ).format(self.ctx_var) + ref_msg = ( + 'Expected second argument to {}.add_transforms: ' + 'reference should be string or variable with string value' + ).format(self.ctx_var) + # Just check call signature here, check actual types below + if not self.check_arguments(node, (ast.AST, ast.AST, ast.AST)): + self.issues.append({ + 'msg': args_msg, + 'line': node.lineno, + }) + return + in_reference = node.args[1] + if isinstance(in_reference, ast.Name): + in_reference = self.global_assigns.get(in_reference.id) + if isinstance(in_reference, ast.Str): + in_reference = in_reference.s + if not isinstance(in_reference, six.string_types): + self.issues.append({ + 'msg': ref_msg, + 'line': node.args[1].lineno, + }) + return + self.references.add(in_reference) + # Checked node.args[1]. + # There's not a lot we can say about our target path, + # ignoring that. + # For our transforms, we want more checks. + self.generic_visit(node.args[2]) + + def call_transform(self, node, dotted): + module, called = dotted.rsplit('.', 1) + if module not in ('fluent.migrate', 'fluent.migrate.transforms'): + return + transform = getattr(transforms, called) + if not issubclass(transform, transforms.Source): + return + bad_args = '{} takes path and key as first two params'.format(called) + if not self.check_arguments( + node, ((ast.Str, ast.Name), (ast.Str, ast.Name),), + allow_more=True, check_kwargs=False + ): + self.issues.append({ + 'msg': bad_args, + 'line': node.lineno + }) + return + path = node.args[0] + if isinstance(path, ast.Str): + path = path.s + if isinstance(path, ast.Name): + path = self.global_assigns.get(path.id) + if not isinstance(path, PATH_TYPES): + self.issues.append({ + 'msg': bad_args, + 'line': node.lineno + }) + + def call_helpers_transforms_from(self, node): + args_msg = ( + 'Expected arguments to transforms_from: ' + 'str, **substitions' + ) + if not self.check_arguments( + node, (ast.Str,), check_kwargs=False + ): + self.issues.append({ + 'msg': args_msg, + 'line': node.lineno, + }) + return + kwargs = {} + found_bad_keywords = False + for keyword in node.keywords: + v = keyword.value + if isinstance(v, ast.Str): + v = v.s + if isinstance(v, ast.Name): + v = self.global_assigns.get(v.id) + if isinstance(v, ast.Call): + v = 'determined at runtime' + if not isinstance(v, PATH_TYPES): + msg = 'Bad keyword arg {} to transforms_from'.format( + keyword.arg + ) + self.issues.append({ + 'msg': msg, + 'line': node.lineno, + }) + found_bad_keywords = True + else: + kwargs[keyword.arg] = v + if found_bad_keywords: + return + try: + transforms = transforms_from(node.args[0].s, **kwargs) + except MigrationError as e: + self.issues.append({ + 'msg': str(e), + 'line': node.lineno, + }) + return + ti = TransformsInspector() + ti.visit(transforms) + self.issues.extend({ + 'msg': issue, + 'line': node.lineno, + } for issue in set(ti.issues)) + + def check_arguments( + self, node, argspec, check_kwargs=True, allow_more=False + ): + if check_kwargs and ( + node.keywords or + (hasattr(node, 'kwargs') and node.kwargs) + ): + return False + if hasattr(node, 'starargs') and node.starargs: + return False + for arg, NODE_TYPE in zip_longest(node.args, argspec): + if NODE_TYPE is None: + return True if allow_more else False + if not (isinstance(arg, NODE_TYPE)): + return False + return True + + +class TransformsInspector(Visitor): + def __init__(self): + super(TransformsInspector, self).__init__() + self.issues = [] + + def generic_visit(self, node): + if isinstance(node, transforms.Source): + src = node.path + # Source needs paths to be normalized + # https://bugzilla.mozilla.org/show_bug.cgi?id=1568199 + if src != mozpath.normpath(src): + self.issues.append( + 'Source "{}" needs to be a normalized path'.format(src) + ) + super(TransformsInspector, self).generic_visit(node) + + +def cli(): + parser = argparse.ArgumentParser() + parser.add_argument('migration') + args = parser.parse_args() + issues = Validator.validate(args.migration)['issues'] + for issue in issues: + print(issue['msg'], 'at line', issue['line']) + return 1 if issues else 0 |