# -*- coding: utf-8 -*- # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """ Code for parsing metrics.yaml files. """ import functools from pathlib import Path import textwrap from typing import Any, Dict, Generator, Iterable, Optional, Tuple, Union import jsonschema # type: ignore from jsonschema.exceptions import ValidationError # type: ignore from .metrics import Metric, ObjectTree from .pings import Ping, RESERVED_PING_NAMES from .tags import Tag from . import util from .util import DictWrapper ROOT_DIR = Path(__file__).parent SCHEMAS_DIR = ROOT_DIR / "schemas" METRICS_ID = "moz://mozilla.org/schemas/glean/metrics/2-0-0" PINGS_ID = "moz://mozilla.org/schemas/glean/pings/2-0-0" TAGS_ID = "moz://mozilla.org/schemas/glean/tags/1-0-0" def _update_validator(validator): """ Adds some custom validators to the jsonschema validator that produce nicer error messages. """ def required(validator, required, instance, schema): if not validator.is_type(instance, "object"): return missing_properties = set( property for property in required if property not in instance ) if len(missing_properties): missing_properties = sorted(list(missing_properties)) yield ValidationError( f"Missing required properties: {', '.join(missing_properties)}" ) validator.VALIDATORS["required"] = required def _load_file( filepath: Path, parser_config: Dict[str, Any] ) -> Generator[str, None, Tuple[Dict[str, util.JSONType], Optional[str]]]: """ Load a metrics.yaml or pings.yaml format file. If the `filepath` does not exist, raises `FileNotFoundError`, unless `parser_config["allow_missing_files"]` is `True`. """ try: content = util.load_yaml_or_json(filepath) except FileNotFoundError: if not parser_config.get("allow_missing_files", False): raise else: return {}, None except Exception as e: yield util.format_error(filepath, "", textwrap.fill(str(e))) return {}, None if content is None: yield util.format_error(filepath, "", f"'{filepath}' file can not be empty.") return {}, None if not isinstance(content, dict): return {}, None if content == {}: return {}, None schema_key = content.get("$schema") if not isinstance(schema_key, str): raise TypeError(f"Invalid schema key {schema_key}") filetype: Optional[str] = None try: filetype = schema_key.split("/")[-2] except IndexError: filetype = None if filetype not in ("metrics", "pings", "tags"): filetype = None for error in validate(content, filepath): content = {} yield error return content, filetype @functools.lru_cache(maxsize=1) def _load_schemas() -> Dict[str, Tuple[Any, Any]]: """ Load all of the known schemas from disk, and put them in a map based on the schema's $id. """ schemas = {} for schema_path in SCHEMAS_DIR.glob("*.yaml"): schema = util.load_yaml_or_json(schema_path) resolver = util.get_null_resolver(schema) validator_class = jsonschema.validators.validator_for(schema) _update_validator(validator_class) validator_class.check_schema(schema) validator = validator_class(schema, resolver=resolver) schemas[schema["$id"]] = (schema, validator) return schemas def _get_schema( schema_id: str, filepath: Union[str, Path] = "" ) -> Tuple[Any, Any]: """ Get the schema for the given schema $id. """ schemas = _load_schemas() if schema_id not in schemas: raise ValueError( util.format_error( filepath, "", f"$schema key must be one of {', '.join(schemas.keys())}", ) ) return schemas[schema_id] def _get_schema_for_content( content: Dict[str, util.JSONType], filepath: Union[str, Path] ) -> Tuple[Any, Any]: """ Get the appropriate schema for the given JSON content. """ schema_url = content.get("$schema") if not isinstance(schema_url, str): raise TypeError("Invalid $schema type {schema_url}") return _get_schema(schema_url, filepath) def validate( content: Dict[str, util.JSONType], filepath: Union[str, Path] = "" ) -> Generator[str, None, None]: """ Validate the given content against the appropriate schema. """ try: schema, validator = _get_schema_for_content(content, filepath) except ValueError as e: yield str(e) else: yield from ( util.format_error(filepath, "", util.pprint_validation_error(e)) for e in validator.iter_errors(content) ) def _instantiate_metrics( all_objects: ObjectTree, sources: Dict[Any, Path], content: Dict[str, util.JSONType], filepath: Path, config: Dict[str, Any], ) -> Generator[str, None, None]: """ Load a list of metrics.yaml files, convert the JSON information into Metric objects, and merge them into a single tree. """ global_no_lint = content.get("no_lint", []) global_tags = content.get("$tags", []) assert isinstance(global_tags, list) for category_key, category_val in sorted(content.items()): if category_key.startswith("$"): continue if category_key == "no_lint": continue if not config.get("allow_reserved") and category_key.split(".")[0] == "glean": yield util.format_error( filepath, f"For category '{category_key}'", "Categories beginning with 'glean' are reserved for " "Glean internal use.", ) continue all_objects.setdefault(category_key, DictWrapper()) if not isinstance(category_val, dict): raise TypeError(f"Invalid content for {category_key}") for metric_key, metric_val in sorted(category_val.items()): try: metric_obj = Metric.make_metric( category_key, metric_key, metric_val, validated=True, config=config ) except Exception as e: yield util.format_error( filepath, f"On instance {category_key}.{metric_key}", str(e), metric_val.defined_in["line"], ) metric_obj = None else: if ( not config.get("allow_reserved") and "all-pings" in metric_obj.send_in_pings ): yield util.format_error( filepath, f"On instance {category_key}.{metric_key}", 'Only internal metrics may specify "all-pings" ' 'in "send_in_pings"', metric_val.defined_in["line"], ) metric_obj = None if metric_obj is not None: metric_obj.no_lint = sorted(set(metric_obj.no_lint + global_no_lint)) if len(global_tags): metric_obj.metadata["tags"] = sorted( set(metric_obj.metadata.get("tags", []) + global_tags) ) if isinstance(filepath, Path): metric_obj.defined_in["filepath"] = str(filepath) already_seen = sources.get((category_key, metric_key)) if already_seen is not None: # We've seen this metric name already yield util.format_error( filepath, "", ( f"Duplicate metric name '{category_key}.{metric_key}' " f"already defined in '{already_seen}'" ), metric_obj.defined_in["line"], ) else: all_objects[category_key][metric_key] = metric_obj sources[(category_key, metric_key)] = filepath def _instantiate_pings( all_objects: ObjectTree, sources: Dict[Any, Path], content: Dict[str, util.JSONType], filepath: Path, config: Dict[str, Any], ) -> Generator[str, None, None]: """ Load a list of pings.yaml files, convert the JSON information into Ping objects. """ global_no_lint = content.get("no_lint", []) assert isinstance(global_no_lint, list) for ping_key, ping_val in sorted(content.items()): if ping_key.startswith("$"): continue if ping_key == "no_lint": continue if not config.get("allow_reserved"): if ping_key in RESERVED_PING_NAMES: yield util.format_error( filepath, f"For ping '{ping_key}'", f"Ping uses a reserved name ({RESERVED_PING_NAMES})", ) continue if not isinstance(ping_val, dict): raise TypeError(f"Invalid content for ping {ping_key}") ping_val["name"] = ping_key try: ping_obj = Ping( defined_in=getattr(ping_val, "defined_in", None), _validated=True, **ping_val, ) except Exception as e: yield util.format_error(filepath, f"On instance '{ping_key}'", str(e)) continue if ping_obj is not None: ping_obj.no_lint = sorted(set(ping_obj.no_lint + global_no_lint)) if isinstance(filepath, Path) and ping_obj.defined_in is not None: ping_obj.defined_in["filepath"] = str(filepath) already_seen = sources.get(ping_key) if already_seen is not None: # We've seen this ping name already yield util.format_error( filepath, "", f"Duplicate ping name '{ping_key}' " f"already defined in '{already_seen}'", ) else: all_objects.setdefault("pings", {})[ping_key] = ping_obj sources[ping_key] = filepath def _instantiate_tags( all_objects: ObjectTree, sources: Dict[Any, Path], content: Dict[str, util.JSONType], filepath: Path, config: Dict[str, Any], ) -> Generator[str, None, None]: """ Load a list of tags.yaml files, convert the JSON information into Tag objects. """ global_no_lint = content.get("no_lint", []) assert isinstance(global_no_lint, list) for tag_key, tag_val in sorted(content.items()): if tag_key.startswith("$"): continue if tag_key == "no_lint": continue if not isinstance(tag_val, dict): raise TypeError(f"Invalid content for tag {tag_key}") tag_val["name"] = tag_key try: tag_obj = Tag( defined_in=getattr(tag_val, "defined_in", None), _validated=True, **tag_val, ) except Exception as e: yield util.format_error(filepath, f"On instance '{tag_key}'", str(e)) continue if tag_obj is not None: tag_obj.no_lint = sorted(set(tag_obj.no_lint + global_no_lint)) if isinstance(filepath, Path) and tag_obj.defined_in is not None: tag_obj.defined_in["filepath"] = str(filepath) already_seen = sources.get(tag_key) if already_seen is not None: # We've seen this tag name already yield util.format_error( filepath, "", f"Duplicate tag name '{tag_key}' " f"already defined in '{already_seen}'", ) else: all_objects.setdefault("tags", {})[tag_key] = tag_obj sources[tag_key] = filepath def _preprocess_objects(objs: ObjectTree, config: Dict[str, Any]) -> ObjectTree: """ Preprocess the object tree to better set defaults. """ for category in objs.values(): for obj in category.values(): if not isinstance(obj, Metric): continue if not config.get("do_not_disable_expired", False) and hasattr( obj, "is_disabled" ): obj.disabled = obj.is_disabled() if hasattr(obj, "send_in_pings"): if "default" in obj.send_in_pings: obj.send_in_pings = obj.default_store_names + [ x for x in obj.send_in_pings if x != "default" ] obj.send_in_pings = sorted(list(set(obj.send_in_pings))) return objs @util.keep_value def parse_objects( filepaths: Iterable[Path], config: Optional[Dict[str, Any]] = None ) -> Generator[str, None, ObjectTree]: """ Parse one or more metrics.yaml and/or pings.yaml files, returning a tree of `metrics.Metric`, `pings.Ping`, and `tags.Tag` instances. The result is a generator over any errors. If there are no errors, the actual metrics can be obtained from `result.value`. For example:: result = metrics.parse_metrics(filepaths) for err in result: print(err) all_metrics = result.value The result value is a dictionary of category names to categories, where each category is a dictionary from metric name to `metrics.Metric` instances. There are also the special categories `pings` and `tags` containing all of the `pings.Ping` and `tags.Tag` instances, respectively. :param filepaths: list of Path objects to metrics.yaml, pings.yaml, and/or tags.yaml files :param config: A dictionary of options that change parsing behavior. Supported keys are: - `allow_reserved`: Allow values reserved for internal Glean use. - `do_not_disable_expired`: Don't mark expired metrics as disabled. This is useful when you want to retain the original "disabled" value from the `metrics.yaml`, rather than having it overridden when the metric expires. - `allow_missing_files`: Do not raise a `FileNotFoundError` if any of the input `filepaths` do not exist. """ if config is None: config = {} all_objects: ObjectTree = DictWrapper() sources: Dict[Any, Path] = {} filepaths = util.ensure_list(filepaths) for filepath in filepaths: content, filetype = yield from _load_file(filepath, config) if filetype == "metrics": yield from _instantiate_metrics( all_objects, sources, content, filepath, config ) elif filetype == "pings": yield from _instantiate_pings( all_objects, sources, content, filepath, config ) elif filetype == "tags": yield from _instantiate_tags( all_objects, sources, content, filepath, config ) return _preprocess_objects(all_objects, config)