"""Schema validation of ansible-core's ansible_builtin_runtime.yml and collection's meta/runtime.yml""" from __future__ import annotations import datetime import os import re import sys from functools import partial import yaml from voluptuous import All, Any, MultipleInvalid, PREVENT_EXTRA from voluptuous import Required, Schema, Invalid from voluptuous.humanize import humanize_error from ansible.module_utils.compat.version import StrictVersion, LooseVersion from ansible.module_utils.six import string_types from ansible.utils.version import SemanticVersion def isodate(value, check_deprecation_date=False, is_tombstone=False): """Validate a datetime.date or ISO 8601 date string.""" # datetime.date objects come from YAML dates, these are ok if isinstance(value, datetime.date): removal_date = value else: # make sure we have a string msg = 'Expected ISO 8601 date string (YYYY-MM-DD), or YAML date' if not isinstance(value, string_types): raise Invalid(msg) # From Python 3.7 in, there is datetime.date.fromisoformat(). For older versions, # we have to do things manually. if not re.match('^[0-9]{4}-[0-9]{2}-[0-9]{2}$', value): raise Invalid(msg) try: removal_date = datetime.datetime.strptime(value, '%Y-%m-%d').date() except ValueError: raise Invalid(msg) # Make sure date is correct today = datetime.date.today() if is_tombstone: # For a tombstone, the removal date must be in the past if today < removal_date: raise Invalid( 'The tombstone removal_date (%s) must not be after today (%s)' % (removal_date, today)) else: # For a deprecation, the removal date must be in the future. Only test this if # check_deprecation_date is truish, to avoid checks to suddenly start to fail. if check_deprecation_date and today > removal_date: raise Invalid( 'The deprecation removal_date (%s) must be after today (%s)' % (removal_date, today)) return value def removal_version(value, is_ansible, current_version=None, is_tombstone=False): """Validate a removal version string.""" msg = ( 'Removal version must be a string' if is_ansible else 'Removal version must be a semantic version (https://semver.org/)' ) if not isinstance(value, string_types): raise Invalid(msg) try: if is_ansible: version = StrictVersion() version.parse(value) version = LooseVersion(value) # We're storing Ansible's version as a LooseVersion else: version = SemanticVersion() version.parse(value) if version.major != 0 and (version.minor != 0 or version.patch != 0): raise Invalid('removal_version (%r) must be a major release, not a minor or patch release ' '(see specification at https://semver.org/)' % (value, )) if current_version is not None: if is_tombstone: # For a tombstone, the removal version must not be in the future if version > current_version: raise Invalid('The tombstone removal_version (%r) must not be after the ' 'current version (%s)' % (value, current_version)) else: # For a deprecation, the removal version must be in the future if version <= current_version: raise Invalid('The deprecation removal_version (%r) must be after the ' 'current version (%s)' % (value, current_version)) except ValueError: raise Invalid(msg) return value def any_value(value): """Accepts anything.""" return value def get_ansible_version(): """Return current ansible-core version""" from ansible.release import __version__ return LooseVersion('.'.join(__version__.split('.')[:3])) def get_collection_version(): """Return current collection version, or None if it is not available""" import importlib.util collection_detail_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'tools', 'collection_detail.py') collection_detail_spec = importlib.util.spec_from_file_location('collection_detail', collection_detail_path) collection_detail = importlib.util.module_from_spec(collection_detail_spec) sys.modules['collection_detail'] = collection_detail collection_detail_spec.loader.exec_module(collection_detail) # noinspection PyBroadException try: result = collection_detail.read_manifest_json('.') or collection_detail.read_galaxy_yml('.') return SemanticVersion(result['version']) except Exception: # pylint: disable=broad-except # We do not care why it fails, in case we cannot get the version # just return None to indicate "we don't know". return None def validate_metadata_file(path, is_ansible, check_deprecation_dates=False): """Validate explicit runtime metadata file""" try: with open(path, 'r', encoding='utf-8') as f_path: routing = yaml.safe_load(f_path) except yaml.error.MarkedYAMLError as ex: print('%s:%d:%d: YAML load failed: %s' % (path, ex.context_mark.line + 1, ex.context_mark.column + 1, re.sub(r'\s+', ' ', str(ex)))) return except Exception as ex: # pylint: disable=broad-except print('%s:%d:%d: YAML load failed: %s' % (path, 0, 0, re.sub(r'\s+', ' ', str(ex)))) return if is_ansible: current_version = get_ansible_version() else: current_version = get_collection_version() # Updates to schema MUST also be reflected in the documentation # ~https://docs.ansible.com/ansible-core/devel/dev_guide/developing_collections.html # plugin_routing schema avoid_additional_data = Schema( Any( { Required('removal_version'): any_value, 'warning_text': any_value, }, { Required('removal_date'): any_value, 'warning_text': any_value, } ), extra=PREVENT_EXTRA ) deprecation_schema = All( # The first schema validates the input, and the second makes sure no extra keys are specified Schema( { 'removal_version': partial(removal_version, is_ansible=is_ansible, current_version=current_version), 'removal_date': partial(isodate, check_deprecation_date=check_deprecation_dates), 'warning_text': Any(*string_types), } ), avoid_additional_data ) tombstoning_schema = All( # The first schema validates the input, and the second makes sure no extra keys are specified Schema( { 'removal_version': partial(removal_version, is_ansible=is_ansible, current_version=current_version, is_tombstone=True), 'removal_date': partial(isodate, is_tombstone=True), 'warning_text': Any(*string_types), } ), avoid_additional_data ) plugin_routing_schema = Any( Schema({ ('deprecation'): Any(deprecation_schema), ('tombstone'): Any(tombstoning_schema), ('redirect'): Any(*string_types), }, extra=PREVENT_EXTRA), ) list_dict_plugin_routing_schema = [{str_type: plugin_routing_schema} for str_type in string_types] plugin_schema = Schema({ ('action'): Any(None, *list_dict_plugin_routing_schema), ('become'): Any(None, *list_dict_plugin_routing_schema), ('cache'): Any(None, *list_dict_plugin_routing_schema), ('callback'): Any(None, *list_dict_plugin_routing_schema), ('cliconf'): Any(None, *list_dict_plugin_routing_schema), ('connection'): Any(None, *list_dict_plugin_routing_schema), ('doc_fragments'): Any(None, *list_dict_plugin_routing_schema), ('filter'): Any(None, *list_dict_plugin_routing_schema), ('httpapi'): Any(None, *list_dict_plugin_routing_schema), ('inventory'): Any(None, *list_dict_plugin_routing_schema), ('lookup'): Any(None, *list_dict_plugin_routing_schema), ('module_utils'): Any(None, *list_dict_plugin_routing_schema), ('modules'): Any(None, *list_dict_plugin_routing_schema), ('netconf'): Any(None, *list_dict_plugin_routing_schema), ('shell'): Any(None, *list_dict_plugin_routing_schema), ('strategy'): Any(None, *list_dict_plugin_routing_schema), ('terminal'): Any(None, *list_dict_plugin_routing_schema), ('test'): Any(None, *list_dict_plugin_routing_schema), ('vars'): Any(None, *list_dict_plugin_routing_schema), }, extra=PREVENT_EXTRA) # import_redirection schema import_redirection_schema = Any( Schema({ ('redirect'): Any(*string_types), # import_redirect doesn't currently support deprecation }, extra=PREVENT_EXTRA) ) list_dict_import_redirection_schema = [{str_type: import_redirection_schema} for str_type in string_types] # top level schema schema = Schema({ # All of these are optional ('plugin_routing'): Any(plugin_schema), ('import_redirection'): Any(None, *list_dict_import_redirection_schema), # requires_ansible: In the future we should validate this with SpecifierSet ('requires_ansible'): Any(*string_types), ('action_groups'): dict, }, extra=PREVENT_EXTRA) # Ensure schema is valid try: schema(routing) except MultipleInvalid as ex: for error in ex.errors: # No way to get line/column numbers print('%s:%d:%d: %s' % (path, 0, 0, humanize_error(routing, error))) def main(): """Main entry point.""" paths = sys.argv[1:] or sys.stdin.read().splitlines() collection_legacy_file = 'meta/routing.yml' collection_runtime_file = 'meta/runtime.yml' # This is currently disabled, because if it is enabled this test can start failing # at a random date. For this to be properly activated, we (a) need to be able to return # codes for this test, and (b) make this error optional. check_deprecation_dates = False for path in paths: if path == collection_legacy_file: print('%s:%d:%d: %s' % (path, 0, 0, ("Should be called '%s'" % collection_runtime_file))) continue validate_metadata_file( path, is_ansible=path not in (collection_legacy_file, collection_runtime_file), check_deprecation_dates=check_deprecation_dates) if __name__ == '__main__': main()