summaryrefslogtreecommitdiffstats
path: root/third_party/python/fluent.migrate/fluent/migrate/validator.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/fluent.migrate/fluent/migrate/validator.py')
-rw-r--r--third_party/python/fluent.migrate/fluent/migrate/validator.py339
1 files changed, 339 insertions, 0 deletions
diff --git a/third_party/python/fluent.migrate/fluent/migrate/validator.py b/third_party/python/fluent.migrate/fluent/migrate/validator.py
new file mode 100644
index 0000000000..8189613c31
--- /dev/null
+++ b/third_party/python/fluent.migrate/fluent/migrate/validator.py
@@ -0,0 +1,339 @@
+# coding=utf8
+from __future__ import absolute_import
+
+import argparse
+import ast
+import six
+from six.moves import zip_longest
+
+from fluent.migrate import transforms
+from fluent.migrate.errors import MigrationError
+from fluent.migrate.helpers import transforms_from
+from fluent.syntax import ast as FTL
+from fluent.syntax.visitor import Visitor
+from compare_locales import mozpath
+
+
+class MigrateNotFoundException(Exception):
+ pass
+
+
+class BadContextAPIException(Exception):
+ pass
+
+
+def process_assign(node, context):
+ if isinstance(node.value, ast.Str):
+ val = node.value.s
+ elif isinstance(node.value, ast.Name):
+ val = context.get(node.value.id)
+ elif isinstance(node.value, ast.Call):
+ val = node.value
+ if val is None:
+ return
+ for target in node.targets:
+ if isinstance(target, ast.Name):
+ context[target.id] = val
+
+
+class Validator(object):
+ """Validate a migration recipe
+
+ Extract information from the migration recipe about which files to
+ migrate from, and which files to migrate to.
+ Also check for errors in the recipe, or bad API usage.
+ """
+
+ @classmethod
+ def validate(cls, path, code=None):
+ if code is None:
+ with open(path) as fh:
+ code = fh.read()
+ validator = cls(code, path)
+ return validator.inspect()
+
+ def __init__(self, code, path):
+ self.ast = ast.parse(code, path)
+
+ def inspect(self):
+ migrate_func = None
+ global_assigns = {}
+ for top_level in ast.iter_child_nodes(self.ast):
+ if (
+ isinstance(top_level, ast.FunctionDef)
+ and top_level.name == 'migrate'
+ ):
+ if migrate_func:
+ raise MigrateNotFoundException(
+ 'Duplicate definition of migrate'
+ )
+ migrate_func = top_level
+ details = self.inspect_migrate(migrate_func, global_assigns)
+ if isinstance(top_level, ast.Assign):
+ process_assign(top_level, global_assigns)
+ if isinstance(top_level, (ast.Import, ast.ImportFrom)):
+ if 'module' in top_level._fields:
+ module = top_level.module
+ else:
+ module = None
+ for alias in top_level.names:
+ asname = alias.asname or alias.name
+ dotted = alias.name
+ if module:
+ dotted = '{}.{}'.format(module, dotted)
+ global_assigns[asname] = dotted
+ if not migrate_func:
+ raise MigrateNotFoundException(
+ 'migrate function not found'
+ )
+ return details
+
+ def inspect_migrate(self, migrate_func, global_assigns):
+ if (
+ len(migrate_func.args.args) != 1 or
+ any(
+ getattr(migrate_func.args, arg_field)
+ for arg_field in migrate_func.args._fields
+ if arg_field != 'args'
+ )
+ ):
+ raise MigrateNotFoundException(
+ 'migrate takes only one positional argument'
+ )
+ arg = migrate_func.args.args[0]
+ if isinstance(arg, ast.Name):
+ ctx_var = arg.id # python 2
+ else:
+ ctx_var = arg.arg # python 3
+ visitor = MigrateAnalyzer(ctx_var, global_assigns)
+ visitor.visit(migrate_func)
+ return {
+ 'references': visitor.references,
+ 'issues': visitor.issues,
+ }
+
+
+def full_name(node, global_assigns):
+ leafs = []
+ while isinstance(node, ast.Attribute):
+ leafs.append(node.attr)
+ node = node.value
+ if isinstance(node, ast.Name):
+ leafs.append(global_assigns.get(node.id, node.id))
+ return '.'.join(reversed(leafs))
+
+
+PATH_TYPES = six.string_types + (ast.Call,)
+
+
+class MigrateAnalyzer(ast.NodeVisitor):
+ def __init__(self, ctx_var, global_assigns):
+ super(MigrateAnalyzer, self).__init__()
+ self.ctx_var = ctx_var
+ self.global_assigns = global_assigns
+ self.depth = 0
+ self.issues = []
+ self.references = set()
+
+ def generic_visit(self, node):
+ self.depth += 1
+ super(MigrateAnalyzer, self).generic_visit(node)
+ self.depth -= 1
+
+ def visit_Assign(self, node):
+ if self.depth == 1:
+ process_assign(node, self.global_assigns)
+ self.generic_visit(node)
+
+ def visit_Attribute(self, node):
+ if isinstance(node.value, ast.Name) and node.value.id == self.ctx_var:
+ if node.attr not in (
+ 'add_transforms',
+ 'locale',
+ ):
+ raise BadContextAPIException(
+ 'Unexpected attribute access on {}.{}'.format(
+ self.ctx_var, node.attr
+ )
+ )
+ self.generic_visit(node)
+
+ def visit_Call(self, node):
+ if (
+ isinstance(node.func, ast.Attribute) and
+ isinstance(node.func.value, ast.Name) and
+ node.func.value.id == self.ctx_var
+ ):
+ return self.call_ctx(node)
+ dotted = full_name(node.func, self.global_assigns)
+ if dotted == 'fluent.migrate.helpers.transforms_from':
+ return self.call_helpers_transforms_from(node)
+ if dotted.startswith('fluent.migrate.'):
+ return self.call_transform(node, dotted)
+ self.generic_visit(node)
+
+ def call_ctx(self, node):
+ if node.func.attr == 'add_transforms':
+ return self.call_add_transforms(node)
+ raise BadContextAPIException(
+ 'Unexpected call on {}.{}'.format(
+ self.ctx_var, node.func.attr
+ )
+ )
+
+ def call_add_transforms(self, node):
+ args_msg = (
+ 'Expected arguments to {}.add_transforms: '
+ 'target_ftl_path, reference_ftl_path, list_of_transforms'
+ ).format(self.ctx_var)
+ ref_msg = (
+ 'Expected second argument to {}.add_transforms: '
+ 'reference should be string or variable with string value'
+ ).format(self.ctx_var)
+ # Just check call signature here, check actual types below
+ if not self.check_arguments(node, (ast.AST, ast.AST, ast.AST)):
+ self.issues.append({
+ 'msg': args_msg,
+ 'line': node.lineno,
+ })
+ return
+ in_reference = node.args[1]
+ if isinstance(in_reference, ast.Name):
+ in_reference = self.global_assigns.get(in_reference.id)
+ if isinstance(in_reference, ast.Str):
+ in_reference = in_reference.s
+ if not isinstance(in_reference, six.string_types):
+ self.issues.append({
+ 'msg': ref_msg,
+ 'line': node.args[1].lineno,
+ })
+ return
+ self.references.add(in_reference)
+ # Checked node.args[1].
+ # There's not a lot we can say about our target path,
+ # ignoring that.
+ # For our transforms, we want more checks.
+ self.generic_visit(node.args[2])
+
+ def call_transform(self, node, dotted):
+ module, called = dotted.rsplit('.', 1)
+ if module not in ('fluent.migrate', 'fluent.migrate.transforms'):
+ return
+ transform = getattr(transforms, called)
+ if not issubclass(transform, transforms.Source):
+ return
+ bad_args = '{} takes path and key as first two params'.format(called)
+ if not self.check_arguments(
+ node, ((ast.Str, ast.Name), (ast.Str, ast.Name),),
+ allow_more=True, check_kwargs=False
+ ):
+ self.issues.append({
+ 'msg': bad_args,
+ 'line': node.lineno
+ })
+ return
+ path = node.args[0]
+ if isinstance(path, ast.Str):
+ path = path.s
+ if isinstance(path, ast.Name):
+ path = self.global_assigns.get(path.id)
+ if not isinstance(path, PATH_TYPES):
+ self.issues.append({
+ 'msg': bad_args,
+ 'line': node.lineno
+ })
+
+ def call_helpers_transforms_from(self, node):
+ args_msg = (
+ 'Expected arguments to transforms_from: '
+ 'str, **substitions'
+ )
+ if not self.check_arguments(
+ node, (ast.Str,), check_kwargs=False
+ ):
+ self.issues.append({
+ 'msg': args_msg,
+ 'line': node.lineno,
+ })
+ return
+ kwargs = {}
+ found_bad_keywords = False
+ for keyword in node.keywords:
+ v = keyword.value
+ if isinstance(v, ast.Str):
+ v = v.s
+ if isinstance(v, ast.Name):
+ v = self.global_assigns.get(v.id)
+ if isinstance(v, ast.Call):
+ v = 'determined at runtime'
+ if not isinstance(v, PATH_TYPES):
+ msg = 'Bad keyword arg {} to transforms_from'.format(
+ keyword.arg
+ )
+ self.issues.append({
+ 'msg': msg,
+ 'line': node.lineno,
+ })
+ found_bad_keywords = True
+ else:
+ kwargs[keyword.arg] = v
+ if found_bad_keywords:
+ return
+ try:
+ transforms = transforms_from(node.args[0].s, **kwargs)
+ except MigrationError as e:
+ self.issues.append({
+ 'msg': str(e),
+ 'line': node.lineno,
+ })
+ return
+ ti = TransformsInspector()
+ ti.visit(transforms)
+ self.issues.extend({
+ 'msg': issue,
+ 'line': node.lineno,
+ } for issue in set(ti.issues))
+
+ def check_arguments(
+ self, node, argspec, check_kwargs=True, allow_more=False
+ ):
+ if check_kwargs and (
+ node.keywords or
+ (hasattr(node, 'kwargs') and node.kwargs)
+ ):
+ return False
+ if hasattr(node, 'starargs') and node.starargs:
+ return False
+ for arg, NODE_TYPE in zip_longest(node.args, argspec):
+ if NODE_TYPE is None:
+ return True if allow_more else False
+ if not (isinstance(arg, NODE_TYPE)):
+ return False
+ return True
+
+
+class TransformsInspector(Visitor):
+ def __init__(self):
+ super(TransformsInspector, self).__init__()
+ self.issues = []
+
+ def generic_visit(self, node):
+ if isinstance(node, transforms.Source):
+ src = node.path
+ # Source needs paths to be normalized
+ # https://bugzilla.mozilla.org/show_bug.cgi?id=1568199
+ if src != mozpath.normpath(src):
+ self.issues.append(
+ 'Source "{}" needs to be a normalized path'.format(src)
+ )
+ super(TransformsInspector, self).generic_visit(node)
+
+
+def cli():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('migration')
+ args = parser.parse_args()
+ issues = Validator.validate(args.migration)['issues']
+ for issue in issues:
+ print(issue['msg'], 'at line', issue['line'])
+ return 1 if issues else 0