diff options
Diffstat (limited to 'third_party/python/glean_parser/glean_parser/lint.py')
-rw-r--r-- | third_party/python/glean_parser/glean_parser/lint.py | 538 |
1 files changed, 538 insertions, 0 deletions
diff --git a/third_party/python/glean_parser/glean_parser/lint.py b/third_party/python/glean_parser/glean_parser/lint.py new file mode 100644 index 0000000000..0dc2bddd5d --- /dev/null +++ b/third_party/python/glean_parser/glean_parser/lint.py @@ -0,0 +1,538 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + +import enum +from pathlib import Path +import re +import sys +from typing import ( + Any, + Callable, + Dict, + Generator, + List, + Iterable, + Optional, + Tuple, + Union, +) # noqa + + +from . import metrics +from . import parser +from . import pings +from . import tags +from . import util + + +LintGenerator = Generator[str, None, None] + + +class CheckType(enum.Enum): + warning = 0 + error = 1 + + +def _split_words(name: str) -> List[str]: + """ + Helper function to split words on either `.` or `_`. + """ + return re.split("[._-]", name) + + +def _english_list(items: List[str]) -> str: + """ + Helper function to format a list [A, B, C] as "'A', 'B', or 'C'". + """ + if len(items) == 0: + return "" + elif len(items) == 1: + return f"'{items[0]}'" + else: + return "{}, or '{}'".format( + ", ".join([f"'{x}'" for x in items[:-1]]), items[-1] + ) + + +def _hamming_distance(str1: str, str2: str) -> int: + """ + Count the # of differences between strings str1 and str2, + padding the shorter one with whitespace + """ + + diffs = 0 + if len(str1) < len(str2): + str1, str2 = str2, str1 + len_dist = len(str1) - len(str2) + str2 += " " * len_dist + + for ch1, ch2 in zip(str1, str2): + if ch1 != ch2: + diffs += 1 + return diffs + + +def check_common_prefix( + category_name: str, metrics: Iterable[metrics.Metric] +) -> LintGenerator: + """ + Check if all metrics begin with a common prefix. + """ + metric_words = sorted([_split_words(metric.name) for metric in metrics]) + + if len(metric_words) < 2: + return + + first = metric_words[0] + last = metric_words[-1] + + for i in range(min(len(first), len(last))): + if first[i] != last[i]: + break + + if i > 0: + common_prefix = "_".join(first[:i]) + yield ( + f"Within category '{category_name}', all metrics begin with " + f"prefix '{common_prefix}'." + "Remove the prefixes on the metric names and (possibly) " + "rename the category." + ) + + +def check_unit_in_name( + metric: metrics.Metric, parser_config: Dict[str, Any] +) -> LintGenerator: + """ + The metric name ends in a unit. + """ + TIME_UNIT_ABBREV = { + "nanosecond": "ns", + "microsecond": "us", + "millisecond": "ms", + "second": "s", + "minute": "m", + "hour": "h", + "day": "d", + } + + MEMORY_UNIT_ABBREV = { + "byte": "b", + "kilobyte": "kb", + "megabyte": "mb", + "gigabyte": "gb", + } + + name_words = _split_words(metric.name) + unit_in_name = name_words[-1] + + time_unit = getattr(metric, "time_unit", None) + memory_unit = getattr(metric, "memory_unit", None) + unit = getattr(metric, "unit", None) + + if time_unit is not None: + if ( + unit_in_name == TIME_UNIT_ABBREV.get(time_unit.name) + or unit_in_name == time_unit.name + ): + yield ( + f"Suffix '{unit_in_name}' is redundant with time_unit " + f"'{time_unit.name}'. Only include time_unit." + ) + elif ( + unit_in_name in TIME_UNIT_ABBREV.keys() + or unit_in_name in TIME_UNIT_ABBREV.values() + ): + yield ( + f"Suffix '{unit_in_name}' doesn't match time_unit " + f"'{time_unit.name}'. " + "Confirm the unit is correct and only include time_unit." + ) + + elif memory_unit is not None: + if ( + unit_in_name == MEMORY_UNIT_ABBREV.get(memory_unit.name) + or unit_in_name == memory_unit.name + ): + yield ( + f"Suffix '{unit_in_name}' is redundant with memory_unit " + f"'{memory_unit.name}'. " + "Only include memory_unit." + ) + elif ( + unit_in_name in MEMORY_UNIT_ABBREV.keys() + or unit_in_name in MEMORY_UNIT_ABBREV.values() + ): + yield ( + f"Suffix '{unit_in_name}' doesn't match memory_unit " + f"{memory_unit.name}'. " + "Confirm the unit is correct and only include memory_unit." + ) + + elif unit is not None: + if unit_in_name == unit: + yield ( + f"Suffix '{unit_in_name}' is redundant with unit param " + f"'{unit}'. " + "Only include unit." + ) + + +def check_category_generic( + category_name: str, metrics: Iterable[metrics.Metric] +) -> LintGenerator: + """ + The category name is too generic. + """ + GENERIC_CATEGORIES = ["metrics", "events"] + + if category_name in GENERIC_CATEGORIES: + yield ( + f"Category '{category_name}' is too generic. " + f"Don't use {_english_list(GENERIC_CATEGORIES)} for category names" + ) + + +def check_bug_number( + metric: Union[metrics.Metric, pings.Ping], parser_config: Dict[str, Any] +) -> LintGenerator: + number_bugs = [str(bug) for bug in metric.bugs if isinstance(bug, int)] + + if len(number_bugs): + yield ( + f"For bugs {', '.join(number_bugs)}: " + "Bug numbers are deprecated and should be changed to full URLs. " + f"For example, use 'http://bugzilla.mozilla.org/{number_bugs[0]}' " + f"instead of '{number_bugs[0]}'." + ) + + +def check_valid_in_baseline( + metric: metrics.Metric, parser_config: Dict[str, Any] +) -> LintGenerator: + allow_reserved = parser_config.get("allow_reserved", False) + + if not allow_reserved and "baseline" in metric.send_in_pings: + yield ( + "The baseline ping is Glean-internal. " + "Remove 'baseline' from the send_in_pings array." + ) + + +def check_misspelled_pings( + metric: metrics.Metric, parser_config: Dict[str, Any] +) -> LintGenerator: + for ping in metric.send_in_pings: + for builtin in pings.RESERVED_PING_NAMES: + distance = _hamming_distance(ping, builtin) + if distance == 1: + yield f"Ping '{ping}' seems misspelled. Did you mean '{builtin}'?" + + +def check_tags_required( + metric_or_ping: Union[metrics.Metric, pings.Ping], parser_config: Dict[str, Any] +) -> LintGenerator: + if parser_config.get("require_tags", False) and not len( + metric_or_ping.metadata.get("tags", []) + ): + yield "Tags are required but no tags specified" + + +def check_user_lifetime_expiration( + metric: metrics.Metric, parser_config: Dict[str, Any] +) -> LintGenerator: + if metric.lifetime == metrics.Lifetime.user and metric.expires != "never": + yield ( + "Metrics with 'user' lifetime cannot have an expiration date. " + "They live as long as the user profile does. " + "Set expires to 'never'." + ) + + +def check_expired_date( + metric: metrics.Metric, parser_config: Dict[str, Any] +) -> LintGenerator: + try: + metric.validate_expires() + except ValueError as e: + yield (str(e)) + + +def check_expired_metric( + metric: metrics.Metric, parser_config: Dict[str, Any] +) -> LintGenerator: + if metric.is_expired(): + yield ("Metric has expired. Please consider removing it.") + + +def check_old_event_api( + metric: metrics.Metric, parser_config: Dict[str, Any] +) -> LintGenerator: + # Glean v52.0.0 removed the old events API. + # The metrics-2-0-0 schema still supports it. + # We want to warn about it. + # This can go when we introduce 3-0-0 + + if not isinstance(metric, metrics.Event): + return + + if not all("type" in x for x in metric.extra_keys.values()): + yield ("The old event API is gone. Extra keys require a type.") + + +def check_redundant_ping( + pings: pings.Ping, parser_config: Dict[str, Any] +) -> LintGenerator: + """ + Check if the pings contains 'ping' as the prefix or suffix, or 'ping' or 'custom' + """ + ping_words = _split_words(pings.name) + + if len(ping_words) != 0: + ping_first_word = ping_words[0] + ping_last_word = ping_words[-1] + + if ping_first_word == "ping": + yield ("The prefix 'ping' is redundant.") + elif ping_last_word == "ping": + yield ("The suffix 'ping' is redundant.") + elif "ping" in ping_words: + yield ("The word 'ping' is redundant.") + elif "custom" in ping_words: + yield ("The word 'custom' is redundant.") + + +# The checks that operate on an entire category of metrics: +# {NAME: (function, is_error)} +CATEGORY_CHECKS: Dict[ + str, Tuple[Callable[[str, Iterable[metrics.Metric]], LintGenerator], CheckType] +] = { + "COMMON_PREFIX": (check_common_prefix, CheckType.error), + "CATEGORY_GENERIC": (check_category_generic, CheckType.error), +} + + +# The checks that operate on individual metrics: +# {NAME: (function, is_error)} +METRIC_CHECKS: Dict[ + str, Tuple[Callable[[metrics.Metric, dict], LintGenerator], CheckType] +] = { + "UNIT_IN_NAME": (check_unit_in_name, CheckType.error), + "BUG_NUMBER": (check_bug_number, CheckType.error), + "BASELINE_PING": (check_valid_in_baseline, CheckType.error), + "MISSPELLED_PING": (check_misspelled_pings, CheckType.error), + "TAGS_REQUIRED": (check_tags_required, CheckType.error), + "EXPIRATION_DATE_TOO_FAR": (check_expired_date, CheckType.warning), + "USER_LIFETIME_EXPIRATION": (check_user_lifetime_expiration, CheckType.warning), + "EXPIRED": (check_expired_metric, CheckType.warning), + "OLD_EVENT_API": (check_old_event_api, CheckType.warning), +} + + +# The checks that operate on individual pings: +# {NAME: (function, is_error)} +PING_CHECKS: Dict[ + str, Tuple[Callable[[pings.Ping, dict], LintGenerator], CheckType] +] = { + "BUG_NUMBER": (check_bug_number, CheckType.error), + "TAGS_REQUIRED": (check_tags_required, CheckType.error), + "REDUNDANT_PING": (check_redundant_ping, CheckType.error), +} + + +class GlinterNit: + def __init__(self, check_name: str, name: str, msg: str, check_type: CheckType): + self.check_name = check_name + self.name = name + self.msg = msg + self.check_type = check_type + + def format(self): + return ( + f"{self.check_type.name.upper()}: {self.check_name}: " + f"{self.name}: {self.msg}" + ) + + +def _lint_item_tags( + item_name: str, + item_type: str, + item_tag_names: List[str], + valid_tag_names: List[str], +) -> List[GlinterNit]: + invalid_tags = [tag for tag in item_tag_names if tag not in valid_tag_names] + return ( + [ + GlinterNit( + "INVALID_TAGS", + item_name, + f"Invalid tags specified in {item_type}: {', '.join(invalid_tags)}", + CheckType.error, + ) + ] + if len(invalid_tags) + else [] + ) + + +def _lint_pings( + category: Dict[str, Union[metrics.Metric, pings.Ping, tags.Tag]], + parser_config: Dict[str, Any], + valid_tag_names: List[str], +) -> List[GlinterNit]: + nits: List[GlinterNit] = [] + + for ping_name, ping in sorted(list(category.items())): + assert isinstance(ping, pings.Ping) + for check_name, (check_func, check_type) in PING_CHECKS.items(): + new_nits = list(check_func(ping, parser_config)) + if len(new_nits): + if check_name not in ping.no_lint: + nits.extend( + GlinterNit( + check_name, + ping_name, + msg, + check_type, + ) + for msg in new_nits + ) + nits.extend( + _lint_item_tags( + ping_name, + "ping", + ping.metadata.get("tags", []), + valid_tag_names, + ) + ) + return nits + + +def lint_metrics( + objs: metrics.ObjectTree, + parser_config: Optional[Dict[str, Any]] = None, + file=sys.stderr, +) -> List[GlinterNit]: + """ + Performs glinter checks on a set of metrics objects. + + :param objs: Tree of metric objects, as returns by `parser.parse_objects`. + :param file: The stream to write errors to. + :returns: List of nits. + """ + if parser_config is None: + parser_config = {} + + nits: List[GlinterNit] = [] + valid_tag_names = [tag for tag in objs.get("tags", [])] + for category_name, category in sorted(list(objs.items())): + if category_name == "pings": + nits.extend(_lint_pings(category, parser_config, valid_tag_names)) + continue + + if category_name == "tags": + # currently we have no linting for tags + continue + + # Make sure the category has only Metrics, not Pings or Tags + category_metrics = dict( + (name, metric) + for (name, metric) in category.items() + if isinstance(metric, metrics.Metric) + ) + + for cat_check_name, (cat_check_func, check_type) in CATEGORY_CHECKS.items(): + if any( + cat_check_name in metric.no_lint for metric in category_metrics.values() + ): + continue + nits.extend( + GlinterNit(cat_check_name, category_name, msg, check_type) + for msg in cat_check_func(category_name, category_metrics.values()) + ) + + for _metric_name, metric in sorted(list(category_metrics.items())): + for check_name, (check_func, check_type) in METRIC_CHECKS.items(): + new_nits = list(check_func(metric, parser_config)) + if len(new_nits): + if check_name not in metric.no_lint: + nits.extend( + GlinterNit( + check_name, + ".".join([metric.category, metric.name]), + msg, + check_type, + ) + for msg in new_nits + ) + + # also check that tags for metric are valid + nits.extend( + _lint_item_tags( + ".".join([metric.category, metric.name]), + "metric", + metric.metadata.get("tags", []), + valid_tag_names, + ) + ) + + if len(nits): + print("Sorry, Glean found some glinter nits:", file=file) + for nit in nits: + print(nit.format(), file=file) + print("", file=file) + print("Please fix the above nits to continue.", file=file) + print( + "To disable a check, add a `no_lint` parameter " + "with a list of check names to disable.\n" + "This parameter can appear with each individual metric, or at the " + "top-level to affect the entire file.", + file=file, + ) + + return nits + + +def lint_yaml_files( + input_filepaths: Iterable[Path], + file=sys.stderr, + parser_config: Optional[Dict[str, Any]] = None, +) -> List: + """Always empty.""" + return [] + + +def glinter( + input_filepaths: Iterable[Path], + parser_config: Optional[Dict[str, Any]] = None, + file=sys.stderr, +) -> int: + """ + Commandline helper for glinter. + + :param input_filepaths: List of Path objects to load metrics from. + :param parser_config: Parser configuration object, passed to + `parser.parse_objects`. + :param file: The stream to write the errors to. + :return: Non-zero if there were any glinter errors. + """ + if parser_config is None: + parser_config = {} + + errors = 0 + + objs = parser.parse_objects(input_filepaths, parser_config) + errors += util.report_validation_errors(objs) + + nits = lint_metrics(objs.value, parser_config=parser_config, file=file) + errors += len([nit for nit in nits if nit.check_type == CheckType.error]) + + if errors == 0: + print("✨ Your metrics are Glean! ✨", file=file) + return 0 + + print(f"❌ Found {errors} errors.") + + return 1 |