diff options
Diffstat (limited to 'third_party/python/fluent.migrate/fluent/migrate/_context.py')
-rw-r--r-- | third_party/python/fluent.migrate/fluent/migrate/_context.py | 329 |
1 files changed, 329 insertions, 0 deletions
import os
import codecs
from functools import partial
import logging
from itertools import zip_longest

import fluent.syntax.ast as FTL
from fluent.syntax.parser import FluentParser
from fluent.syntax.serializer import FluentSerializer
from compare_locales.parser import getParser
from compare_locales.plurals import get_plural

from .evaluator import Evaluator
from .merge import merge_resource
from .errors import (
    UnreadableReferenceError,
)


class InternalContext:
    """Internal context for merging translation resources.

    For the public interface, see `context.MigrationContext`.
    """

    def __init__(
        self, lang, reference_dir, localization_dir, enforce_translated=False
    ):
        """Set up parsers, plural categories and empty resource stores.

        :param lang: language code used to look up plural categories.
        :param reference_dir: directory that reference FTL paths are
            resolved against.
        :param localization_dir: directory that localization paths are
            resolved against.
        :param enforce_translated: when true, legacy entities are read
            even if they are not marked as localized.
        """
        self.fluent_parser = FluentParser(with_spans=False)
        self.fluent_serializer = FluentSerializer()

        # Directories the `read_*`/`maybe_add_localization` methods join
        # relative resource paths onto.
        self.reference_dir = reference_dir
        self.localization_dir = localization_dir

        # An iterable of plural category names relevant to the context's
        # language. E.g. ('one', 'other') for English.
        self.plural_categories = get_plural(lang)
        if self.plural_categories is None:
            logger = logging.getLogger('migrate')
            logger.warning(
                'Plural rule for "{}" is not defined in '
                'compare-locales'.format(lang))
            self.plural_categories = ('one', 'other')

        self.enforce_translated = enforce_translated
        # Parsed input resources stored by resource path.
        self.reference_resources = {}
        self.localization_resources = {}
        self.target_resources = {}

        # An iterable of `FTL.Message` objects some of whose nodes can be the
        # transform operations.
        self.transforms = {}

        # Maps a target `(path, ident)` tuple to the set of `(path, key)`
        # tuples it depends on; consulted by `in_changeset`. It starts out
        # empty here so that `in_changeset` is usable on a bare context.
        self.dependencies = {}

        # The evaluator instance is an AST transformer capable of walking an
        # AST hierarchy and evaluating nodes which are migration Transforms.
        self.evaluator = Evaluator(self)

    def read_ftl_resource(self, path):
        """Read an FTL resource and parse it into an AST.

        Syntax errors reported by the parser (annotations on `FTL.Junk`
        entries) are logged as warnings; the AST is returned regardless.

        :raises OSError: if the file cannot be opened.
        :raises UnicodeDecodeError: if the file is not valid UTF-8. The
            error is logged before being re-raised.
        """
        try:
            # The context manager closes the file on every exit path.
            with codecs.open(path, 'r', 'utf8') as f:
                contents = f.read()
        except UnicodeDecodeError as err:
            logger = logging.getLogger('migrate')
            logger.warning(f'Unable to read file {path}: {err}')
            raise

        ast = self.fluent_parser.parse(contents)

        annots = [
            annot
            for entry in ast.body
            if isinstance(entry, FTL.Junk)
            for annot in entry.annotations
        ]

        if annots:
            logger = logging.getLogger('migrate')
            for annot in annots:
                msg = annot.message
                logger.warning(f'Syntax error in {path}: {msg}')

        return ast

    def read_legacy_resource(self, path):
        """Read a legacy resource and parse it into a dict.

        Entities that are not localized are skipped unless
        `enforce_translated` is set on the context.
        """
        parser = getParser(path)
        parser.readFile(path)
        # Transform the parsed result which is an iterator into a dict.
        return {
            entity.key: entity.val for entity in parser
            if entity.localized or self.enforce_translated
        }

    def read_reference_ftl(self, path):
        """Read and parse a reference FTL file.

        A missing or undecodable resource file is a fatal error and will
        raise an UnreadableReferenceError, chained to the original error.
        """
        fullpath = os.path.join(self.reference_dir, path)
        try:
            return self.read_ftl_resource(fullpath)
        except OSError as err:
            error_message = f'Missing reference file: {fullpath}'
            logging.getLogger('migrate').error(error_message)
            raise UnreadableReferenceError(error_message) from err
        except UnicodeDecodeError as err:
            error_message = f'Error reading file {fullpath}: {err}'
            logging.getLogger('migrate').error(error_message)
            raise UnreadableReferenceError(error_message) from err

    def read_localization_ftl(self, path):
        """Read and parse an existing localization FTL file.

        Create a new FTL.Resource if the file doesn't exist or can't be
        decoded.
        """
        fullpath = os.path.join(self.localization_dir, path)
        try:
            return self.read_ftl_resource(fullpath)
        except OSError:
            logger = logging.getLogger('migrate')
            logger.info(
                'Localization file {} does not exist and '
                'it will be created'.format(path))
            return FTL.Resource()
        except UnicodeDecodeError:
            logger = logging.getLogger('migrate')
            logger.warning(
                'Localization file {} has broken encoding. '
                'It will be re-created and some translations '
                'may be lost'.format(path))
            return FTL.Resource()

    def maybe_add_localization(self, path):
        """Add a localization resource to migrate translations from.

        Uses a compare-locales parser to create a dict of (key, string value)
        tuples.
        For Fluent sources, we store the AST.

        A file that cannot be opened is logged as a warning and skipped.
        """
        fullpath = os.path.join(self.localization_dir, path)
        try:
            if fullpath.endswith('.ftl'):
                collection = self.read_ftl_resource(fullpath)
            else:
                collection = self.read_legacy_resource(fullpath)
        except OSError:
            logger = logging.getLogger('migrate')
            logger.warning(f'Missing localization file: {path}')
        else:
            self.localization_resources[path] = collection

    def get_legacy_source(self, path, key):
        """Get an entity value from a localized legacy source.

        Returns None if the key is not present in the resource.
        Used by the `Source` transform.
        """
        resource = self.localization_resources[path]
        return resource.get(key)

    def get_fluent_source_pattern(self, path, key):
        """Get a pattern from a localized Fluent source.

        If the key contains a `.`, does an attribute lookup.
        Returns None if the message/term or the attribute is not found.
        Used by the `COPY_PATTERN` transform.
        """
        resource = self.localization_resources[path]
        msg_key, _, attr_key = key.partition('.')
        found = None
        for entry in resource.body:
            if isinstance(entry, (FTL.Message, FTL.Term)):
                if entry.id.name == msg_key:
                    found = entry
                    break
        if found is None:
            return None
        if not attr_key:
            return found.value
        for attribute in found.attributes:
            if attribute.id.name == attr_key:
                return attribute.value
        return None

    def messages_equal(self, res1, res2):
        """Compare messages and terms of two FTL resources.

        Uses FTL.BaseNode.equals to compare all messages/terms
        in two FTL resources. Entries are sorted by identifier name first,
        so their order within a resource does not affect the result.
        If the number of messages differs, the result is False.
        """
        def sorted_messages(resource):
            "Return the resource's messages and terms sorted by id name."
            return sorted(
                (entry for entry in resource.body
                 if isinstance(entry, (FTL.Message, FTL.Term))),
                key=lambda entry: entry.id.name)

        messages1 = sorted_messages(res1)
        messages2 = sorted_messages(res2)
        for msg1, msg2 in zip_longest(messages1, messages2):
            # zip_longest pads the shorter list with None.
            if msg1 is None or msg2 is None:
                return False
            if not msg1.equals(msg2):
                return False
        return True

    def merge_changeset(self, changeset=None, known_translations=None):
        """Yield `(path, FTL.Resource)` pairs for the changeset.

        The input data must be configured earlier using the `add_*` methods.
        If given, `changeset` must be a set of (path, key) tuples describing
        which legacy translations are to be merged. If `changeset` is None,
        all legacy translations will be allowed to be migrated in a single
        changeset. If `known_translations` is None, it defaults to
        `changeset`.

        We use the `in_changeset` method to determine if a message should be
        migrated for the given changeset.

        This is a generator: for every reference resource whose merged
        snapshot differs from the current target resource, it yields the
        resource path and the merged `FTL.Resource`. Each yielded snapshot
        is also stored in `self.target_resources`, replacing the previous
        entry for that path.
        """

        if changeset is None:
            # Merge all known legacy translations. Used in tests.
            changeset = {
                (path, key)
                for path, strings in self.localization_resources.items()
                if not path.endswith('.ftl')
                for key in strings
            }

        if known_translations is None:
            known_translations = changeset

        for path, reference in self.reference_resources.items():
            current = self.target_resources[path]
            transforms = self.transforms.get(path, [])
            in_changeset = partial(
                self.in_changeset, changeset, known_translations, path)

            # Merge legacy translations with the existing ones using the
            # reference as a template.
            snapshot = merge_resource(
                self, reference, current, transforms, in_changeset
            )

            # Skip this path if the messages in the merged snapshot are
            # identical to those in the current state of the localization file.
            # This may happen when:
            #
            #   - none of the transforms is in the changeset, or
            #   - all messages which would be migrated by the context's
            #     transforms already exist in the current state.
            if self.messages_equal(current, snapshot):
                continue

            # Store the merged snapshot on the context so that the next merge
            # already takes it into account as the existing localization.
            self.target_resources[path] = snapshot

            # The result for this path is a complete `FTL.Resource`.
            yield path, snapshot

    def in_changeset(self, changeset, known_translations, path, ident):
        """Check if a message should be migrated in this changeset.

        The message is identified by path and ident.

        A message will be migrated only if all of its dependencies
        are present in the currently processed changeset.

        If a transform defined for this message points to a missing
        legacy translation, this message will not be merged. The
        missing legacy dependency won't be present in the changeset.

        This also means that partially translated messages (e.g.
        constructed from two legacy strings out of which only one is
        available) will never be migrated.

        Always returns a bool.
        """
        message_deps = self.dependencies.get((path, ident))

        # Don't merge if we don't have a transform for this message.
        if message_deps is None:
            return False

        # As a special case, if a transform exists but has no
        # dependencies, it's a hardcoded `FTL.Node` which doesn't
        # migrate any existing translation but rather creates a new
        # one. Merge it.
        if len(message_deps) == 0:
            return True

        # Make sure all the dependencies are present in the current
        # changeset. Partial migrations are not currently supported.
        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1321271
        # We only return True if our current changeset touches
        # the transform, and we have all of the dependencies.
        active_deps = message_deps & changeset
        available_deps = message_deps & known_translations
        # bool() keeps an empty set from leaking out as the return value.
        return bool(active_deps) and message_deps == available_deps

    def serialize_changeset(self, changeset, known_translations=None):
        """Return a dict of serialized FTLs for the changeset.

        Given `changeset`, return a dict whose keys are resource paths and
        values are serialized FTL snapshots.
        """

        return {
            path: self.fluent_serializer.serialize(snapshot)
            for path, snapshot in self.merge_changeset(
                changeset, known_translations
            )
        }

    def evaluate(self, node):
        """Run the context's evaluator over `node` and return its result."""
        return self.evaluator.visit(node)


logging.basicConfig()