summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py
blob: 6cf2777401a2436af37ac332594cf6e6512224e3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
"""Schema validation of ansible-core's ansible_builtin_runtime.yml and collection's meta/runtime.yml"""
from __future__ import annotations

import datetime
import os
import re
import sys

from functools import partial

import yaml

from voluptuous import All, Any, MultipleInvalid, PREVENT_EXTRA
from voluptuous import Required, Schema, Invalid
from voluptuous.humanize import humanize_error

from ansible.module_utils.compat.version import StrictVersion, LooseVersion
from ansible.module_utils.six import string_types
from ansible.utils.version import SemanticVersion


def isodate(value, check_deprecation_date=False, is_tombstone=False):
    """Validate a datetime.date or ISO 8601 date string."""
    # datetime.date objects come from YAML dates, these are ok
    if isinstance(value, datetime.date):
        removal_date = value
    else:
        # make sure we have a string
        msg = 'Expected ISO 8601 date string (YYYY-MM-DD), or YAML date'
        if not isinstance(value, string_types):
            raise Invalid(msg)
        # From Python 3.7 in, there is datetime.date.fromisoformat(). For older versions,
        # we have to do things manually.
        if not re.match('^[0-9]{4}-[0-9]{2}-[0-9]{2}$', value):
            raise Invalid(msg)
        try:
            removal_date = datetime.datetime.strptime(value, '%Y-%m-%d').date()
        except ValueError:
            raise Invalid(msg)
    # Make sure date is correct
    today = datetime.date.today()
    if is_tombstone:
        # For a tombstone, the removal date must be in the past
        if today < removal_date:
            raise Invalid(
                'The tombstone removal_date (%s) must not be after today (%s)' % (removal_date, today))
    else:
        # For a deprecation, the removal date must be in the future. Only test this if
        # check_deprecation_date is truish, to avoid checks to suddenly start to fail.
        if check_deprecation_date and today > removal_date:
            raise Invalid(
                'The deprecation removal_date (%s) must be after today (%s)' % (removal_date, today))
    return value


def removal_version(value, is_ansible, current_version=None, is_tombstone=False):
    """Validate a removal version string."""
    msg = (
        'Removal version must be a string' if is_ansible else
        'Removal version must be a semantic version (https://semver.org/)'
    )
    if not isinstance(value, string_types):
        raise Invalid(msg)
    try:
        if is_ansible:
            version = StrictVersion()
            version.parse(value)
            version = LooseVersion(value)  # We're storing Ansible's version as a LooseVersion
        else:
            version = SemanticVersion()
            version.parse(value)
            if version.major != 0 and (version.minor != 0 or version.patch != 0):
                raise Invalid('removal_version (%r) must be a major release, not a minor or patch release '
                              '(see specification at https://semver.org/)' % (value, ))
        if current_version is not None:
            if is_tombstone:
                # For a tombstone, the removal version must not be in the future
                if version > current_version:
                    raise Invalid('The tombstone removal_version (%r) must not be after the '
                                  'current version (%s)' % (value, current_version))
            else:
                # For a deprecation, the removal version must be in the future
                if version <= current_version:
                    raise Invalid('The deprecation removal_version (%r) must be after the '
                                  'current version (%s)' % (value, current_version))
    except ValueError:
        raise Invalid(msg)
    return value


def any_value(value):
    """Accepts anything."""
    return value


def get_ansible_version():
    """Return current ansible-core version"""
    from ansible.release import __version__

    return LooseVersion('.'.join(__version__.split('.')[:3]))


def get_collection_version():
    """Return current collection version, or None if it is not available"""
    import importlib.util

    collection_detail_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'tools', 'collection_detail.py')
    collection_detail_spec = importlib.util.spec_from_file_location('collection_detail', collection_detail_path)
    collection_detail = importlib.util.module_from_spec(collection_detail_spec)
    sys.modules['collection_detail'] = collection_detail
    collection_detail_spec.loader.exec_module(collection_detail)

    # noinspection PyBroadException
    try:
        result = collection_detail.read_manifest_json('.') or collection_detail.read_galaxy_yml('.')
        return SemanticVersion(result['version'])
    except Exception:  # pylint: disable=broad-except
        # We do not care why it fails, in case we cannot get the version
        # just return None to indicate "we don't know".
        return None


