summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py')
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py277
1 files changed, 277 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py
new file mode 100644
index 0000000..6cf2777
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py
@@ -0,0 +1,277 @@
+"""Schema validation of ansible-core's ansible_builtin_runtime.yml and collection's meta/runtime.yml"""
+from __future__ import annotations
+
+import datetime
+import os
+import re
+import sys
+
+from functools import partial
+
+import yaml
+
+from voluptuous import All, Any, MultipleInvalid, PREVENT_EXTRA
+from voluptuous import Required, Schema, Invalid
+from voluptuous.humanize import humanize_error
+
+from ansible.module_utils.compat.version import StrictVersion, LooseVersion
+from ansible.module_utils.six import string_types
+from ansible.utils.version import SemanticVersion
+
+
+def isodate(value, check_deprecation_date=False, is_tombstone=False):
+ """Validate a datetime.date or ISO 8601 date string."""
+ # datetime.date objects come from YAML dates, these are ok
+ if isinstance(value, datetime.date):
+ removal_date = value
+ else:
+ # make sure we have a string
+ msg = 'Expected ISO 8601 date string (YYYY-MM-DD), or YAML date'
+ if not isinstance(value, string_types):
+ raise Invalid(msg)
+ # From Python 3.7 in, there is datetime.date.fromisoformat(). For older versions,
+ # we have to do things manually.
+ if not re.match('^[0-9]{4}-[0-9]{2}-[0-9]{2}$', value):
+ raise Invalid(msg)
+ try:
+ removal_date = datetime.datetime.strptime(value, '%Y-%m-%d').date()
+ except ValueError:
+ raise Invalid(msg)
+ # Make sure date is correct
+ today = datetime.date.today()
+ if is_tombstone:
+ # For a tombstone, the removal date must be in the past
+ if today < removal_date:
+ raise Invalid(
+ 'The tombstone removal_date (%s) must not be after today (%s)' % (removal_date, today))
+ else:
+ # For a deprecation, the removal date must be in the future. Only test this if
+ # check_deprecation_date is truish, to avoid checks to suddenly start to fail.
+ if check_deprecation_date and today > removal_date:
+ raise Invalid(
+ 'The deprecation removal_date (%s) must be after today (%s)' % (removal_date, today))
+ return value
+
+
+def removal_version(value, is_ansible, current_version=None, is_tombstone=False):
+ """Validate a removal version string."""
+ msg = (
+ 'Removal version must be a string' if is_ansible else
+ 'Removal version must be a semantic version (https://semver.org/)'
+ )
+ if not isinstance(value, string_types):
+ raise Invalid(msg)
+ try:
+ if is_ansible:
+ version = StrictVersion()
+ version.parse(value)
+ version = LooseVersion(value) # We're storing Ansible's version as a LooseVersion
+ else:
+ version = SemanticVersion()
+ version.parse(value)
+ if version.major != 0 and (version.minor != 0 or version.patch != 0):
+ raise Invalid('removal_version (%r) must be a major release, not a minor or patch release '
+ '(see specification at https://semver.org/)' % (value, ))
+ if current_version is not None:
+ if is_tombstone:
+ # For a tombstone, the removal version must not be in the future
+ if version > current_version:
+ raise Invalid('The tombstone removal_version (%r) must not be after the '
+ 'current version (%s)' % (value, current_version))
+ else:
+ # For a deprecation, the removal version must be in the future
+ if version <= current_version:
+ raise Invalid('The deprecation removal_version (%r) must be after the '
+ 'current version (%s)' % (value, current_version))
+ except ValueError:
+ raise Invalid(msg)
+ return value
+
+
+def any_value(value):
+ """Accepts anything."""
+ return value
+
+
+def get_ansible_version():
+ """Return current ansible-core version"""
+ from ansible.release import __version__
+
+ return LooseVersion('.'.join(__version__.split('.')[:3]))
+
+
+def get_collection_version():
+ """Return current collection version, or None if it is not available"""
+ import importlib.util
+
+ collection_detail_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'tools', 'collection_detail.py')
+ collection_detail_spec = importlib.util.spec_from_file_location('collection_detail', collection_detail_path)
+ collection_detail = importlib.util.module_from_spec(collection_detail_spec)
+ sys.modules['collection_detail'] = collection_detail
+ collection_detail_spec.loader.exec_module(collection_detail)
+
+ # noinspection PyBroadException
+ try:
+ result = collection_detail.read_manifest_json('.') or collection_detail.read_galaxy_yml('.')
+ return SemanticVersion(result['version'])
+ except Exception: # pylint: disable=broad-except
+ # We do not care why it fails, in case we cannot get the version
+ # just return None to indicate "we don't know".
+ return None
+
+
+def validate_metadata_file(path, is_ansible, check_deprecation_dates=False):
+ """Validate explicit runtime metadata file"""
+ try:
+ with open(path, 'r', encoding='utf-8') as f_path:
+ routing = yaml.safe_load(f_path)
+ except yaml.error.MarkedYAMLError as ex:
+ print('%s:%d:%d: YAML load failed: %s' % (path, ex.context_mark.line +
+ 1, ex.context_mark.column + 1, re.sub(r'\s+', ' ', str(ex))))
+ return
+ except Exception as ex: # pylint: disable=broad-except
+ print('%s:%d:%d: YAML load failed: %s' %
+ (path, 0, 0, re.sub(r'\s+', ' ', str(ex))))
+ return
+
+ if is_ansible:
+ current_version = get_ansible_version()
+ else:
+ current_version = get_collection_version()
+
+ # Updates to schema MUST also be reflected in the documentation
+ # ~https://docs.ansible.com/ansible-core/devel/dev_guide/developing_collections.html
+
+ # plugin_routing schema
+
+ avoid_additional_data = Schema(
+ Any(
+ {
+ Required('removal_version'): any_value,
+ 'warning_text': any_value,
+ },
+ {
+ Required('removal_date'): any_value,
+ 'warning_text': any_value,
+ }
+ ),
+ extra=PREVENT_EXTRA
+ )
+
+ deprecation_schema = All(
+ # The first schema validates the input, and the second makes sure no extra keys are specified
+ Schema(
+ {
+ 'removal_version': partial(removal_version, is_ansible=is_ansible,
+ current_version=current_version),
+ 'removal_date': partial(isodate, check_deprecation_date=check_deprecation_dates),
+ 'warning_text': Any(*string_types),
+ }
+ ),
+ avoid_additional_data
+ )
+
+ tombstoning_schema = All(
+ # The first schema validates the input, and the second makes sure no extra keys are specified
+ Schema(
+ {
+ 'removal_version': partial(removal_version, is_ansible=is_ansible,
+ current_version=current_version, is_tombstone=True),
+ 'removal_date': partial(isodate, is_tombstone=True),
+ 'warning_text': Any(*string_types),
+ }
+ ),
+ avoid_additional_data
+ )
+
+ plugin_routing_schema = Any(
+ Schema({
+ ('deprecation'): Any(deprecation_schema),
+ ('tombstone'): Any(tombstoning_schema),
+ ('redirect'): Any(*string_types),
+ }, extra=PREVENT_EXTRA),
+ )
+
+ list_dict_plugin_routing_schema = [{str_type: plugin_routing_schema}
+ for str_type in string_types]
+
+ plugin_schema = Schema({
+ ('action'): Any(None, *list_dict_plugin_routing_schema),
+ ('become'): Any(None, *list_dict_plugin_routing_schema),
+ ('cache'): Any(None, *list_dict_plugin_routing_schema),
+ ('callback'): Any(None, *list_dict_plugin_routing_schema),
+ ('cliconf'): Any(None, *list_dict_plugin_routing_schema),
+ ('connection'): Any(None, *list_dict_plugin_routing_schema),
+ ('doc_fragments'): Any(None, *list_dict_plugin_routing_schema),
+ ('filter'): Any(None, *list_dict_plugin_routing_schema),
+ ('httpapi'): Any(None, *list_dict_plugin_routing_schema),
+ ('inventory'): Any(None, *list_dict_plugin_routing_schema),
+ ('lookup'): Any(None, *list_dict_plugin_routing_schema),
+ ('module_utils'): Any(None, *list_dict_plugin_routing_schema),
+ ('modules'): Any(None, *list_dict_plugin_routing_schema),
+ ('netconf'): Any(None, *list_dict_plugin_routing_schema),
+ ('shell'): Any(None, *list_dict_plugin_routing_schema),
+ ('strategy'): Any(None, *list_dict_plugin_routing_schema),
+ ('terminal'): Any(None, *list_dict_plugin_routing_schema),
+ ('test'): Any(None, *list_dict_plugin_routing_schema),
+ ('vars'): Any(None, *list_dict_plugin_routing_schema),
+ }, extra=PREVENT_EXTRA)
+
+ # import_redirection schema
+
+ import_redirection_schema = Any(
+ Schema({
+ ('redirect'): Any(*string_types),
+ # import_redirect doesn't currently support deprecation
+ }, extra=PREVENT_EXTRA)
+ )
+
+ list_dict_import_redirection_schema = [{str_type: import_redirection_schema}
+ for str_type in string_types]
+
+ # top level schema
+
+ schema = Schema({
+ # All of these are optional
+ ('plugin_routing'): Any(plugin_schema),
+ ('import_redirection'): Any(None, *list_dict_import_redirection_schema),
+ # requires_ansible: In the future we should validate this with SpecifierSet
+ ('requires_ansible'): Any(*string_types),
+ ('action_groups'): dict,
+ }, extra=PREVENT_EXTRA)
+
+ # Ensure schema is valid
+
+ try:
+ schema(routing)
+ except MultipleInvalid as ex:
+ for error in ex.errors:
+ # No way to get line/column numbers
+ print('%s:%d:%d: %s' % (path, 0, 0, humanize_error(routing, error)))
+
+
+def main():
+ """Main entry point."""
+ paths = sys.argv[1:] or sys.stdin.read().splitlines()
+
+ collection_legacy_file = 'meta/routing.yml'
+ collection_runtime_file = 'meta/runtime.yml'
+
+ # This is currently disabled, because if it is enabled this test can start failing
+ # at a random date. For this to be properly activated, we (a) need to be able to return
+ # codes for this test, and (b) make this error optional.
+ check_deprecation_dates = False
+
+ for path in paths:
+ if path == collection_legacy_file:
+ print('%s:%d:%d: %s' % (path, 0, 0, ("Should be called '%s'" % collection_runtime_file)))
+ continue
+
+ validate_metadata_file(
+ path,
+ is_ansible=path not in (collection_legacy_file, collection_runtime_file),
+ check_deprecation_dates=check_deprecation_dates)
+
+
+if __name__ == '__main__':
+ main()