def validate_metadata_file(path, is_ansible, check_deprecation_dates=False):
    """Validate explicit runtime metadata file"""
    try:
        with open(path, 'r', encoding='utf-8') as f_path:
            routing = yaml.safe_load(f_path)
    except yaml.error.MarkedYAMLError as ex:
        print('%s:%d:%d: YAML load failed: %s' % (path, ex.context_mark.line +
                                                  1, ex.context_mark.column + 1, re.sub(r'\s+', ' ', str(ex))))
        return
    except Exception as ex:  # pylint: disable=broad-except
        print('%s:%d:%d: YAML load failed: %s' %
              (path, 0, 0, re.sub(r'\s+', ' ', str(ex))))
        return

    if is_ansible:
        current_version = get_ansible_version()
    else:
        current_version = get_collection_version()

    # Updates to schema MUST also be reflected in the documentation
    # ~https://docs.ansible.com/ansible-core/devel/dev_guide/developing_collections.html

    # plugin_routing schema

    avoid_additional_data = Schema(
        Any(
            {
                Required('removal_version'): any_value,
                'warning_text': any_value,
            },
            {
                Required('removal_date'): any_value,
                'warning_text': any_value,
            }
        ),
        extra=PREVENT_EXTRA
    )

    deprecation_schema = All(
        # The first schema validates the input, and the second makes sure no extra keys are specified
        Schema(
            {
                'removal_version': partial(removal_version, is_ansible=is_ansible,
                                           current_version=current_version),
                'removal_date': partial(isodate, check_deprecation_date=check_deprecation_dates),
                'warning_text': Any(*string_types),
            }
        ),
        avoid_additional_data
    )

    tombstoning_schema = All(
        # The first schema validates the input, and the second makes sure no extra keys are specified
        Schema(
            {
                'removal_version': partial(removal_version, is_ansible=is_ansible,
                                           current_version=current_version, is_tombstone=True),
                'removal_date': partial(isodate, is_tombstone=True),
                'warning_text': Any(*string_types),
            }
        ),
        avoid_additional_data
    )

    plugin_routing_schema = Any(
        Schema({
            ('deprecation'): Any(deprecation_schema),
            ('tombstone'): Any(tombstoning_schema),
            ('redirect'): Any(*string_types),
        }, extra=PREVENT_EXTRA),
    )

    list_dict_plugin_routing_schema = [{str_type: plugin_routing_schema}
                                       for str_type in string_types]

    plugin_schema = Schema({
        ('action'): Any(None, *list_dict_plugin_routing_schema),
        ('become'): Any(None, *list_dict_plugin_routing_schema),
        ('cache'): Any(None, *list_dict_plugin_routing_schema),
        ('callback'): Any(None, *list_dict_plugin_routing_schema),
        ('cliconf'): Any(None, *list_dict_plugin_routing_schema),
        ('connection'): Any(None, *list_dict_plugin_routing_schema),
        ('doc_fragments'): Any(None, *list_dict_plugin_routing_schema),
        ('filter'): Any(None, *list_dict_plugin_routing_schema),
        ('httpapi'): Any(None, *list_dict_plugin_routing_schema),
        ('inventory'): Any(None, *list_dict_plugin_routing_schema),
        ('lookup'): Any(None, *list_dict_plugin_routing_schema),
        ('module_utils'): Any(None, *list_dict_plugin_routing_schema),
        ('modules'): Any(None, *list_dict_plugin_routing_schema),
        ('netconf'): Any(None, *list_dict_plugin_routing_schema),
        ('shell'): Any(None, *list_dict_plugin_routing_schema),
        ('strategy'): Any(None, *list_dict_plugin_routing_schema),
        ('terminal'): Any(None, *list_dict_plugin_routing_schema),
        ('test'): Any(None, *list_dict_plugin_routing_schema),
        ('vars'): Any(None, *list_dict_plugin_routing_schema),
    }, extra=PREVENT_EXTRA)

    # import_redirection schema

    import_redirection_schema = Any(
        Schema({
            ('redirect'): Any(*string_types),
            # import_redirect doesn't currently support deprecation
        }, extra=PREVENT_EXTRA)
    )

    list_dict_import_redirection_schema = [{str_type: import_redirection_schema}
                                           for str_type in string_types]

    # top level schema

    schema = Schema({
        # All of these are optional
        ('plugin_routing'): Any(plugin_schema),
        ('import_redirection'): Any(None, *list_dict_import_redirection_schema),
        # requires_ansible: In the future we should validate this with SpecifierSet
        ('requires_ansible'): Any(*string_types),
        ('action_groups'): dict,
    }, extra=PREVENT_EXTRA)

    # Ensure schema is valid

    try:
        schema(routing)
    except MultipleInvalid as ex:
        for error in ex.errors:
            # No way to get line/column numbers
            print('%s:%d:%d: %s' % (path, 0, 0, humanize_error(routing, error)))


def main():
    """Main entry point."""
    paths = sys.argv[1:] or sys.stdin.read().splitlines()

    collection_legacy_file = 'meta/routing.yml'
    collection_runtime_file = 'meta/runtime.yml'

    # This is currently disabled, because if it is enabled this test can start failing
    # at a random date. For this to be properly activated, we (a) need to be able to return
    # codes for this test, and (b) make this error optional.
    check_deprecation_dates = False

    for path in paths:
        if path == collection_legacy_file:
            print('%s:%d:%d: %s' % (path, 0, 0, ("Should be called '%s'" % collection_runtime_file)))
            continue

        validate_metadata_file(
            path,
            is_ansible=path not in (collection_legacy_file, collection_runtime_file),
            check_deprecation_dates=check_deprecation_dates)


if __name__ == '__main__':
    main()