summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_util/controller
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_util/controller')
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/action-plugin-docs.json13
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/action-plugin-docs.py66
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/changelog.json8
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/changelog.py60
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/changelog/sphinx.py4
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/empty-init.json14
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/empty-init.py16
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/future-import-boilerplate.json7
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/future-import-boilerplate.py46
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/line-endings.json4
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/line-endings.py18
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/metaclass-boilerplate.json7
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/metaclass-boilerplate.py44
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-assert.json10
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-assert.py24
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-basestring.json7
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-basestring.py21
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iteritems.json7
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iteritems.py21
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iterkeys.json7
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iterkeys.py21
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-itervalues.json7
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-itervalues.py21
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-get-exception.json7
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-get-exception.py28
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-illegal-filenames.json5
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-illegal-filenames.py83
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-main-display.json10
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-main-display.py21
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-smart-quotes.json5
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-smart-quotes.py28
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-unicode-literals.json7
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/no-unicode-literals.py21
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/replace-urlopen.json7
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/replace-urlopen.py21
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.json11
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py277
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/shebang.json4
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/shebang.py124
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/symlinks.json5
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/symlinks.py32
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/use-argspec-type-path.json10
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/use-argspec-type-path.py21
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/use-compat-six.json6
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/code-smell/use-compat-six.py21
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/integration-aliases/yaml_to_json.py14
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/mypy/ansible-core.ini119
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/mypy/ansible-test.ini24
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/mypy/modules.ini98
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pep8/current-ignore.txt4
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pslint/pslint.ps137
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pslint/settings.psd152
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pylint/config/ansible-test-target.cfg57
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pylint/config/ansible-test.cfg63
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pylint/config/code-smell.cfg57
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pylint/config/collection.cfg147
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pylint/config/default.cfg146
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pylint/plugins/deprecated.py263
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pylint/plugins/string_format.py85
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/pylint/plugins/unwanted.py223
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/shellcheck/exclude.txt3
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/validate-modules/validate.py6
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/__init__.py18
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py2520
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/module_args.py176
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/ps_argspec.ps1121
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/schema.py899
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/utils.py222
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/yamllint/config/default.yml19
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/yamllint/config/modules.yml19
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/yamllint/config/plugins.yml19
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/yamllint/yamllinter.py246
-rw-r--r--test/lib/ansible_test/_util/controller/tools/collection_detail.py94
-rw-r--r--test/lib/ansible_test/_util/controller/tools/coverage_stub.ps140
-rw-r--r--test/lib/ansible_test/_util/controller/tools/sslcheck.py22
-rw-r--r--test/lib/ansible_test/_util/controller/tools/yaml_to_json.py27
76 files changed, 7047 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/action-plugin-docs.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/action-plugin-docs.json
new file mode 100644
index 0000000..12bbe0d
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/action-plugin-docs.json
@@ -0,0 +1,13 @@
+{
+ "all_targets": true,
+ "prefixes": [
+ "lib/ansible/modules/",
+ "lib/ansible/plugins/action/",
+ "plugins/modules/",
+ "plugins/action/"
+ ],
+ "extensions": [
+ ".py"
+ ],
+ "output": "path-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/action-plugin-docs.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/action-plugin-docs.py
new file mode 100644
index 0000000..a319d1a
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/action-plugin-docs.py
@@ -0,0 +1,66 @@
+"""Test to verify action plugins have an associated module to provide documentation."""
+from __future__ import annotations
+
+import os
+import sys
+
+
+def main():
+ """Main entry point."""
+ paths = sys.argv[1:] or sys.stdin.read().splitlines()
+
+ module_names = set()
+
+ module_prefixes = {
+ 'lib/ansible/modules/': True,
+ 'plugins/modules/': False,
+ }
+
+ action_prefixes = {
+ 'lib/ansible/plugins/action/': True,
+ 'plugins/action/': False,
+ }
+
+ for path in paths:
+ full_name = get_full_name(path, module_prefixes)
+
+ if full_name:
+ module_names.add(full_name)
+
+ for path in paths:
+ full_name = get_full_name(path, action_prefixes)
+
+ if full_name and full_name not in module_names:
+ print('%s: action plugin has no matching module to provide documentation' % path)
+
+
+def get_full_name(path, prefixes):
+ """Return the full name of the plugin at the given path by matching against the given path prefixes, or None if no match is found."""
+ for prefix, flat in prefixes.items():
+ if path.startswith(prefix):
+ relative_path = os.path.relpath(path, prefix)
+
+ if flat:
+ full_name = os.path.basename(relative_path)
+ else:
+ full_name = relative_path
+
+ full_name = os.path.splitext(full_name)[0]
+
+ name = os.path.basename(full_name)
+
+ if name == '__init__':
+ return None
+
+ if name.startswith('_'):
+ name = name[1:]
+
+ full_name = os.path.join(os.path.dirname(full_name), name).replace(os.path.sep, '.')
+
+ return full_name
+
+ return None
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/changelog.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/changelog.json
new file mode 100644
index 0000000..7d19f10
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/changelog.json
@@ -0,0 +1,8 @@
+{
+ "intercept": true,
+ "prefixes": [
+ "changelogs/config.yaml",
+ "changelogs/fragments/"
+ ],
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/changelog.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/changelog.py
new file mode 100644
index 0000000..924e5af
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/changelog.py
@@ -0,0 +1,60 @@
+"""Check changelog fragment naming, syntax, etc."""
+from __future__ import annotations
+
+import os
+import sys
+import subprocess
+
+
+def main():
+ """Main entry point."""
+ paths = sys.argv[1:] or sys.stdin.read().splitlines()
+
+ allowed_extensions = ('.yml', '.yaml')
+ config_path = 'changelogs/config.yaml'
+
+ # config must be detected independent of the file list since the file list only contains files under test (changed)
+ has_config = os.path.exists(config_path)
+ paths_to_check = []
+ for path in paths:
+ if path == config_path:
+ continue
+
+ if path.startswith('changelogs/fragments/.'):
+ if path in ('changelogs/fragments/.keep', 'changelogs/fragments/.gitkeep'):
+ continue
+
+ print('%s:%d:%d: file must not be a dotfile' % (path, 0, 0))
+ continue
+
+ ext = os.path.splitext(path)[1]
+
+ if ext not in allowed_extensions:
+ print('%s:%d:%d: extension must be one of: %s' % (path, 0, 0, ', '.join(allowed_extensions)))
+
+ paths_to_check.append(path)
+
+ if not has_config:
+ print('changelogs/config.yaml:0:0: config file does not exist')
+ return
+
+ if not paths_to_check:
+ return
+
+ cmd = [sys.executable, '-m', 'antsibull_changelog', 'lint'] + paths_to_check
+
+ # The sphinx module is a soft dependency for rstcheck, which is used by the changelog linter.
+ # If sphinx is found it will be loaded by rstcheck, which can affect the results of the test.
+ # To maintain consistency across environments, loading of sphinx is blocked, since any version (or no version) of sphinx may be present.
+ env = os.environ.copy()
+ env.update(PYTHONPATH='%s:%s' % (os.path.join(os.path.dirname(__file__), 'changelog'), env['PYTHONPATH']))
+
+ # ignore the return code, rely on the output instead
+ process = subprocess.run(cmd, stdin=subprocess.DEVNULL, capture_output=True, text=True, env=env, check=False)
+
+ sys.stdout.write(process.stdout)
+ sys.stderr.write(process.stderr)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/changelog/sphinx.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/changelog/sphinx.py
new file mode 100644
index 0000000..7eab0f5
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/changelog/sphinx.py
@@ -0,0 +1,4 @@
+"""Block the sphinx module from being loaded."""
+from __future__ import annotations
+
+raise ImportError('The sphinx module has been prevented from loading to maintain consistent test results.')
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/empty-init.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/empty-init.json
new file mode 100644
index 0000000..9835f9b
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/empty-init.json
@@ -0,0 +1,14 @@
+{
+ "prefixes": [
+ "lib/ansible/modules/",
+ "lib/ansible/module_utils/",
+ "plugins/modules/",
+ "plugins/module_utils/",
+ "test/units/",
+ "tests/unit/"
+ ],
+ "files": [
+ "__init__.py"
+ ],
+ "output": "path-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/empty-init.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/empty-init.py
new file mode 100644
index 0000000..01aef69
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/empty-init.py
@@ -0,0 +1,16 @@
+"""Require empty __init__.py files."""
+from __future__ import annotations
+
+import os
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ if os.path.getsize(path) > 0:
+ print('%s: empty __init__.py required' % path)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/future-import-boilerplate.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/future-import-boilerplate.json
new file mode 100644
index 0000000..4ebce32
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/future-import-boilerplate.json
@@ -0,0 +1,7 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "py2_compat": true,
+ "output": "path-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/future-import-boilerplate.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/future-import-boilerplate.py
new file mode 100644
index 0000000..7b39c37
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/future-import-boilerplate.py
@@ -0,0 +1,46 @@
+"""Enforce proper usage of __future__ imports."""
+from __future__ import annotations
+
+import ast
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'rb') as path_fd:
+ lines = path_fd.read().splitlines()
+
+ missing = True
+ if not lines:
+ # Files are allowed to be empty of everything including boilerplate
+ missing = False
+
+ for text in lines:
+ if text in (b'from __future__ import (absolute_import, division, print_function)',
+ b'from __future__ import absolute_import, division, print_function'):
+ missing = False
+ break
+
+ if missing:
+ with open(path, encoding='utf-8') as file:
+ contents = file.read()
+
+ # noinspection PyBroadException
+ try:
+ node = ast.parse(contents)
+
+ # files consisting of only assignments have no need for future import boilerplate
+ # the only exception would be division during assignment, but we'll overlook that for simplicity
+ # the most likely case is that of a documentation only python file
+ if all(isinstance(statement, ast.Assign) for statement in node.body):
+ missing = False
+ except Exception: # pylint: disable=broad-except
+ pass # the compile sanity test will report this error
+
+ if missing:
+ print('%s: missing: from __future__ import (absolute_import, division, print_function)' % path)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/line-endings.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/line-endings.json
new file mode 100644
index 0000000..db5c3c9
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/line-endings.json
@@ -0,0 +1,4 @@
+{
+ "text": true,
+ "output": "path-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/line-endings.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/line-endings.py
new file mode 100644
index 0000000..31f97ad
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/line-endings.py
@@ -0,0 +1,18 @@
+"""Require Unix line endings."""
+from __future__ import annotations
+
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'rb') as path_fd:
+ contents = path_fd.read()
+
+ if b'\r' in contents:
+ print('%s: use "\\n" for line endings instead of "\\r\\n"' % path)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/metaclass-boilerplate.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/metaclass-boilerplate.json
new file mode 100644
index 0000000..4ebce32
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/metaclass-boilerplate.json
@@ -0,0 +1,7 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "py2_compat": true,
+ "output": "path-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/metaclass-boilerplate.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/metaclass-boilerplate.py
new file mode 100644
index 0000000..8bdcfc9
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/metaclass-boilerplate.py
@@ -0,0 +1,44 @@
+"""Require __metaclass__ boilerplate for code that supports Python 2.x."""
+from __future__ import annotations
+
+import ast
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'rb') as path_fd:
+ lines = path_fd.read().splitlines()
+
+ missing = True
+ if not lines:
+ # Files are allowed to be empty of everything including boilerplate
+ missing = False
+
+ for text in lines:
+ if text == b'__metaclass__ = type':
+ missing = False
+ break
+
+ if missing:
+ with open(path, encoding='utf-8') as file:
+ contents = file.read()
+
+ # noinspection PyBroadException
+ try:
+ node = ast.parse(contents)
+
+ # files consisting of only assignments have no need for metaclass boilerplate
+ # the most likely case is that of a documentation only python file
+ if all(isinstance(statement, ast.Assign) for statement in node.body):
+ missing = False
+ except Exception: # pylint: disable=broad-except
+ pass # the compile sanity test will report this error
+
+ if missing:
+ print('%s: missing: __metaclass__ = type' % path)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-assert.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-assert.json
new file mode 100644
index 0000000..ccee80a
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-assert.json
@@ -0,0 +1,10 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "prefixes": [
+ "lib/ansible/",
+ "plugins/"
+ ],
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-assert.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-assert.py
new file mode 100644
index 0000000..8c1c027
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-assert.py
@@ -0,0 +1,24 @@
+"""Disallow use of assert."""
+from __future__ import annotations
+
+import re
+import sys
+
+ASSERT_RE = re.compile(r'^\s*assert[^a-z0-9_:]')
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as file:
+ for i, line in enumerate(file.readlines()):
+ matches = ASSERT_RE.findall(line)
+
+ if matches:
+ lineno = i + 1
+ colno = line.index('assert') + 1
+ print('%s:%d:%d: raise AssertionError instead of: %s' % (path, lineno, colno, matches[0][colno - 1:]))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-basestring.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-basestring.json
new file mode 100644
index 0000000..88858ae
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-basestring.json
@@ -0,0 +1,7 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "ignore_self": true,
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-basestring.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-basestring.py
new file mode 100644
index 0000000..74e38d7
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-basestring.py
@@ -0,0 +1,21 @@
+"""Disallow use of basestring isinstance checks."""
+from __future__ import annotations
+
+import re
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as path_fd:
+ for line, text in enumerate(path_fd.readlines()):
+ match = re.search(r'(isinstance.*basestring)', text)
+
+ if match:
+ print('%s:%d:%d: do not use `isinstance(s, basestring)`' % (
+ path, line + 1, match.start(1) + 1))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iteritems.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iteritems.json
new file mode 100644
index 0000000..88858ae
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iteritems.json
@@ -0,0 +1,7 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "ignore_self": true,
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iteritems.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iteritems.py
new file mode 100644
index 0000000..b4e4002
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iteritems.py
@@ -0,0 +1,21 @@
+"""Disallow use of the dict.iteritems function."""
+from __future__ import annotations
+
+import re
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as path_fd:
+ for line, text in enumerate(path_fd.readlines()):
+ match = re.search(r'(?<! six)\.(iteritems)', text)
+
+ if match:
+ print('%s:%d:%d: use `dict.items` or `ansible.module_utils.six.iteritems` instead of `dict.iteritems`' % (
+ path, line + 1, match.start(1) + 1))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iterkeys.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iterkeys.json
new file mode 100644
index 0000000..88858ae
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iterkeys.json
@@ -0,0 +1,7 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "ignore_self": true,
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iterkeys.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iterkeys.py
new file mode 100644
index 0000000..00c8703
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-iterkeys.py
@@ -0,0 +1,21 @@
+"""Disallow use of the dict.iterkeys function."""
+from __future__ import annotations
+
+import re
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as path_fd:
+ for line, text in enumerate(path_fd.readlines()):
+ match = re.search(r'\.(iterkeys)', text)
+
+ if match:
+ print('%s:%d:%d: use `dict.keys` or `for key in dict:` instead of `dict.iterkeys`' % (
+ path, line + 1, match.start(1) + 1))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-itervalues.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-itervalues.json
new file mode 100644
index 0000000..88858ae
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-itervalues.json
@@ -0,0 +1,7 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "ignore_self": true,
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-itervalues.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-itervalues.py
new file mode 100644
index 0000000..2e8036a
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-dict-itervalues.py
@@ -0,0 +1,21 @@
+"""Disallow use of the dict.itervalues function."""
+from __future__ import annotations
+
+import re
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as path_fd:
+ for line, text in enumerate(path_fd.readlines()):
+ match = re.search(r'(?<! six)\.(itervalues)', text)
+
+ if match:
+ print('%s:%d:%d: use `dict.values` or `ansible.module_utils.six.itervalues` instead of `dict.itervalues`' % (
+ path, line + 1, match.start(1) + 1))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-get-exception.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-get-exception.json
new file mode 100644
index 0000000..88858ae
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-get-exception.json
@@ -0,0 +1,7 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "ignore_self": true,
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-get-exception.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-get-exception.py
new file mode 100644
index 0000000..0abb23d
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-get-exception.py
@@ -0,0 +1,28 @@
+"""Disallow use of the get_exception function."""
+from __future__ import annotations
+
+import re
+import sys
+
+
+def main():
+ """Main entry point."""
+ basic_allow_once = True
+
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as path_fd:
+ for line, text in enumerate(path_fd.readlines()):
+ match = re.search(r'([^a-zA-Z0-9_]get_exception[^a-zA-Z0-9_])', text)
+
+ if match:
+ if path == 'lib/ansible/module_utils/basic.py' and basic_allow_once:
+ # basic.py is allowed to import get_exception for backwards compatibility but should not call it anywhere
+ basic_allow_once = False
+ continue
+
+ print('%s:%d:%d: do not use `get_exception`' % (
+ path, line + 1, match.start(1) + 1))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-illegal-filenames.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-illegal-filenames.json
new file mode 100644
index 0000000..6f13c86
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-illegal-filenames.json
@@ -0,0 +1,5 @@
+{
+ "include_directories": true,
+ "include_symlinks": true,
+ "output": "path-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-illegal-filenames.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-illegal-filenames.py
new file mode 100644
index 0000000..10bf4aa
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-illegal-filenames.py
@@ -0,0 +1,83 @@
+"""
+Check for illegal filenames on various operating systems.
+The main rules are derived from restrictions on Windows:
+https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#naming-conventions
+"""
+from __future__ import annotations
+
+import os
+import struct
+import sys
+
+from ansible.module_utils.basic import to_bytes
+
+ILLEGAL_CHARS = [
+ b'<',
+ b'>',
+ b':',
+ b'"',
+ b'/',
+ b'\\',
+ b'|',
+ b'?',
+ b'*'
+] + [struct.pack("b", i) for i in range(32)]
+
+ILLEGAL_NAMES = [
+ "CON",
+ "PRN",
+ "AUX",
+ "NUL",
+ "COM1",
+ "COM2",
+ "COM3",
+ "COM4",
+ "COM5",
+ "COM6",
+ "COM7",
+ "COM8",
+ "COM9",
+ "LPT1",
+ "LPT2",
+ "LPT3",
+ "LPT4",
+ "LPT5",
+ "LPT6",
+ "LPT7",
+ "LPT8",
+ "LPT9",
+]
+
+ILLEGAL_END_CHARS = [
+ '.',
+ ' ',
+]
+
+
+def check_path(path, is_dir=False):
+ """Check the specified path for unwanted characters and names."""
+ type_name = 'directory' if is_dir else 'file'
+ file_name = os.path.basename(path.rstrip(os.path.sep))
+ name = os.path.splitext(file_name)[0]
+
+ if name.upper() in ILLEGAL_NAMES:
+ print("%s: illegal %s name %s" % (path, type_name, name.upper()))
+
+ if file_name[-1] in ILLEGAL_END_CHARS:
+ print("%s: illegal %s name end-char '%s'" % (path, type_name, file_name[-1]))
+
+ bfile = to_bytes(file_name, encoding='utf-8')
+ for char in ILLEGAL_CHARS:
+ if char in bfile:
+ bpath = to_bytes(path, encoding='utf-8')
+ print("%s: illegal char '%s' in %s name" % (bpath, char, type_name))
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ check_path(path, is_dir=path.endswith(os.path.sep))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-main-display.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-main-display.json
new file mode 100644
index 0000000..ccee80a
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-main-display.json
@@ -0,0 +1,10 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "prefixes": [
+ "lib/ansible/",
+ "plugins/"
+ ],
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-main-display.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-main-display.py
new file mode 100644
index 0000000..eb5987d
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-main-display.py
@@ -0,0 +1,21 @@
+"""Disallow importing display from __main__."""
+from __future__ import annotations
+
+import sys
+
+MAIN_DISPLAY_IMPORT = 'from __main__ import display'
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as file:
+ for i, line in enumerate(file.readlines()):
+ if MAIN_DISPLAY_IMPORT in line:
+ lineno = i + 1
+ colno = line.index(MAIN_DISPLAY_IMPORT) + 1
+ print('%s:%d:%d: Display is a singleton, just import and instantiate' % (path, lineno, colno))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-smart-quotes.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-smart-quotes.json
new file mode 100644
index 0000000..54d9fff
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-smart-quotes.json
@@ -0,0 +1,5 @@
+{
+ "text": true,
+ "ignore_self": true,
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-smart-quotes.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-smart-quotes.py
new file mode 100644
index 0000000..461033d
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-smart-quotes.py
@@ -0,0 +1,28 @@
+"""Disallow use of Unicode quotes."""
+# -*- coding: utf-8 -*-
+from __future__ import annotations
+
+import re
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'rb') as path_fd:
+ for line, text in enumerate(path_fd.readlines()):
+ try:
+ text = text.decode('utf-8')
+ except UnicodeDecodeError as ex:
+ print('%s:%d:%d: UnicodeDecodeError: %s' % (path, line + 1, ex.start + 1, ex))
+ continue
+
+ match = re.search('([‘’“”])', text)
+
+ if match:
+ print('%s:%d:%d: use ASCII quotes `\'` and `"` instead of Unicode quotes' % (
+ path, line + 1, match.start(1) + 1))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-unicode-literals.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-unicode-literals.json
new file mode 100644
index 0000000..88858ae
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-unicode-literals.json
@@ -0,0 +1,7 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "ignore_self": true,
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/no-unicode-literals.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-unicode-literals.py
new file mode 100644
index 0000000..75c34f2
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/no-unicode-literals.py
@@ -0,0 +1,21 @@
+"""Disallow use of the unicode_literals future."""
+from __future__ import annotations
+
+import re
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as path_fd:
+ for line, text in enumerate(path_fd.readlines()):
+ match = re.search(r'(unicode_literals)', text)
+
+ if match:
+ print('%s:%d:%d: do not use `unicode_literals`' % (
+ path, line + 1, match.start(1) + 1))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/replace-urlopen.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/replace-urlopen.json
new file mode 100644
index 0000000..88858ae
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/replace-urlopen.json
@@ -0,0 +1,7 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "ignore_self": true,
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/replace-urlopen.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/replace-urlopen.py
new file mode 100644
index 0000000..a6dd5aa
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/replace-urlopen.py
@@ -0,0 +1,21 @@
+"""Disallow use of the urlopen function."""
+from __future__ import annotations
+
+import re
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as path_fd:
+ for line, text in enumerate(path_fd.readlines()):
+ match = re.search(r'^(?:[^#]*?)(urlopen)', text)
+
+ if match:
+ print('%s:%d:%d: use `ansible.module_utils.urls.open_url` instead of `urlopen`' % (
+ path, line + 1, match.start(1) + 1))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.json
new file mode 100644
index 0000000..44003ec
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.json
@@ -0,0 +1,11 @@
+{
+ "prefixes": [
+ "lib/ansible/config/ansible_builtin_runtime.yml",
+ "meta/routing.yml",
+ "meta/runtime.yml"
+ ],
+ "extensions": [
+ ".yml"
+ ],
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py
new file mode 100644
index 0000000..6cf2777
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/runtime-metadata.py
@@ -0,0 +1,277 @@
+"""Schema validation of ansible-core's ansible_builtin_runtime.yml and collection's meta/runtime.yml"""
+from __future__ import annotations
+
+import datetime
+import os
+import re
+import sys
+
+from functools import partial
+
+import yaml
+
+from voluptuous import All, Any, MultipleInvalid, PREVENT_EXTRA
+from voluptuous import Required, Schema, Invalid
+from voluptuous.humanize import humanize_error
+
+from ansible.module_utils.compat.version import StrictVersion, LooseVersion
+from ansible.module_utils.six import string_types
+from ansible.utils.version import SemanticVersion
+
+
+def isodate(value, check_deprecation_date=False, is_tombstone=False):
+ """Validate a datetime.date or ISO 8601 date string."""
+ # datetime.date objects come from YAML dates, these are ok
+ if isinstance(value, datetime.date):
+ removal_date = value
+ else:
+ # make sure we have a string
+ msg = 'Expected ISO 8601 date string (YYYY-MM-DD), or YAML date'
+ if not isinstance(value, string_types):
+ raise Invalid(msg)
+ # From Python 3.7 in, there is datetime.date.fromisoformat(). For older versions,
+ # we have to do things manually.
+ if not re.match('^[0-9]{4}-[0-9]{2}-[0-9]{2}$', value):
+ raise Invalid(msg)
+ try:
+ removal_date = datetime.datetime.strptime(value, '%Y-%m-%d').date()
+ except ValueError:
+ raise Invalid(msg)
+ # Make sure date is correct
+ today = datetime.date.today()
+ if is_tombstone:
+ # For a tombstone, the removal date must be in the past
+ if today < removal_date:
+ raise Invalid(
+ 'The tombstone removal_date (%s) must not be after today (%s)' % (removal_date, today))
+ else:
+ # For a deprecation, the removal date must be in the future. Only test this if
+ # check_deprecation_date is truish, to avoid checks to suddenly start to fail.
+ if check_deprecation_date and today > removal_date:
+ raise Invalid(
+ 'The deprecation removal_date (%s) must be after today (%s)' % (removal_date, today))
+ return value
+
+
+def removal_version(value, is_ansible, current_version=None, is_tombstone=False):
+ """Validate a removal version string."""
+ msg = (
+ 'Removal version must be a string' if is_ansible else
+ 'Removal version must be a semantic version (https://semver.org/)'
+ )
+ if not isinstance(value, string_types):
+ raise Invalid(msg)
+ try:
+ if is_ansible:
+ version = StrictVersion()
+ version.parse(value)
+ version = LooseVersion(value) # We're storing Ansible's version as a LooseVersion
+ else:
+ version = SemanticVersion()
+ version.parse(value)
+ if version.major != 0 and (version.minor != 0 or version.patch != 0):
+ raise Invalid('removal_version (%r) must be a major release, not a minor or patch release '
+ '(see specification at https://semver.org/)' % (value, ))
+ if current_version is not None:
+ if is_tombstone:
+ # For a tombstone, the removal version must not be in the future
+ if version > current_version:
+ raise Invalid('The tombstone removal_version (%r) must not be after the '
+ 'current version (%s)' % (value, current_version))
+ else:
+ # For a deprecation, the removal version must be in the future
+ if version <= current_version:
+ raise Invalid('The deprecation removal_version (%r) must be after the '
+ 'current version (%s)' % (value, current_version))
+ except ValueError:
+ raise Invalid(msg)
+ return value
+
+
+def any_value(value):
+ """Accepts anything."""
+ return value
+
+
+def get_ansible_version():
+ """Return current ansible-core version"""
+ from ansible.release import __version__
+
+ return LooseVersion('.'.join(__version__.split('.')[:3]))
+
+
+def get_collection_version():
+ """Return current collection version, or None if it is not available"""
+ import importlib.util
+
+ collection_detail_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'tools', 'collection_detail.py')
+ collection_detail_spec = importlib.util.spec_from_file_location('collection_detail', collection_detail_path)
+ collection_detail = importlib.util.module_from_spec(collection_detail_spec)
+ sys.modules['collection_detail'] = collection_detail
+ collection_detail_spec.loader.exec_module(collection_detail)
+
+ # noinspection PyBroadException
+ try:
+ result = collection_detail.read_manifest_json('.') or collection_detail.read_galaxy_yml('.')
+ return SemanticVersion(result['version'])
+ except Exception: # pylint: disable=broad-except
+ # We do not care why it fails, in case we cannot get the version
+ # just return None to indicate "we don't know".
+ return None
+
+
+def validate_metadata_file(path, is_ansible, check_deprecation_dates=False):
+ """Validate explicit runtime metadata file"""
+ try:
+ with open(path, 'r', encoding='utf-8') as f_path:
+ routing = yaml.safe_load(f_path)
+ except yaml.error.MarkedYAMLError as ex:
+ print('%s:%d:%d: YAML load failed: %s' % (path, ex.context_mark.line +
+ 1, ex.context_mark.column + 1, re.sub(r'\s+', ' ', str(ex))))
+ return
+ except Exception as ex: # pylint: disable=broad-except
+ print('%s:%d:%d: YAML load failed: %s' %
+ (path, 0, 0, re.sub(r'\s+', ' ', str(ex))))
+ return
+
+ if is_ansible:
+ current_version = get_ansible_version()
+ else:
+ current_version = get_collection_version()
+
+ # Updates to schema MUST also be reflected in the documentation
+ # ~https://docs.ansible.com/ansible-core/devel/dev_guide/developing_collections.html
+
+ # plugin_routing schema
+
+ avoid_additional_data = Schema(
+ Any(
+ {
+ Required('removal_version'): any_value,
+ 'warning_text': any_value,
+ },
+ {
+ Required('removal_date'): any_value,
+ 'warning_text': any_value,
+ }
+ ),
+ extra=PREVENT_EXTRA
+ )
+
+ deprecation_schema = All(
+ # The first schema validates the input, and the second makes sure no extra keys are specified
+ Schema(
+ {
+ 'removal_version': partial(removal_version, is_ansible=is_ansible,
+ current_version=current_version),
+ 'removal_date': partial(isodate, check_deprecation_date=check_deprecation_dates),
+ 'warning_text': Any(*string_types),
+ }
+ ),
+ avoid_additional_data
+ )
+
+ tombstoning_schema = All(
+ # The first schema validates the input, and the second makes sure no extra keys are specified
+ Schema(
+ {
+ 'removal_version': partial(removal_version, is_ansible=is_ansible,
+ current_version=current_version, is_tombstone=True),
+ 'removal_date': partial(isodate, is_tombstone=True),
+ 'warning_text': Any(*string_types),
+ }
+ ),
+ avoid_additional_data
+ )
+
+ plugin_routing_schema = Any(
+ Schema({
+ ('deprecation'): Any(deprecation_schema),
+ ('tombstone'): Any(tombstoning_schema),
+ ('redirect'): Any(*string_types),
+ }, extra=PREVENT_EXTRA),
+ )
+
+ list_dict_plugin_routing_schema = [{str_type: plugin_routing_schema}
+ for str_type in string_types]
+
+ plugin_schema = Schema({
+ ('action'): Any(None, *list_dict_plugin_routing_schema),
+ ('become'): Any(None, *list_dict_plugin_routing_schema),
+ ('cache'): Any(None, *list_dict_plugin_routing_schema),
+ ('callback'): Any(None, *list_dict_plugin_routing_schema),
+ ('cliconf'): Any(None, *list_dict_plugin_routing_schema),
+ ('connection'): Any(None, *list_dict_plugin_routing_schema),
+ ('doc_fragments'): Any(None, *list_dict_plugin_routing_schema),
+ ('filter'): Any(None, *list_dict_plugin_routing_schema),
+ ('httpapi'): Any(None, *list_dict_plugin_routing_schema),
+ ('inventory'): Any(None, *list_dict_plugin_routing_schema),
+ ('lookup'): Any(None, *list_dict_plugin_routing_schema),
+ ('module_utils'): Any(None, *list_dict_plugin_routing_schema),
+ ('modules'): Any(None, *list_dict_plugin_routing_schema),
+ ('netconf'): Any(None, *list_dict_plugin_routing_schema),
+ ('shell'): Any(None, *list_dict_plugin_routing_schema),
+ ('strategy'): Any(None, *list_dict_plugin_routing_schema),
+ ('terminal'): Any(None, *list_dict_plugin_routing_schema),
+ ('test'): Any(None, *list_dict_plugin_routing_schema),
+ ('vars'): Any(None, *list_dict_plugin_routing_schema),
+ }, extra=PREVENT_EXTRA)
+
+ # import_redirection schema
+
+ import_redirection_schema = Any(
+ Schema({
+ ('redirect'): Any(*string_types),
+ # import_redirect doesn't currently support deprecation
+ }, extra=PREVENT_EXTRA)
+ )
+
+ list_dict_import_redirection_schema = [{str_type: import_redirection_schema}
+ for str_type in string_types]
+
+ # top level schema
+
+ schema = Schema({
+ # All of these are optional
+ ('plugin_routing'): Any(plugin_schema),
+ ('import_redirection'): Any(None, *list_dict_import_redirection_schema),
+ # requires_ansible: In the future we should validate this with SpecifierSet
+ ('requires_ansible'): Any(*string_types),
+ ('action_groups'): dict,
+ }, extra=PREVENT_EXTRA)
+
+ # Ensure schema is valid
+
+ try:
+ schema(routing)
+ except MultipleInvalid as ex:
+ for error in ex.errors:
+ # No way to get line/column numbers
+ print('%s:%d:%d: %s' % (path, 0, 0, humanize_error(routing, error)))
+
+
+def main():
+ """Main entry point."""
+ paths = sys.argv[1:] or sys.stdin.read().splitlines()
+
+ collection_legacy_file = 'meta/routing.yml'
+ collection_runtime_file = 'meta/runtime.yml'
+
+ # This is currently disabled, because if it is enabled this test can start failing
+ # at a random date. For this to be properly activated, we (a) need to be able to return
+ # codes for this test, and (b) make this error optional.
+ check_deprecation_dates = False
+
+ for path in paths:
+ if path == collection_legacy_file:
+ print('%s:%d:%d: %s' % (path, 0, 0, ("Should be called '%s'" % collection_runtime_file)))
+ continue
+
+ validate_metadata_file(
+ path,
+ is_ansible=path not in (collection_legacy_file, collection_runtime_file),
+ check_deprecation_dates=check_deprecation_dates)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/shebang.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/shebang.json
new file mode 100644
index 0000000..5648429
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/shebang.json
@@ -0,0 +1,4 @@
+{
+ "text": true,
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/shebang.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/shebang.py
new file mode 100644
index 0000000..b0b1319
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/shebang.py
@@ -0,0 +1,124 @@
+"""Check shebangs, execute bits and byte order marks."""
+from __future__ import annotations
+
+import os
+import re
+import stat
+import sys
+
+
+def main():
+ """Main entry point."""
+ standard_shebangs = set([
+ b'#!/bin/bash -eu',
+ b'#!/bin/bash -eux',
+ b'#!/bin/sh',
+ b'#!/usr/bin/env bash',
+ b'#!/usr/bin/env fish',
+ b'#!/usr/bin/env pwsh',
+ b'#!/usr/bin/env python',
+ b'#!/usr/bin/make -f',
+ ])
+
+ integration_shebangs = set([
+ b'#!/bin/sh',
+ b'#!/usr/bin/env bash',
+ b'#!/usr/bin/env python',
+ ])
+
+ module_shebangs = {
+ '': b'#!/usr/bin/python',
+ '.py': b'#!/usr/bin/python',
+ '.ps1': b'#!powershell',
+ }
+
+ # see https://unicode.org/faq/utf_bom.html#bom1
+ byte_order_marks = (
+ (b'\x00\x00\xFE\xFF', 'UTF-32 (BE)'),
+ (b'\xFF\xFE\x00\x00', 'UTF-32 (LE)'),
+ (b'\xFE\xFF', 'UTF-16 (BE)'),
+ (b'\xFF\xFE', 'UTF-16 (LE)'),
+ (b'\xEF\xBB\xBF', 'UTF-8'),
+ )
+
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'rb') as path_fd:
+ shebang = path_fd.readline().strip()
+ mode = os.stat(path).st_mode
+ executable = (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) & mode
+
+ if not shebang or not shebang.startswith(b'#!'):
+ if executable:
+ print('%s:%d:%d: file without shebang should not be executable' % (path, 0, 0))
+
+ for mark, name in byte_order_marks:
+ if shebang.startswith(mark):
+ print('%s:%d:%d: file starts with a %s byte order mark' % (path, 0, 0, name))
+ break
+
+ continue
+
+ is_module = False
+ is_integration = False
+
+ dirname = os.path.dirname(path)
+
+ if path.startswith('lib/ansible/modules/'):
+ is_module = True
+ elif re.search('^test/support/[^/]+/plugins/modules/', path):
+ is_module = True
+ elif re.search('^test/support/[^/]+/collections/ansible_collections/[^/]+/[^/]+/plugins/modules/', path):
+ is_module = True
+ elif path == 'test/lib/ansible_test/_util/target/cli/ansible_test_cli_stub.py':
+ pass # ansible-test entry point must be executable and have a shebang
+ elif re.search(r'^lib/ansible/cli/[^/]+\.py', path):
+ pass # cli entry points must be executable and have a shebang
+ elif path.startswith('examples/'):
+ continue # examples trigger some false positives due to location
+ elif path.startswith('lib/') or path.startswith('test/lib/'):
+ if executable:
+ print('%s:%d:%d: should not be executable' % (path, 0, 0))
+
+ if shebang:
+ print('%s:%d:%d: should not have a shebang' % (path, 0, 0))
+
+ continue
+ elif path.startswith('test/integration/targets/') or path.startswith('tests/integration/targets/'):
+ is_integration = True
+
+ if dirname.endswith('/library') or '/plugins/modules' in dirname or dirname in (
+ # non-standard module library directories
+ 'test/integration/targets/module_precedence/lib_no_extension',
+ 'test/integration/targets/module_precedence/lib_with_extension',
+ ):
+ is_module = True
+ elif path.startswith('plugins/modules/'):
+ is_module = True
+
+ if is_module:
+ if executable:
+ print('%s:%d:%d: module should not be executable' % (path, 0, 0))
+
+ ext = os.path.splitext(path)[1]
+ expected_shebang = module_shebangs.get(ext)
+ expected_ext = ' or '.join(['"%s"' % k for k in module_shebangs])
+
+ if expected_shebang:
+ if shebang == expected_shebang:
+ continue
+
+ print('%s:%d:%d: expected module shebang "%s" but found: %s' % (path, 1, 1, expected_shebang, shebang))
+ else:
+ print('%s:%d:%d: expected module extension %s but found: %s' % (path, 0, 0, expected_ext, ext))
+ else:
+ if is_integration:
+ allowed = integration_shebangs
+ else:
+ allowed = standard_shebangs
+
+ if shebang not in allowed:
+ print('%s:%d:%d: unexpected non-module shebang: %s' % (path, 1, 1, shebang))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/symlinks.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/symlinks.json
new file mode 100644
index 0000000..6f13c86
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/symlinks.json
@@ -0,0 +1,5 @@
+{
+ "include_directories": true,
+ "include_symlinks": true,
+ "output": "path-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/symlinks.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/symlinks.py
new file mode 100644
index 0000000..5cffc69
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/symlinks.py
@@ -0,0 +1,32 @@
+"""Check for unwanted symbolic links."""
+from __future__ import annotations
+
+import os
+import sys
+
+
+def main():
+ """Main entry point."""
+ root_dir = os.getcwd() + os.path.sep
+
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ if not os.path.islink(path.rstrip(os.path.sep)):
+ continue
+
+ if not os.path.exists(path):
+ print('%s: broken symlinks are not allowed' % path)
+ continue
+
+ if path.endswith(os.path.sep):
+ print('%s: symlinks to directories are not allowed' % path)
+ continue
+
+ real_path = os.path.realpath(path)
+
+ if not real_path.startswith(root_dir):
+ print('%s: symlinks outside content tree are not allowed: %s' % (path, os.path.relpath(real_path, os.path.dirname(path))))
+ continue
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/use-argspec-type-path.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/use-argspec-type-path.json
new file mode 100644
index 0000000..3610305
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/use-argspec-type-path.json
@@ -0,0 +1,10 @@
+{
+ "prefixes": [
+ "lib/ansible/modules/",
+ "plugins/modules/"
+ ],
+ "extensions": [
+ ".py"
+ ],
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/use-argspec-type-path.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/use-argspec-type-path.py
new file mode 100644
index 0000000..0faeff3
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/use-argspec-type-path.py
@@ -0,0 +1,21 @@
+"""Disallow use of the expanduser function."""
+from __future__ import annotations
+
+import re
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as path_fd:
+ for line, text in enumerate(path_fd.readlines()):
+ match = re.search(r'(expanduser)', text)
+
+ if match:
+ print('%s:%d:%d: use argspec type="path" instead of type="str" to avoid use of `expanduser`' % (
+ path, line + 1, match.start(1) + 1))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/use-compat-six.json b/test/lib/ansible_test/_util/controller/sanity/code-smell/use-compat-six.json
new file mode 100644
index 0000000..776590b
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/use-compat-six.json
@@ -0,0 +1,6 @@
+{
+ "extensions": [
+ ".py"
+ ],
+ "output": "path-line-column-message"
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/code-smell/use-compat-six.py b/test/lib/ansible_test/_util/controller/sanity/code-smell/use-compat-six.py
new file mode 100644
index 0000000..db42fec
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/code-smell/use-compat-six.py
@@ -0,0 +1,21 @@
+"""Disallow importing of the six module."""
+from __future__ import annotations
+
+import re
+import sys
+
+
+def main():
+ """Main entry point."""
+ for path in sys.argv[1:] or sys.stdin.read().splitlines():
+ with open(path, 'r', encoding='utf-8') as path_fd:
+ for line, text in enumerate(path_fd.readlines()):
+ match = re.search(r'((^\s*import\s+six\b)|(^\s*from\s+six\b))', text)
+
+ if match:
+ print('%s:%d:%d: use `ansible.module_utils.six` instead of `six`' % (
+ path, line + 1, match.start(1) + 1))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/integration-aliases/yaml_to_json.py b/test/lib/ansible_test/_util/controller/sanity/integration-aliases/yaml_to_json.py
new file mode 100644
index 0000000..af11dd8
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/integration-aliases/yaml_to_json.py
@@ -0,0 +1,14 @@
+"""Read YAML from stdin and write JSON to stdout."""
+from __future__ import annotations
+
+import json
+import sys
+
+from yaml import load
+
+try:
+ from yaml import CSafeLoader as SafeLoader
+except ImportError:
+ from yaml import SafeLoader
+
+json.dump(load(sys.stdin, Loader=SafeLoader), sys.stdout)
diff --git a/test/lib/ansible_test/_util/controller/sanity/mypy/ansible-core.ini b/test/lib/ansible_test/_util/controller/sanity/mypy/ansible-core.ini
new file mode 100644
index 0000000..4d93f35
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/mypy/ansible-core.ini
@@ -0,0 +1,119 @@
+# IMPORTANT
+# Set "ignore_missing_imports" per package below, rather than globally.
+# That will help identify missing type stubs that should be added to the sanity test environment.
+
+[mypy]
+# There are ~20 errors reported in ansible-core when strict optional checking is enabled.
+# Until the number of occurrences are reduced, it's better to disable strict checking.
+strict_optional = False
+# There are ~70 errors reported in ansible-core when checking attributes.
+# Until the number of occurrences are reduced, it's better to disable the check.
+disable_error_code = attr-defined
+
+[mypy-ansible.module_utils.six.moves.*]
+ignore_missing_imports = True
+
+[mypy-passlib.*]
+ignore_missing_imports = True
+
+[mypy-pexpect.*]
+ignore_missing_imports = True
+
+[mypy-pypsrp.*]
+ignore_missing_imports = True
+
+[mypy-winrm.*]
+ignore_missing_imports = True
+
+[mypy-kerberos.*]
+ignore_missing_imports = True
+
+[mypy-xmltodict.*]
+ignore_missing_imports = True
+
+[mypy-md5.*]
+ignore_missing_imports = True
+
+[mypy-scp.*]
+ignore_missing_imports = True
+
+[mypy-ncclient.*]
+ignore_missing_imports = True
+
+[mypy-lxml.*]
+ignore_missing_imports = True
+
+[mypy-yum.*]
+ignore_missing_imports = True
+
+[mypy-rpmUtils.*]
+ignore_missing_imports = True
+
+[mypy-rpm.*]
+ignore_missing_imports = True
+
+[mypy-psutil.*]
+ignore_missing_imports = True
+
+[mypy-dnf.*]
+ignore_missing_imports = True
+
+[mypy-apt.*]
+ignore_missing_imports = True
+
+[mypy-apt_pkg.*]
+ignore_missing_imports = True
+
+[mypy-gssapi.*]
+ignore_missing_imports = True
+
+[mypy-_ssl.*]
+ignore_missing_imports = True
+
+[mypy-urllib_gssapi.*]
+ignore_missing_imports = True
+
+[mypy-systemd.*]
+ignore_missing_imports = True
+
+[mypy-sha.*]
+ignore_missing_imports = True
+
+[mypy-distro.*]
+ignore_missing_imports = True
+
+[mypy-selectors2.*]
+ignore_missing_imports = True
+
+[mypy-resolvelib.*]
+ignore_missing_imports = True
+
+[mypy-urlparse.*]
+ignore_missing_imports = True
+
+[mypy-argcomplete.*]
+ignore_missing_imports = True
+
+[mypy-selinux.*]
+ignore_missing_imports = True
+
+[mypy-urllib2.*]
+ignore_missing_imports = True
+
+[mypy-httplib.*]
+ignore_missing_imports = True
+
+[mypy-compiler.*]
+ignore_missing_imports = True
+
+[mypy-aptsources.*]
+ignore_missing_imports = True
+
+[mypy-urllib3.*]
+ignore_missing_imports = True
+
+[mypy-requests.*]
+ignore_missing_imports = True
+
+[mypy-jinja2.nativetypes]
+ignore_missing_imports = True
diff --git a/test/lib/ansible_test/_util/controller/sanity/mypy/ansible-test.ini b/test/lib/ansible_test/_util/controller/sanity/mypy/ansible-test.ini
new file mode 100644
index 0000000..190e952
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/mypy/ansible-test.ini
@@ -0,0 +1,24 @@
+# IMPORTANT
+# Set "ignore_missing_imports" per package below, rather than globally.
+# That will help identify missing type stubs that should be added to the sanity test environment.
+
+[mypy]
+# There are ~350 errors reported in ansible-test when strict optional checking is enabled.
+# Until the number of occurrences are greatly reduced, it's better to disable strict checking.
+strict_optional = False
+# There are ~25 errors reported in ansible-test under the 'misc' code.
+# The majority of those errors are "Only concrete class can be given", which is due to a limitation of mypy.
+# See: https://github.com/python/mypy/issues/5374
+disable_error_code = misc
+
+[mypy-argcomplete]
+ignore_missing_imports = True
+
+[mypy-coverage]
+ignore_missing_imports = True
+
+[mypy-ansible_release]
+ignore_missing_imports = True
+
+[mypy-StringIO]
+ignore_missing_imports = True
diff --git a/test/lib/ansible_test/_util/controller/sanity/mypy/modules.ini b/test/lib/ansible_test/_util/controller/sanity/mypy/modules.ini
new file mode 100644
index 0000000..d6a608f
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/mypy/modules.ini
@@ -0,0 +1,98 @@
+# IMPORTANT
+# Set "ignore_missing_imports" per package below, rather than globally.
+# That will help identify missing type stubs that should be added to the sanity test environment.
+
+[mypy]
+
+[mypy-ansible.module_utils.six.moves.*]
+ignore_missing_imports = True
+
+[mypy-pexpect.*]
+ignore_missing_imports = True
+
+[mypy-md5.*]
+ignore_missing_imports = True
+
+[mypy-yum.*]
+ignore_missing_imports = True
+
+[mypy-rpmUtils.*]
+ignore_missing_imports = True
+
+[mypy-rpm.*]
+ignore_missing_imports = True
+
+[mypy-psutil.*]
+ignore_missing_imports = True
+
+[mypy-dnf.*]
+ignore_missing_imports = True
+
+[mypy-apt.*]
+ignore_missing_imports = True
+
+[mypy-apt_pkg.*]
+ignore_missing_imports = True
+
+[mypy-gssapi.*]
+ignore_missing_imports = True
+
+[mypy-_ssl.*]
+ignore_missing_imports = True
+
+[mypy-urllib_gssapi.*]
+ignore_missing_imports = True
+
+[mypy-systemd.*]
+ignore_missing_imports = True
+
+[mypy-sha.*]
+ignore_missing_imports = True
+
+[mypy-distro.*]
+ignore_missing_imports = True
+
+[mypy-selectors2.*]
+ignore_missing_imports = True
+
+[mypy-selinux.*]
+ignore_missing_imports = True
+
+[mypy-urllib2.*]
+ignore_missing_imports = True
+
+[mypy-httplib.*]
+ignore_missing_imports = True
+
+[mypy-compiler.*]
+ignore_missing_imports = True
+
+[mypy-aptsources.*]
+ignore_missing_imports = True
+
+[mypy-urllib3.*]
+ignore_missing_imports = True
+
+[mypy-requests.*]
+ignore_missing_imports = True
+
+[mypy-pkg_resources.*]
+ignore_missing_imports = True
+
+[mypy-urllib.*]
+ignore_missing_imports = True
+
+[mypy-email.*]
+ignore_missing_imports = True
+
+[mypy-selectors.*]
+ignore_missing_imports = True
+
+[mypy-importlib.*]
+ignore_missing_imports = True
+
+[mypy-collections.*]
+ignore_missing_imports = True
+
+[mypy-http.*]
+ignore_missing_imports = True
diff --git a/test/lib/ansible_test/_util/controller/sanity/pep8/current-ignore.txt b/test/lib/ansible_test/_util/controller/sanity/pep8/current-ignore.txt
new file mode 100644
index 0000000..659c7f5
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pep8/current-ignore.txt
@@ -0,0 +1,4 @@
+E402
+W503
+W504
+E741
diff --git a/test/lib/ansible_test/_util/controller/sanity/pslint/pslint.ps1 b/test/lib/ansible_test/_util/controller/sanity/pslint/pslint.ps1
new file mode 100644
index 0000000..0cf3c7f
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pslint/pslint.ps1
@@ -0,0 +1,37 @@
+#Requires -Version 6
+#Requires -Modules PSScriptAnalyzer, PSSA-PSCustomUseLiteralPath
+
+$ErrorActionPreference = "Stop"
+$WarningPreference = "Stop"
+
+$LiteralPathRule = Import-Module -Name PSSA-PSCustomUseLiteralPath -PassThru
+$LiteralPathRulePath = Join-Path -Path $LiteralPathRule.ModuleBase -ChildPath $LiteralPathRule.RootModule
+
+$PSSAParams = @{
+ CustomRulePath = @($LiteralPathRulePath)
+ IncludeDefaultRules = $true
+ Setting = (Join-Path -Path $PSScriptRoot -ChildPath "settings.psd1")
+}
+
+$Results = @(
+ ForEach ($Path in $Args) {
+ $Retries = 3
+
+ Do {
+ Try {
+ Invoke-ScriptAnalyzer -Path $Path @PSSAParams 3> $null
+ $Retries = 0
+ }
+ Catch {
+ If (--$Retries -le 0) {
+ Throw
+ }
+ }
+ }
+ Until ($Retries -le 0)
+ }
+)
+
+# Since pwsh 7.1 results that exceed depth will produce a warning which fails the process.
+# Ignore warnings only for this step.
+ConvertTo-Json -InputObject $Results -Depth 1 -WarningAction SilentlyContinue
diff --git a/test/lib/ansible_test/_util/controller/sanity/pslint/settings.psd1 b/test/lib/ansible_test/_util/controller/sanity/pslint/settings.psd1
new file mode 100644
index 0000000..2ae13b4
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pslint/settings.psd1
@@ -0,0 +1,52 @@
+@{
+ Rules = @{
+ PSAvoidLongLines = @{
+ Enable = $true
+ MaximumLineLength = 160
+ }
+ PSPlaceOpenBrace = @{
+ Enable = $true
+ OnSameLine = $true
+ IgnoreOneLineBlock = $true
+ NewLineAfter = $true
+ }
+ PSPlaceCloseBrace = @{
+ Enable = $true
+ IgnoreOneLineBlock = $true
+ NewLineAfter = $true
+ NoEmptyLineBefore = $false
+ }
+ PSUseConsistentIndentation = @{
+ Enable = $true
+ IndentationSize = 4
+ PipelineIndentation = 'IncreaseIndentationForFirstPipeline'
+ Kind = 'space'
+ }
+ PSUseConsistentWhitespace = @{
+ Enable = $true
+ CheckInnerBrace = $true
+ CheckOpenBrace = $true
+ CheckOpenParen = $true
+ CheckOperator = $true
+ CheckPipe = $true
+ CheckPipeForRedundantWhitespace = $false
+ CheckSeparator = $true
+ CheckParameter = $false
+ IgnoreAssignmentOperatorInsideHashTable = $false
+ }
+ }
+ ExcludeRules = @(
+ 'PSUseOutputTypeCorrectly',
+ 'PSUseShouldProcessForStateChangingFunctions',
+ # We send strings as plaintext so will always come across the 3 issues
+ 'PSAvoidUsingPlainTextForPassword',
+ 'PSAvoidUsingConvertToSecureStringWithPlainText',
+ 'PSAvoidUsingUserNameAndPassWordParams',
+ # We send the module as a base64 encoded string and a BOM will cause
+ # issues here
+ 'PSUseBOMForUnicodeEncodedFile',
+ # Too many false positives, there are many cases where shared utils
+ # invoke user defined code but not all parameters are used.
+ 'PSReviewUnusedParameter'
+ )
+}
diff --git a/test/lib/ansible_test/_util/controller/sanity/pylint/config/ansible-test-target.cfg b/test/lib/ansible_test/_util/controller/sanity/pylint/config/ansible-test-target.cfg
new file mode 100644
index 0000000..aa34772
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pylint/config/ansible-test-target.cfg
@@ -0,0 +1,57 @@
+[MESSAGES CONTROL]
+
+disable=
+ consider-using-f-string, # Python 2.x support still required
+ cyclic-import, # consistent results require running with --jobs 1 and testing all files
+ deprecated-method, # results vary by Python version
+ deprecated-module, # results vary by Python version
+ duplicate-code, # consistent results require running with --jobs 1 and testing all files
+ import-outside-toplevel, # common pattern in ansible related code
+ raise-missing-from, # Python 2.x does not support raise from
+ super-with-arguments, # Python 2.x does not support super without arguments
+ redundant-u-string-prefix, # Python 2.x support still required
+ too-few-public-methods,
+ too-many-arguments,
+ too-many-branches,
+ too-many-instance-attributes,
+ too-many-lines,
+ too-many-locals,
+ too-many-nested-blocks,
+ too-many-return-statements,
+ too-many-statements,
+ useless-return, # complains about returning None when the return type is optional
+
+[BASIC]
+
+bad-names=
+ _,
+ bar,
+ baz,
+ foo,
+ tata,
+ toto,
+ tutu,
+
+good-names=
+ __metaclass__,
+ C,
+ ex,
+ i,
+ j,
+ k,
+ Run,
+
+class-attribute-rgx=[A-Za-z_][A-Za-z0-9_]{1,40}$
+attr-rgx=[a-z_][a-z0-9_]{1,40}$
+method-rgx=[a-z_][a-z0-9_]{1,40}$
+function-rgx=[a-z_][a-z0-9_]{1,40}$
+
+[IMPORTS]
+
+preferred-modules =
+ distutils.version:ansible.module_utils.compat.version,
+
+# These modules are used by ansible-test, but will not be present in the virtual environment running pylint.
+# Listing them here makes it possible to enable the import-error check.
+ignored-modules =
+ py,
diff --git a/test/lib/ansible_test/_util/controller/sanity/pylint/config/ansible-test.cfg b/test/lib/ansible_test/_util/controller/sanity/pylint/config/ansible-test.cfg
new file mode 100644
index 0000000..1c03472
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pylint/config/ansible-test.cfg
@@ -0,0 +1,63 @@
+[MESSAGES CONTROL]
+
+disable=
+ consider-using-f-string, # many occurrences
+ cyclic-import, # consistent results require running with --jobs 1 and testing all files
+ deprecated-method, # results vary by Python version
+ deprecated-module, # results vary by Python version
+ duplicate-code, # consistent results require running with --jobs 1 and testing all files
+ import-outside-toplevel, # common pattern in ansible related code
+ raise-missing-from, # Python 2.x does not support raise from
+ too-few-public-methods,
+ too-many-public-methods,
+ too-many-arguments,
+ too-many-branches,
+ too-many-instance-attributes,
+ too-many-lines,
+ too-many-locals,
+ too-many-nested-blocks,
+ too-many-return-statements,
+ too-many-statements,
+ unspecified-encoding, # always run with UTF-8 encoding enforced
+ useless-return, # complains about returning None when the return type is optional
+
+[BASIC]
+
+bad-names=
+ _,
+ bar,
+ baz,
+ foo,
+ tata,
+ toto,
+ tutu,
+
+good-names=
+ __metaclass__,
+ C,
+ ex,
+ i,
+ j,
+ k,
+ Run,
+
+class-attribute-rgx=[A-Za-z_][A-Za-z0-9_]{1,40}$
+attr-rgx=[a-z_][a-z0-9_]{1,40}$
+method-rgx=[a-z_][a-z0-9_]{1,40}$
+function-rgx=[a-z_][a-z0-9_]{1,40}$
+
+# Use the regex from earlier versions of pylint.
+# See: https://github.com/PyCQA/pylint/pull/7322
+typevar-rgx=^_{0,2}(?:[^\W\da-z_]+|(?:[^\W\da-z_]+[^\WA-Z_]+)+T?(?<!Type))(?:_co(?:ntra)?)?$
+
+[IMPORTS]
+
+preferred-modules =
+ distutils.version:ansible.module_utils.compat.version,
+
+# These modules are used by ansible-test, but will not be present in the virtual environment running pylint.
+# Listing them here makes it possible to enable the import-error check.
+ignored-modules =
+ cryptography,
+ coverage,
+ yamllint,
diff --git a/test/lib/ansible_test/_util/controller/sanity/pylint/config/code-smell.cfg b/test/lib/ansible_test/_util/controller/sanity/pylint/config/code-smell.cfg
new file mode 100644
index 0000000..e3aa8ee
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pylint/config/code-smell.cfg
@@ -0,0 +1,57 @@
+[MESSAGES CONTROL]
+
+disable=
+ consider-using-f-string, # many occurrences
+ cyclic-import, # consistent results require running with --jobs 1 and testing all files
+ deprecated-method, # results vary by Python version
+ deprecated-module, # results vary by Python version
+ duplicate-code, # consistent results require running with --jobs 1 and testing all files
+ import-outside-toplevel, # common pattern in ansible related code
+ raise-missing-from, # Python 2.x does not support raise from
+ too-few-public-methods,
+ too-many-arguments,
+ too-many-branches,
+ too-many-instance-attributes,
+ too-many-lines,
+ too-many-locals,
+ too-many-nested-blocks,
+ too-many-return-statements,
+ too-many-statements,
+ unspecified-encoding, # always run with UTF-8 encoding enforced
+ useless-return, # complains about returning None when the return type is optional
+
+[BASIC]
+
+bad-names=
+ _,
+ bar,
+ baz,
+ foo,
+ tata,
+ toto,
+ tutu,
+
+good-names=
+ __metaclass__,
+ C,
+ ex,
+ i,
+ j,
+ k,
+ Run,
+
+class-attribute-rgx=[A-Za-z_][A-Za-z0-9_]{1,40}$
+attr-rgx=[a-z_][a-z0-9_]{1,40}$
+method-rgx=[a-z_][a-z0-9_]{1,40}$
+function-rgx=[a-z_][a-z0-9_]{1,40}$
+module-rgx=[a-z_][a-z0-9_-]{2,40}$
+
+[IMPORTS]
+
+preferred-modules =
+ distutils.version:ansible.module_utils.compat.version,
+
+# These modules are used by ansible-test, but will not be present in the virtual environment running pylint.
+# Listing them here makes it possible to enable the import-error check.
+ignored-modules =
+ voluptuous,
diff --git a/test/lib/ansible_test/_util/controller/sanity/pylint/config/collection.cfg b/test/lib/ansible_test/_util/controller/sanity/pylint/config/collection.cfg
new file mode 100644
index 0000000..38b8d2d
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pylint/config/collection.cfg
@@ -0,0 +1,147 @@
+[MESSAGES CONTROL]
+
+disable=
+ abstract-method,
+ access-member-before-definition,
+ arguments-differ,
+ assignment-from-no-return,
+ assignment-from-none,
+ attribute-defined-outside-init,
+ bad-indentation,
+ bad-mcs-classmethod-argument,
+ broad-except,
+ c-extension-no-member,
+ cell-var-from-loop,
+ chained-comparison,
+ comparison-with-callable,
+ consider-iterating-dictionary,
+ consider-merging-isinstance,
+ consider-using-dict-comprehension, # requires Python 2.7+, but we still require Python 2.6 support
+ consider-using-dict-items,
+ consider-using-enumerate,
+ consider-using-f-string, # Python 2.x support still required
+ consider-using-generator,
+ consider-using-get,
+ consider-using-in,
+ consider-using-set-comprehension, # requires Python 2.7+, but we still require Python 2.6 support
+ consider-using-ternary,
+ consider-using-with,
+ consider-using-max-builtin,
+ consider-using-min-builtin,
+ cyclic-import, # consistent results require running with --jobs 1 and testing all files
+ deprecated-method, # results vary by Python version
+ deprecated-module, # results vary by Python version
+ duplicate-code, # consistent results require running with --jobs 1 and testing all files
+ eval-used,
+ exec-used,
+ expression-not-assigned,
+ fixme,
+ function-redefined,
+ global-statement,
+ global-variable-undefined,
+ import-error, # inconsistent results which depend on the availability of imports
+ import-outside-toplevel, # common pattern in ansible related code
+ import-self,
+ inconsistent-return-statements,
+ invalid-envvar-default,
+ invalid-name,
+ invalid-sequence-index,
+ keyword-arg-before-vararg,
+ len-as-condition,
+ line-too-long,
+ literal-comparison,
+ locally-disabled,
+ method-hidden,
+ missing-docstring,
+ no-else-break,
+ no-else-continue,
+ no-else-raise,
+ no-else-return,
+ no-member,
+ no-name-in-module, # inconsistent results which depend on the availability of imports
+ no-value-for-parameter,
+ non-iterator-returned,
+ not-a-mapping,
+ not-an-iterable,
+ not-callable,
+ pointless-statement,
+ pointless-string-statement,
+ possibly-unused-variable,
+ protected-access,
+ raise-missing-from, # Python 2.x does not support raise from
+ redefined-argument-from-local,
+ redefined-builtin,
+ redefined-outer-name,
+ redundant-u-string-prefix, # Python 2.x support still required
+ reimported,
+ relative-beyond-top-level, # https://github.com/PyCQA/pylint/issues/2967
+ signature-differs,
+ simplifiable-if-expression,
+ simplifiable-if-statement,
+ subprocess-popen-preexec-fn,
+ super-init-not-called,
+ super-with-arguments, # Python 2.x does not support super without arguments
+ superfluous-parens,
+ too-few-public-methods,
+ too-many-ancestors, # inconsistent results between python 3.6 and 3.7+
+ too-many-arguments,
+ too-many-boolean-expressions,
+ too-many-branches,
+ too-many-function-args,
+ too-many-instance-attributes,
+ too-many-lines,
+ too-many-locals,
+ too-many-nested-blocks,
+ too-many-public-methods,
+ too-many-return-statements,
+ too-many-statements,
+ trailing-comma-tuple,
+ trailing-comma-tuple,
+ try-except-raise,
+ unbalanced-tuple-unpacking,
+ undefined-loop-variable,
+ unexpected-keyword-arg,
+ ungrouped-imports,
+ unidiomatic-typecheck,
+ unnecessary-pass,
+ unnecessary-dunder-call,
+ unsubscriptable-object,
+ unsupported-assignment-operation,
+ unsupported-delete-operation,
+ unsupported-membership-test,
+ unused-argument,
+ unused-import,
+ unused-variable,
+ unspecified-encoding, # always run with UTF-8 encoding enforced
+ use-dict-literal, # many occurrences
+ use-list-literal, # many occurrences
+ use-implicit-booleaness-not-comparison, # many occurrences
+ useless-object-inheritance,
+ useless-return,
+ useless-super-delegation,
+ useless-option-value, # provide backwards compatibility with previous versions of ansible-test
+ wrong-import-order,
+ wrong-import-position,
+
+[BASIC]
+
+bad-names=
+ _,
+ bar,
+ baz,
+ foo,
+ tata,
+ toto,
+ tutu,
+
+good-names=
+ ex,
+ i,
+ j,
+ k,
+ Run,
+
+[TYPECHECK]
+
+ignored-modules=
+ _MovedItems,
diff --git a/test/lib/ansible_test/_util/controller/sanity/pylint/config/default.cfg b/test/lib/ansible_test/_util/controller/sanity/pylint/config/default.cfg
new file mode 100644
index 0000000..6a242b8
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pylint/config/default.cfg
@@ -0,0 +1,146 @@
+[MESSAGES CONTROL]
+
+disable=
+ import-outside-toplevel, # common pattern in ansible related code
+ abstract-method,
+ access-member-before-definition,
+ arguments-differ,
+ assignment-from-no-return,
+ assignment-from-none,
+ attribute-defined-outside-init,
+ bad-indentation,
+ bad-mcs-classmethod-argument,
+ broad-except,
+ c-extension-no-member,
+ cell-var-from-loop,
+ chained-comparison,
+ comparison-with-callable,
+ consider-iterating-dictionary,
+ consider-merging-isinstance,
+ consider-using-dict-items,
+ consider-using-enumerate,
+ consider-using-f-string, # Python 2.x support still required
+ consider-using-get,
+ consider-using-in,
+ consider-using-ternary,
+ consider-using-with,
+ consider-using-max-builtin,
+ consider-using-min-builtin,
+ cyclic-import, # consistent results require running with --jobs 1 and testing all files
+ deprecated-method, # results vary by Python version
+ deprecated-module, # results vary by Python version
+ duplicate-code, # consistent results require running with --jobs 1 and testing all files
+ eval-used,
+ exec-used,
+ expression-not-assigned,
+ fixme,
+ function-redefined,
+ global-statement,
+ global-variable-undefined,
+ import-error, # inconsistent results which depend on the availability of imports
+ import-self,
+ inconsistent-return-statements,
+ invalid-envvar-default,
+ invalid-name,
+ invalid-sequence-index,
+ keyword-arg-before-vararg,
+ len-as-condition,
+ line-too-long,
+ literal-comparison,
+ locally-disabled,
+ method-hidden,
+ missing-docstring,
+ no-else-break,
+ no-else-continue,
+ no-else-raise,
+ no-else-return,
+ no-member,
+ no-name-in-module, # inconsistent results which depend on the availability of imports
+ no-value-for-parameter,
+ non-iterator-returned,
+ not-a-mapping,
+ not-an-iterable,
+ not-callable,
+ pointless-statement,
+ pointless-string-statement,
+ possibly-unused-variable,
+ protected-access,
+ raise-missing-from, # Python 2.x does not support raise from
+ redefined-argument-from-local,
+ redefined-builtin,
+ redefined-outer-name,
+ redundant-u-string-prefix, # Python 2.x support still required
+ reimported,
+ signature-differs,
+ simplifiable-if-expression,
+ simplifiable-if-statement,
+ subprocess-popen-preexec-fn,
+ super-init-not-called,
+ super-with-arguments, # Python 2.x does not support super without arguments
+ superfluous-parens,
+ too-few-public-methods,
+ too-many-ancestors, # inconsistent results between python 3.6 and 3.7+
+ too-many-arguments,
+ too-many-boolean-expressions,
+ too-many-branches,
+ too-many-function-args,
+ too-many-instance-attributes,
+ too-many-lines,
+ too-many-locals,
+ too-many-nested-blocks,
+ too-many-public-methods,
+ too-many-return-statements,
+ too-many-statements,
+ trailing-comma-tuple,
+ trailing-comma-tuple,
+ try-except-raise,
+ unbalanced-tuple-unpacking,
+ undefined-loop-variable,
+ unexpected-keyword-arg,
+ ungrouped-imports,
+ unidiomatic-typecheck,
+ unnecessary-pass,
+ unsubscriptable-object,
+ unsupported-assignment-operation,
+ unsupported-delete-operation,
+ unsupported-membership-test,
+ unused-argument,
+ unused-import,
+ unused-variable,
+ unspecified-encoding, # always run with UTF-8 encoding enforced
+ use-dict-literal, # many occurrences
+ use-list-literal, # many occurrences
+ use-implicit-booleaness-not-comparison, # many occurrences
+ useless-object-inheritance,
+ useless-return,
+ useless-super-delegation,
+ wrong-import-order,
+ wrong-import-position,
+
+[BASIC]
+
+bad-names=
+ _,
+ bar,
+ baz,
+ foo,
+ tata,
+ toto,
+ tutu,
+
+good-names=
+ ex,
+ i,
+ j,
+ k,
+ Run,
+
+[TYPECHECK]
+
+ignored-modules=
+ _MovedItems,
+
+[IMPORTS]
+
+preferred-modules =
+ distutils.version:ansible.module_utils.compat.version,
diff --git a/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/deprecated.py b/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/deprecated.py
new file mode 100644
index 0000000..79b8bf1
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/deprecated.py
@@ -0,0 +1,263 @@
+"""Ansible specific plyint plugin for checking deprecations."""
+# (c) 2018, Matt Martz <matt@sivel.net>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+# -*- coding: utf-8 -*-
+from __future__ import annotations
+
+import datetime
+import re
+import typing as t
+
+import astroid
+
+from pylint.interfaces import IAstroidChecker
+from pylint.checkers import BaseChecker
+from pylint.checkers.utils import check_messages
+
+from ansible.module_utils.compat.version import LooseVersion
+from ansible.module_utils.six import string_types
+from ansible.release import __version__ as ansible_version_raw
+from ansible.utils.version import SemanticVersion
+
+MSGS = {
+ 'E9501': ("Deprecated version (%r) found in call to Display.deprecated "
+ "or AnsibleModule.deprecate",
+ "ansible-deprecated-version",
+ "Used when a call to Display.deprecated specifies a version "
+ "less than or equal to the current version of Ansible",
+ {'minversion': (2, 6)}),
+ 'E9502': ("Display.deprecated call without a version or date",
+ "ansible-deprecated-no-version",
+ "Used when a call to Display.deprecated does not specify a "
+ "version or date",
+ {'minversion': (2, 6)}),
+ 'E9503': ("Invalid deprecated version (%r) found in call to "
+ "Display.deprecated or AnsibleModule.deprecate",
+ "ansible-invalid-deprecated-version",
+ "Used when a call to Display.deprecated specifies an invalid "
+ "Ansible version number",
+ {'minversion': (2, 6)}),
+ 'E9504': ("Deprecated version (%r) found in call to Display.deprecated "
+ "or AnsibleModule.deprecate",
+ "collection-deprecated-version",
+ "Used when a call to Display.deprecated specifies a collection "
+ "version less than or equal to the current version of this "
+ "collection",
+ {'minversion': (2, 6)}),
+ 'E9505': ("Invalid deprecated version (%r) found in call to "
+ "Display.deprecated or AnsibleModule.deprecate",
+ "collection-invalid-deprecated-version",
+ "Used when a call to Display.deprecated specifies an invalid "
+ "collection version number",
+ {'minversion': (2, 6)}),
+ 'E9506': ("No collection name found in call to Display.deprecated or "
+ "AnsibleModule.deprecate",
+ "ansible-deprecated-no-collection-name",
+ "The current collection name in format `namespace.name` must "
+ "be provided as collection_name when calling Display.deprecated "
+ "or AnsibleModule.deprecate (`ansible.builtin` for ansible-core)",
+ {'minversion': (2, 6)}),
+ 'E9507': ("Wrong collection name (%r) found in call to "
+ "Display.deprecated or AnsibleModule.deprecate",
+ "wrong-collection-deprecated",
+ "The name of the current collection must be passed to the "
+ "Display.deprecated resp. AnsibleModule.deprecate calls "
+ "(`ansible.builtin` for ansible-core)",
+ {'minversion': (2, 6)}),
+ 'E9508': ("Expired date (%r) found in call to Display.deprecated "
+ "or AnsibleModule.deprecate",
+ "ansible-deprecated-date",
+ "Used when a call to Display.deprecated specifies a date "
+ "before today",
+ {'minversion': (2, 6)}),
+ 'E9509': ("Invalid deprecated date (%r) found in call to "
+ "Display.deprecated or AnsibleModule.deprecate",
+ "ansible-invalid-deprecated-date",
+ "Used when a call to Display.deprecated specifies an invalid "
+ "date. It must be a string in format `YYYY-MM-DD` (ISO 8601)",
+ {'minversion': (2, 6)}),
+ 'E9510': ("Both version and date found in call to "
+ "Display.deprecated or AnsibleModule.deprecate",
+ "ansible-deprecated-both-version-and-date",
+ "Only one of version and date must be specified",
+ {'minversion': (2, 6)}),
+ 'E9511': ("Removal version (%r) must be a major release, not a minor or "
+ "patch release (see the specification at https://semver.org/)",
+ "removal-version-must-be-major",
+ "Used when a call to Display.deprecated or "
+ "AnsibleModule.deprecate for a collection specifies a version "
+ "which is not of the form x.0.0",
+ {'minversion': (2, 6)}),
+}
+
+
+ANSIBLE_VERSION = LooseVersion('.'.join(ansible_version_raw.split('.')[:3]))
+
+
+def _get_expr_name(node):
+ """Funciton to get either ``attrname`` or ``name`` from ``node.func.expr``
+
+ Created specifically for the case of ``display.deprecated`` or ``self._display.deprecated``
+ """
+ try:
+ return node.func.expr.attrname
+ except AttributeError:
+ # If this fails too, we'll let it raise, the caller should catch it
+ return node.func.expr.name
+
+
+def parse_isodate(value):
+ """Parse an ISO 8601 date string."""
+ msg = 'Expected ISO 8601 date string (YYYY-MM-DD)'
+ if not isinstance(value, string_types):
+ raise ValueError(msg)
+ # From Python 3.7 in, there is datetime.date.fromisoformat(). For older versions,
+ # we have to do things manually.
+ if not re.match('^[0-9]{4}-[0-9]{2}-[0-9]{2}$', value):
+ raise ValueError(msg)
+ try:
+ return datetime.datetime.strptime(value, '%Y-%m-%d').date()
+ except ValueError:
+ raise ValueError(msg)
+
+
+class AnsibleDeprecatedChecker(BaseChecker):
+ """Checks for Display.deprecated calls to ensure that the ``version``
+ has not passed or met the time for removal
+ """
+
+ __implements__ = (IAstroidChecker,)
+ name = 'deprecated'
+ msgs = MSGS
+
+ options = (
+ ('collection-name', {
+ 'default': None,
+ 'type': 'string',
+ 'metavar': '<name>',
+ 'help': 'The collection\'s name used to check collection names in deprecations.',
+ }),
+ ('collection-version', {
+ 'default': None,
+ 'type': 'string',
+ 'metavar': '<version>',
+ 'help': 'The collection\'s version number used to check deprecations.',
+ }),
+ )
+
+ def _check_date(self, node, date):
+ if not isinstance(date, str):
+ self.add_message('ansible-invalid-deprecated-date', node=node, args=(date,))
+ return
+
+ try:
+ date_parsed = parse_isodate(date)
+ except ValueError:
+ self.add_message('ansible-invalid-deprecated-date', node=node, args=(date,))
+ return
+
+ if date_parsed < datetime.date.today():
+ self.add_message('ansible-deprecated-date', node=node, args=(date,))
+
+ def _check_version(self, node, version, collection_name):
+ if not isinstance(version, (str, float)):
+ if collection_name == 'ansible.builtin':
+ symbol = 'ansible-invalid-deprecated-version'
+ else:
+ symbol = 'collection-invalid-deprecated-version'
+ self.add_message(symbol, node=node, args=(version,))
+ return
+
+ version_no = str(version)
+
+ if collection_name == 'ansible.builtin':
+ # Ansible-base
+ try:
+ if not version_no:
+ raise ValueError('Version string should not be empty')
+ loose_version = LooseVersion(str(version_no))
+ if ANSIBLE_VERSION >= loose_version:
+ self.add_message('ansible-deprecated-version', node=node, args=(version,))
+ except ValueError:
+ self.add_message('ansible-invalid-deprecated-version', node=node, args=(version,))
+ elif collection_name:
+ # Collections
+ try:
+ if not version_no:
+ raise ValueError('Version string should not be empty')
+ semantic_version = SemanticVersion(version_no)
+ if collection_name == self.collection_name and self.collection_version is not None:
+ if self.collection_version >= semantic_version:
+ self.add_message('collection-deprecated-version', node=node, args=(version,))
+ if semantic_version.major != 0 and (semantic_version.minor != 0 or semantic_version.patch != 0):
+ self.add_message('removal-version-must-be-major', node=node, args=(version,))
+ except ValueError:
+ self.add_message('collection-invalid-deprecated-version', node=node, args=(version,))
+
+ @property
+ def collection_name(self) -> t.Optional[str]:
+ """Return the collection name, or None if ansible-core is being tested."""
+ return self.config.collection_name
+
+ @property
+ def collection_version(self) -> t.Optional[SemanticVersion]:
+ """Return the collection version, or None if ansible-core is being tested."""
+ return SemanticVersion(self.config.collection_version) if self.config.collection_version is not None else None
+
+ @check_messages(*(MSGS.keys()))
+ def visit_call(self, node):
+ """Visit a call node."""
+ version = None
+ date = None
+ collection_name = None
+ try:
+ if (node.func.attrname == 'deprecated' and 'display' in _get_expr_name(node) or
+ node.func.attrname == 'deprecate' and _get_expr_name(node)):
+ if node.keywords:
+ for keyword in node.keywords:
+ if len(node.keywords) == 1 and keyword.arg is None:
+ # This is likely a **kwargs splat
+ return
+ if keyword.arg == 'version':
+ if isinstance(keyword.value.value, astroid.Name):
+ # This is likely a variable
+ return
+ version = keyword.value.value
+ if keyword.arg == 'date':
+ if isinstance(keyword.value.value, astroid.Name):
+ # This is likely a variable
+ return
+ date = keyword.value.value
+ if keyword.arg == 'collection_name':
+ if isinstance(keyword.value.value, astroid.Name):
+ # This is likely a variable
+ return
+ collection_name = keyword.value.value
+ if not version and not date:
+ try:
+ version = node.args[1].value
+ except IndexError:
+ self.add_message('ansible-deprecated-no-version', node=node)
+ return
+ if version and date:
+ self.add_message('ansible-deprecated-both-version-and-date', node=node)
+
+ if collection_name:
+ this_collection = collection_name == (self.collection_name or 'ansible.builtin')
+ if not this_collection:
+ self.add_message('wrong-collection-deprecated', node=node, args=(collection_name,))
+ elif self.collection_name is not None:
+ self.add_message('ansible-deprecated-no-collection-name', node=node)
+
+ if date:
+ self._check_date(node, date)
+ elif version:
+ self._check_version(node, version, collection_name)
+ except AttributeError:
+ # Not the type of node we are interested in
+ pass
+
+
+def register(linter):
+ """required method to auto register this checker """
+ linter.register_checker(AnsibleDeprecatedChecker(linter))
diff --git a/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/string_format.py b/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/string_format.py
new file mode 100644
index 0000000..934a9ae
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/string_format.py
@@ -0,0 +1,85 @@
+"""Ansible specific pylint plugin for checking format string usage."""
+# (c) 2018, Matt Martz <matt@sivel.net>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+# -*- coding: utf-8 -*-
+from __future__ import annotations
+
+import astroid
+from pylint.interfaces import IAstroidChecker
+from pylint.checkers import BaseChecker
+from pylint.checkers import utils
+from pylint.checkers.utils import check_messages
+try:
+ from pylint.checkers.utils import parse_format_method_string
+except ImportError:
+ # noinspection PyUnresolvedReferences
+ from pylint.checkers.strings import parse_format_method_string
+
+MSGS = {
+ 'E9305': ("Format string contains automatic field numbering "
+ "specification",
+ "ansible-format-automatic-specification",
+ "Used when a PEP 3101 format string contains automatic "
+ "field numbering (e.g. '{}').",
+ {'minversion': (2, 6)}),
+ 'E9390': ("bytes object has no .format attribute",
+ "ansible-no-format-on-bytestring",
+ "Used when a bytestring was used as a PEP 3101 format string "
+ "as Python3 bytestrings do not have a .format attribute",
+ {'minversion': (3, 0)}),
+}
+
+
+class AnsibleStringFormatChecker(BaseChecker):
+ """Checks string formatting operations to ensure that the format string
+ is valid and the arguments match the format string.
+ """
+
+ __implements__ = (IAstroidChecker,)
+ name = 'string'
+ msgs = MSGS
+
+ @check_messages(*(MSGS.keys()))
+ def visit_call(self, node):
+ """Visit a call node."""
+ func = utils.safe_infer(node.func)
+ if (isinstance(func, astroid.BoundMethod)
+ and isinstance(func.bound, astroid.Instance)
+ and func.bound.name in ('str', 'unicode', 'bytes')):
+ if func.name == 'format':
+ self._check_new_format(node, func)
+
+ def _check_new_format(self, node, func):
+ """ Check the new string formatting """
+ if (isinstance(node.func, astroid.Attribute)
+ and not isinstance(node.func.expr, astroid.Const)):
+ return
+ try:
+ strnode = next(func.bound.infer())
+ except astroid.InferenceError:
+ return
+ if not isinstance(strnode, astroid.Const):
+ return
+
+ if isinstance(strnode.value, bytes):
+ self.add_message('ansible-no-format-on-bytestring', node=node)
+ return
+ if not isinstance(strnode.value, str):
+ return
+
+ if node.starargs or node.kwargs:
+ return
+ try:
+ num_args = parse_format_method_string(strnode.value)[1]
+ except utils.IncompleteFormatString:
+ return
+
+ if num_args:
+ self.add_message('ansible-format-automatic-specification',
+ node=node)
+ return
+
+
+def register(linter):
+ """required method to auto register this checker """
+ linter.register_checker(AnsibleStringFormatChecker(linter))
diff --git a/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/unwanted.py b/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/unwanted.py
new file mode 100644
index 0000000..1be42f5
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/unwanted.py
@@ -0,0 +1,223 @@
+"""A plugin for pylint to identify imports and functions which should not be used."""
+from __future__ import annotations
+
+import os
+import typing as t
+
+import astroid
+
+from pylint.checkers import BaseChecker
+from pylint.interfaces import IAstroidChecker
+
+ANSIBLE_TEST_MODULES_PATH = os.environ['ANSIBLE_TEST_MODULES_PATH']
+ANSIBLE_TEST_MODULE_UTILS_PATH = os.environ['ANSIBLE_TEST_MODULE_UTILS_PATH']
+
+
+class UnwantedEntry:
+ """Defines an unwanted import."""
+ def __init__(
+ self,
+ alternative, # type: str
+ modules_only=False, # type: bool
+ names=None, # type: t.Optional[t.Tuple[str, ...]]
+ ignore_paths=None, # type: t.Optional[t.Tuple[str, ...]]
+ ansible_test_only=False, # type: bool
+ ): # type: (...) -> None
+ self.alternative = alternative
+ self.modules_only = modules_only
+ self.names = set(names) if names else set()
+ self.ignore_paths = ignore_paths
+ self.ansible_test_only = ansible_test_only
+
+ def applies_to(self, path, name=None): # type: (str, t.Optional[str]) -> bool
+ """Return True if this entry applies to the given path, otherwise return False."""
+ if self.names:
+ if not name:
+ return False
+
+ if name not in self.names:
+ return False
+
+ if self.ignore_paths and any(path.endswith(ignore_path) for ignore_path in self.ignore_paths):
+ return False
+
+ if self.ansible_test_only and '/test/lib/ansible_test/_internal/' not in path:
+ return False
+
+ if self.modules_only:
+ return is_module_path(path)
+
+ return True
+
+
+def is_module_path(path): # type: (str) -> bool
+ """Return True if the given path is a module or module_utils path, otherwise return False."""
+ return path.startswith(ANSIBLE_TEST_MODULES_PATH) or path.startswith(ANSIBLE_TEST_MODULE_UTILS_PATH)
+
+
+class AnsibleUnwantedChecker(BaseChecker):
+ """Checker for unwanted imports and functions."""
+ __implements__ = (IAstroidChecker,)
+
+ name = 'unwanted'
+
+ BAD_IMPORT = 'ansible-bad-import'
+ BAD_IMPORT_FROM = 'ansible-bad-import-from'
+ BAD_FUNCTION = 'ansible-bad-function'
+ BAD_MODULE_IMPORT = 'ansible-bad-module-import'
+
+ msgs = dict(
+ E5101=('Import %s instead of %s',
+ BAD_IMPORT,
+ 'Identifies imports which should not be used.'),
+ E5102=('Import %s from %s instead of %s',
+ BAD_IMPORT_FROM,
+ 'Identifies imports which should not be used.'),
+ E5103=('Call %s instead of %s',
+ BAD_FUNCTION,
+ 'Identifies functions which should not be used.'),
+ E5104=('Import external package or ansible.module_utils not %s',
+ BAD_MODULE_IMPORT,
+ 'Identifies imports which should not be used.'),
+ )
+
+ unwanted_imports = dict(
+ # Additional imports that we may want to start checking:
+ # boto=UnwantedEntry('boto3', modules_only=True),
+ # requests=UnwantedEntry('ansible.module_utils.urls', modules_only=True),
+ # urllib=UnwantedEntry('ansible.module_utils.urls', modules_only=True),
+
+ # see https://docs.python.org/2/library/urllib2.html
+ urllib2=UnwantedEntry('ansible.module_utils.urls',
+ ignore_paths=(
+ '/lib/ansible/module_utils/urls.py',
+ )),
+
+ # see https://docs.python.org/3/library/collections.abc.html
+ collections=UnwantedEntry('ansible.module_utils.common._collections_compat',
+ ignore_paths=(
+ '/lib/ansible/module_utils/common/_collections_compat.py',
+ ),
+ names=(
+ 'MappingView',
+ 'ItemsView',
+ 'KeysView',
+ 'ValuesView',
+ 'Mapping', 'MutableMapping',
+ 'Sequence', 'MutableSequence',
+ 'Set', 'MutableSet',
+ 'Container',
+ 'Hashable',
+ 'Sized',
+ 'Callable',
+ 'Iterable',
+ 'Iterator',
+ )),
+ )
+
+ unwanted_functions = {
+ # see https://docs.python.org/3/library/tempfile.html#tempfile.mktemp
+ 'tempfile.mktemp': UnwantedEntry('tempfile.mkstemp'),
+
+ # os.chmod resolves as posix.chmod
+ 'posix.chmod': UnwantedEntry('verified_chmod',
+ ansible_test_only=True),
+
+ 'sys.exit': UnwantedEntry('exit_json or fail_json',
+ ignore_paths=(
+ '/lib/ansible/module_utils/basic.py',
+ '/lib/ansible/modules/async_wrapper.py',
+ ),
+ modules_only=True),
+
+ 'builtins.print': UnwantedEntry('module.log or module.debug',
+ ignore_paths=(
+ '/lib/ansible/module_utils/basic.py',
+ ),
+ modules_only=True),
+ }
+
+ def visit_import(self, node): # type: (astroid.node_classes.Import) -> None
+ """Visit an import node."""
+ for name in node.names:
+ self._check_import(node, name[0])
+
+ def visit_importfrom(self, node): # type: (astroid.node_classes.ImportFrom) -> None
+ """Visit an import from node."""
+ self._check_importfrom(node, node.modname, node.names)
+
+ def visit_attribute(self, node): # type: (astroid.node_classes.Attribute) -> None
+ """Visit an attribute node."""
+ last_child = node.last_child()
+
+ # this is faster than using type inference and will catch the most common cases
+ if not isinstance(last_child, astroid.node_classes.Name):
+ return
+
+ module = last_child.name
+
+ entry = self.unwanted_imports.get(module)
+
+ if entry and entry.names:
+ if entry.applies_to(self.linter.current_file, node.attrname):
+ self.add_message(self.BAD_IMPORT_FROM, args=(node.attrname, entry.alternative, module), node=node)
+
+ def visit_call(self, node): # type: (astroid.node_classes.Call) -> None
+ """Visit a call node."""
+ try:
+ for i in node.func.inferred():
+ func = None
+
+ if isinstance(i, astroid.scoped_nodes.FunctionDef) and isinstance(i.parent, astroid.scoped_nodes.Module):
+ func = '%s.%s' % (i.parent.name, i.name)
+
+ if not func:
+ continue
+
+ entry = self.unwanted_functions.get(func)
+
+ if entry and entry.applies_to(self.linter.current_file):
+ self.add_message(self.BAD_FUNCTION, args=(entry.alternative, func), node=node)
+ except astroid.exceptions.InferenceError:
+ pass
+
+ def _check_import(self, node, modname): # type: (astroid.node_classes.Import, str) -> None
+ """Check the imports on the specified import node."""
+ self._check_module_import(node, modname)
+
+ entry = self.unwanted_imports.get(modname)
+
+ if not entry:
+ return
+
+ if entry.applies_to(self.linter.current_file):
+ self.add_message(self.BAD_IMPORT, args=(entry.alternative, modname), node=node)
+
+ def _check_importfrom(self, node, modname, names): # type: (astroid.node_classes.ImportFrom, str, t.List[str]) -> None
+ """Check the imports on the specified import from node."""
+ self._check_module_import(node, modname)
+
+ entry = self.unwanted_imports.get(modname)
+
+ if not entry:
+ return
+
+ for name in names:
+ if entry.applies_to(self.linter.current_file, name[0]):
+ self.add_message(self.BAD_IMPORT_FROM, args=(name[0], entry.alternative, modname), node=node)
+
+ def _check_module_import(self, node, modname): # type: (t.Union[astroid.node_classes.Import, astroid.node_classes.ImportFrom], str) -> None
+ """Check the module import on the given import or import from node."""
+ if not is_module_path(self.linter.current_file):
+ return
+
+ if modname == 'ansible.module_utils' or modname.startswith('ansible.module_utils.'):
+ return
+
+ if modname == 'ansible' or modname.startswith('ansible.'):
+ self.add_message(self.BAD_MODULE_IMPORT, args=(modname,), node=node)
+
+
+def register(linter):
+ """required method to auto register this checker """
+ linter.register_checker(AnsibleUnwantedChecker(linter))
diff --git a/test/lib/ansible_test/_util/controller/sanity/shellcheck/exclude.txt b/test/lib/ansible_test/_util/controller/sanity/shellcheck/exclude.txt
new file mode 100644
index 0000000..29588dd
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/shellcheck/exclude.txt
@@ -0,0 +1,3 @@
+SC1090
+SC1091
+SC2164
diff --git a/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate.py b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate.py
new file mode 100644
index 0000000..ee7e832
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate.py
@@ -0,0 +1,6 @@
+from __future__ import annotations
+
+from validate_modules.main import main
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/__init__.py b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/__init__.py
new file mode 100644
index 0000000..1cfd6ac
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/__init__.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2015 Matt Martz <matt@sivel.net>
+# Copyright (C) 2015 Rackspace US, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import annotations
diff --git a/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py
new file mode 100644
index 0000000..270c9f4
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py
@@ -0,0 +1,2520 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2015 Matt Martz <matt@sivel.net>
+# Copyright (C) 2015 Rackspace US, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import annotations
+
+import abc
+import argparse
+import ast
+import datetime
+import json
+import os
+import re
+import subprocess
+import sys
+import tempfile
+import traceback
+import warnings
+
+from collections import OrderedDict
+from collections.abc import Mapping
+from contextlib import contextmanager
+from fnmatch import fnmatch
+
+import yaml
+
+from voluptuous.humanize import humanize_error
+
+
+def setup_collection_loader():
+ """
+ Configure the collection loader if a collection is being tested.
+ This must be done before the plugin loader is imported.
+ """
+ if '--collection' not in sys.argv:
+ return
+
+ # noinspection PyProtectedMember
+ from ansible.utils.collection_loader._collection_finder import _AnsibleCollectionFinder
+
+ collections_paths = os.environ.get('ANSIBLE_COLLECTIONS_PATH', '').split(os.pathsep)
+ collection_loader = _AnsibleCollectionFinder(collections_paths)
+ # noinspection PyProtectedMember
+ collection_loader._install() # pylint: disable=protected-access
+
+ warnings.filterwarnings(
+ "ignore",
+ "AnsibleCollectionFinder has already been configured")
+
+
+setup_collection_loader()
+
+from ansible import __version__ as ansible_version
+from ansible.executor.module_common import REPLACER_WINDOWS, NEW_STYLE_PYTHON_MODULE_RE
+from ansible.module_utils.common.parameters import DEFAULT_TYPE_VALIDATORS
+from ansible.module_utils.compat.version import StrictVersion, LooseVersion
+from ansible.module_utils.basic import to_bytes
+from ansible.module_utils.six import PY3, with_metaclass, string_types
+from ansible.plugins.loader import fragment_loader
+from ansible.plugins.list import IGNORE as REJECTLIST
+from ansible.utils.plugin_docs import add_collection_to_versions_and_dates, add_fragments, get_docstring
+from ansible.utils.version import SemanticVersion
+
+from .module_args import AnsibleModuleImportError, AnsibleModuleNotInitialized, get_argument_spec
+
+from .schema import ansible_module_kwargs_schema, doc_schema, return_schema
+
+from .utils import CaptureStd, NoArgsAnsibleModule, compare_unordered_lists, is_empty, parse_yaml, parse_isodate
+
+
+if PY3:
+ # Because there is no ast.TryExcept in Python 3 ast module
+ TRY_EXCEPT = ast.Try
+ # REPLACER_WINDOWS from ansible.executor.module_common is byte
+ # string but we need unicode for Python 3
+ REPLACER_WINDOWS = REPLACER_WINDOWS.decode('utf-8')
+else:
+ TRY_EXCEPT = ast.TryExcept
+
+REJECTLIST_DIRS = frozenset(('.git', 'test', '.github', '.idea'))
+INDENT_REGEX = re.compile(r'([\t]*)')
+TYPE_REGEX = re.compile(r'.*(if|or)(\s+[^"\']*|\s+)(?<!_)(?<!str\()type\([^)].*')
+SYS_EXIT_REGEX = re.compile(r'[^#]*sys.exit\s*\(.*')
+NO_LOG_REGEX = re.compile(r'(?:pass(?!ive)|secret|token|key)', re.I)
+
+
+REJECTLIST_IMPORTS = {
+ 'requests': {
+ 'new_only': True,
+ 'error': {
+ 'code': 'use-module-utils-urls',
+ 'msg': ('requests import found, should use '
+ 'ansible.module_utils.urls instead')
+ }
+ },
+ r'boto(?:\.|$)': {
+ 'new_only': True,
+ 'error': {
+ 'code': 'use-boto3',
+ 'msg': 'boto import found, new modules should use boto3'
+ }
+ },
+}
+SUBPROCESS_REGEX = re.compile(r'subprocess\.Po.*')
+OS_CALL_REGEX = re.compile(r'os\.call.*')
+
+
+LOOSE_ANSIBLE_VERSION = LooseVersion('.'.join(ansible_version.split('.')[:3]))
+
+
+PLUGINS_WITH_RETURN_VALUES = ('module', )
+PLUGINS_WITH_EXAMPLES = ('module', )
+PLUGINS_WITH_YAML_EXAMPLES = ('module', )
+
+
+def is_potential_secret_option(option_name):
+ if not NO_LOG_REGEX.search(option_name):
+ return False
+ # If this is a count, type, algorithm, timeout, filename, or name, it is probably not a secret
+ if option_name.endswith((
+ '_count', '_type', '_alg', '_algorithm', '_timeout', '_name', '_comment',
+ '_bits', '_id', '_identifier', '_period', '_file', '_filename',
+ )):
+ return False
+ # 'key' also matches 'publickey', which is generally not secret
+ if any(part in option_name for part in (
+ 'publickey', 'public_key', 'keyusage', 'key_usage', 'keyserver', 'key_server',
+ 'keysize', 'key_size', 'keyservice', 'key_service', 'pub_key', 'pubkey',
+ 'keyboard', 'secretary',
+ )):
+ return False
+ return True
+
+
+def compare_dates(d1, d2):
+ try:
+ date1 = parse_isodate(d1, allow_date=True)
+ date2 = parse_isodate(d2, allow_date=True)
+ return date1 == date2
+ except ValueError:
+ # At least one of d1 and d2 cannot be parsed. Simply compare values.
+ return d1 == d2
+
+
+class ReporterEncoder(json.JSONEncoder):
+ def default(self, o):
+ if isinstance(o, Exception):
+ return str(o)
+
+ return json.JSONEncoder.default(self, o)
+
+
+class Reporter:
+ def __init__(self):
+ self.files = OrderedDict()
+
+ def _ensure_default_entry(self, path):
+ try:
+ self.files[path]
+ except KeyError:
+ self.files[path] = {
+ 'errors': [],
+ 'warnings': [],
+ 'traces': [],
+ 'warning_traces': []
+ }
+
+ def _log(self, path, code, msg, level='error', line=0, column=0):
+ self._ensure_default_entry(path)
+ lvl_dct = self.files[path]['%ss' % level]
+ lvl_dct.append({
+ 'code': code,
+ 'msg': msg,
+ 'line': line,
+ 'column': column
+ })
+
+ def error(self, *args, **kwargs):
+ self._log(*args, level='error', **kwargs)
+
+ def warning(self, *args, **kwargs):
+ self._log(*args, level='warning', **kwargs)
+
+ def trace(self, path, tracebk):
+ self._ensure_default_entry(path)
+ self.files[path]['traces'].append(tracebk)
+
+ def warning_trace(self, path, tracebk):
+ self._ensure_default_entry(path)
+ self.files[path]['warning_traces'].append(tracebk)
+
+ @staticmethod
+ @contextmanager
+ def _output_handle(output):
+ if output != '-':
+ handle = open(output, 'w+')
+ else:
+ handle = sys.stdout
+
+ yield handle
+
+ handle.flush()
+ handle.close()
+
+ @staticmethod
+ def _filter_out_ok(reports):
+ temp_reports = OrderedDict()
+ for path, report in reports.items():
+ if report['errors'] or report['warnings']:
+ temp_reports[path] = report
+
+ return temp_reports
+
+ def plain(self, warnings=False, output='-'):
+ """Print out the test results in plain format
+
+ output is ignored here for now
+ """
+ ret = []
+
+ for path, report in Reporter._filter_out_ok(self.files).items():
+ traces = report['traces'][:]
+ if warnings and report['warnings']:
+ traces.extend(report['warning_traces'])
+
+ for trace in traces:
+ print('TRACE:')
+ print('\n '.join((' %s' % trace).splitlines()))
+ for error in report['errors']:
+ error['path'] = path
+ print('%(path)s:%(line)d:%(column)d: E%(code)s %(msg)s' % error)
+ ret.append(1)
+ if warnings:
+ for warning in report['warnings']:
+ warning['path'] = path
+ print('%(path)s:%(line)d:%(column)d: W%(code)s %(msg)s' % warning)
+
+ return 3 if ret else 0
+
+ def json(self, warnings=False, output='-'):
+ """Print out the test results in json format
+
+ warnings is not respected in this output
+ """
+ ret = [len(r['errors']) for r in self.files.values()]
+
+ with Reporter._output_handle(output) as handle:
+ print(json.dumps(Reporter._filter_out_ok(self.files), indent=4, cls=ReporterEncoder), file=handle)
+
+ return 3 if sum(ret) else 0
+
+
+class Validator(with_metaclass(abc.ABCMeta, object)):
+ """Validator instances are intended to be run on a single object. if you
+ are scanning multiple objects for problems, you'll want to have a separate
+ Validator for each one."""
+
+ def __init__(self, reporter=None):
+ self.reporter = reporter
+
+ @property
+ @abc.abstractmethod
+ def object_name(self):
+ """Name of the object we validated"""
+ pass
+
+ @property
+ @abc.abstractmethod
+ def object_path(self):
+ """Path of the object we validated"""
+ pass
+
+ @abc.abstractmethod
+ def validate(self):
+ """Run this method to generate the test results"""
+ pass
+
+
+class ModuleValidator(Validator):
+ REJECTLIST_PATTERNS = ('.git*', '*.pyc', '*.pyo', '.*', '*.md', '*.rst', '*.txt')
+ REJECTLIST_FILES = frozenset(('.git', '.gitignore', '.travis.yml',
+ '.gitattributes', '.gitmodules', 'COPYING',
+ '__init__.py', 'VERSION', 'test-docs.sh'))
+ REJECTLIST = REJECTLIST_FILES.union(REJECTLIST['module'])
+
+ # win_dsc is a dynamic arg spec, the docs won't ever match
+ PS_ARG_VALIDATE_REJECTLIST = frozenset(('win_dsc.ps1', ))
+
+ ACCEPTLIST_FUTURE_IMPORTS = frozenset(('absolute_import', 'division', 'print_function'))
+
+ def __init__(self, path, analyze_arg_spec=False, collection=None, collection_version=None,
+ base_branch=None, git_cache=None, reporter=None, routing=None, plugin_type='module'):
+ super(ModuleValidator, self).__init__(reporter=reporter or Reporter())
+
+ self.path = path
+ self.basename = os.path.basename(self.path)
+ self.name = os.path.splitext(self.basename)[0]
+ self.plugin_type = plugin_type
+
+ self.analyze_arg_spec = analyze_arg_spec and plugin_type == 'module'
+
+ self._Version = LooseVersion
+ self._StrictVersion = StrictVersion
+
+ self.collection = collection
+ self.collection_name = 'ansible.builtin'
+ if self.collection:
+ self._Version = SemanticVersion
+ self._StrictVersion = SemanticVersion
+ collection_namespace_path, collection_name = os.path.split(self.collection)
+ self.collection_name = '%s.%s' % (os.path.basename(collection_namespace_path), collection_name)
+ self.routing = routing
+ self.collection_version = None
+ if collection_version is not None:
+ self.collection_version_str = collection_version
+ self.collection_version = SemanticVersion(collection_version)
+
+ self.base_branch = base_branch
+ self.git_cache = git_cache or GitCache()
+
+ self._python_module_override = False
+
+ with open(path) as f:
+ self.text = f.read()
+ self.length = len(self.text.splitlines())
+ try:
+ self.ast = ast.parse(self.text)
+ except Exception:
+ self.ast = None
+
+ if base_branch:
+ self.base_module = self._get_base_file()
+ else:
+ self.base_module = None
+
+ def _create_version(self, v, collection_name=None):
+ if not v:
+ raise ValueError('Empty string is not a valid version')
+ if collection_name == 'ansible.builtin':
+ return LooseVersion(v)
+ if collection_name is not None:
+ return SemanticVersion(v)
+ return self._Version(v)
+
+ def _create_strict_version(self, v, collection_name=None):
+ if not v:
+ raise ValueError('Empty string is not a valid version')
+ if collection_name == 'ansible.builtin':
+ return StrictVersion(v)
+ if collection_name is not None:
+ return SemanticVersion(v)
+ return self._StrictVersion(v)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if not self.base_module:
+ return
+
+ try:
+ os.remove(self.base_module)
+ except Exception:
+ pass
+
+ @property
+ def object_name(self):
+ return self.basename
+
+ @property
+ def object_path(self):
+ return self.path
+
+ def _get_collection_meta(self):
+ """Implement if we need this for version_added comparisons
+ """
+ pass
+
+ def _python_module(self):
+ if self.path.endswith('.py') or self._python_module_override:
+ return True
+ return False
+
+ def _powershell_module(self):
+ if self.path.endswith('.ps1'):
+ return True
+ return False
+
+ def _sidecar_doc(self):
+ if self.path.endswith('.yml') or self.path.endswith('.yaml'):
+ return True
+ return False
+
+ def _just_docs(self):
+ """Module can contain just docs and from __future__ boilerplate
+ """
+ try:
+ for child in self.ast.body:
+ if not isinstance(child, ast.Assign):
+ # allow string constant expressions (these are docstrings)
+ if isinstance(child, ast.Expr) and isinstance(child.value, ast.Constant) and isinstance(child.value.value, str):
+ continue
+
+ # allowed from __future__ imports
+ if isinstance(child, ast.ImportFrom) and child.module == '__future__':
+ for future_import in child.names:
+ if future_import.name not in self.ACCEPTLIST_FUTURE_IMPORTS:
+ break
+ else:
+ continue
+ return False
+ return True
+ except AttributeError:
+ return False
+
+ def _get_base_branch_module_path(self):
+ """List all paths within lib/ansible/modules to try and match a moved module"""
+ return self.git_cache.base_module_paths.get(self.object_name)
+
+ def _has_alias(self):
+ """Return true if the module has any aliases."""
+ return self.object_name in self.git_cache.head_aliased_modules
+
+ def _get_base_file(self):
+ # In case of module moves, look for the original location
+ base_path = self._get_base_branch_module_path()
+ ext = os.path.splitext(base_path or self.path)[1]
+
+ command = ['git', 'show', '%s:%s' % (self.base_branch, base_path or self.path)]
+ p = subprocess.run(command, stdin=subprocess.DEVNULL, capture_output=True, check=False)
+
+ if int(p.returncode) != 0:
+ return None
+
+ t = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
+ t.write(p.stdout)
+ t.close()
+
+ return t.name
+
+ def _is_new_module(self):
+ if self._has_alias():
+ return False
+
+ return not self.object_name.startswith('_') and bool(self.base_branch) and not bool(self.base_module)
+
+ def _check_interpreter(self, powershell=False):
+ if powershell:
+ if not self.text.startswith('#!powershell\n'):
+ self.reporter.error(
+ path=self.object_path,
+ code='missing-powershell-interpreter',
+ msg='Interpreter line is not "#!powershell"'
+ )
+ return
+
+ missing_python_interpreter = False
+
+ if not self.text.startswith('#!/usr/bin/python'):
+ if NEW_STYLE_PYTHON_MODULE_RE.search(to_bytes(self.text)):
+ missing_python_interpreter = self.text.startswith('#!') # shebang optional, but if present must match
+ else:
+ missing_python_interpreter = True # shebang required
+
+ if missing_python_interpreter:
+ self.reporter.error(
+ path=self.object_path,
+ code='missing-python-interpreter',
+ msg='Interpreter line is not "#!/usr/bin/python"',
+ )
+
+ def _check_type_instead_of_isinstance(self, powershell=False):
+ if powershell:
+ return
+ for line_no, line in enumerate(self.text.splitlines()):
+ typekeyword = TYPE_REGEX.match(line)
+ if typekeyword:
+ # TODO: add column
+ self.reporter.error(
+ path=self.object_path,
+ code='unidiomatic-typecheck',
+ msg=('Type comparison using type() found. '
+ 'Use isinstance() instead'),
+ line=line_no + 1
+ )
+
+ def _check_for_sys_exit(self):
+ # Optimize out the happy path
+ if 'sys.exit' not in self.text:
+ return
+
+ for line_no, line in enumerate(self.text.splitlines()):
+ sys_exit_usage = SYS_EXIT_REGEX.match(line)
+ if sys_exit_usage:
+ # TODO: add column
+ self.reporter.error(
+ path=self.object_path,
+ code='use-fail-json-not-sys-exit',
+ msg='sys.exit() call found. Should be exit_json/fail_json',
+ line=line_no + 1
+ )
+
+ def _check_gpl3_header(self):
+ header = '\n'.join(self.text.split('\n')[:20])
+ if ('GNU General Public License' not in header or
+ ('version 3' not in header and 'v3.0' not in header)):
+ self.reporter.error(
+ path=self.object_path,
+ code='missing-gplv3-license',
+ msg='GPLv3 license header not found in the first 20 lines of the module'
+ )
+ elif self._is_new_module():
+ if len([line for line in header
+ if 'GNU General Public License' in line]) > 1:
+ self.reporter.error(
+ path=self.object_path,
+ code='use-short-gplv3-license',
+ msg='Found old style GPLv3 license header: '
+ 'https://docs.ansible.com/ansible-core/devel/dev_guide/developing_modules_documenting.html#copyright'
+ )
+
+ def _check_for_subprocess(self):
+ for child in self.ast.body:
+ if isinstance(child, ast.Import):
+ if child.names[0].name == 'subprocess':
+ for line_no, line in enumerate(self.text.splitlines()):
+ sp_match = SUBPROCESS_REGEX.search(line)
+ if sp_match:
+ self.reporter.error(
+ path=self.object_path,
+ code='use-run-command-not-popen',
+ msg=('subprocess.Popen call found. Should be module.run_command'),
+ line=(line_no + 1),
+ column=(sp_match.span()[0] + 1)
+ )
+
+ def _check_for_os_call(self):
+ if 'os.call' in self.text:
+ for line_no, line in enumerate(self.text.splitlines()):
+ os_call_match = OS_CALL_REGEX.search(line)
+ if os_call_match:
+ self.reporter.error(
+ path=self.object_path,
+ code='use-run-command-not-os-call',
+ msg=('os.call() call found. Should be module.run_command'),
+ line=(line_no + 1),
+ column=(os_call_match.span()[0] + 1)
+ )
+
+ def _find_rejectlist_imports(self):
+ for child in self.ast.body:
+ names = []
+ if isinstance(child, ast.Import):
+ names.extend(child.names)
+ elif isinstance(child, TRY_EXCEPT):
+ bodies = child.body
+ for handler in child.handlers:
+ bodies.extend(handler.body)
+ for grandchild in bodies:
+ if isinstance(grandchild, ast.Import):
+ names.extend(grandchild.names)
+ for name in names:
+ # TODO: Add line/col
+ for rejectlist_import, options in REJECTLIST_IMPORTS.items():
+ if re.search(rejectlist_import, name.name):
+ new_only = options['new_only']
+ if self._is_new_module() and new_only:
+ self.reporter.error(
+ path=self.object_path,
+ **options['error']
+ )
+ elif not new_only:
+ self.reporter.error(
+ path=self.object_path,
+ **options['error']
+ )
+
+ def _find_module_utils(self):
+ linenos = []
+ found_basic = False
+ for child in self.ast.body:
+ if isinstance(child, (ast.Import, ast.ImportFrom)):
+ names = []
+ try:
+ names.append(child.module)
+ if child.module.endswith('.basic'):
+ found_basic = True
+ except AttributeError:
+ pass
+ names.extend([n.name for n in child.names])
+
+ if [n for n in names if n.startswith('ansible.module_utils')]:
+ linenos.append(child.lineno)
+
+ for name in child.names:
+ if ('module_utils' in getattr(child, 'module', '') and
+ isinstance(name, ast.alias) and
+ name.name == '*'):
+ msg = (
+ 'module-utils-specific-import',
+ ('module_utils imports should import specific '
+ 'components, not "*"')
+ )
+ if self._is_new_module():
+ self.reporter.error(
+ path=self.object_path,
+ code=msg[0],
+ msg=msg[1],
+ line=child.lineno
+ )
+ else:
+ self.reporter.warning(
+ path=self.object_path,
+ code=msg[0],
+ msg=msg[1],
+ line=child.lineno
+ )
+
+ if (isinstance(name, ast.alias) and
+ name.name == 'basic'):
+ found_basic = True
+
+ if not found_basic:
+ self.reporter.warning(
+ path=self.object_path,
+ code='missing-module-utils-basic-import',
+ msg='Did not find "ansible.module_utils.basic" import'
+ )
+
+ return linenos
+
+ def _get_first_callable(self):
+ linenos = []
+ for child in self.ast.body:
+ if isinstance(child, (ast.FunctionDef, ast.ClassDef)):
+ linenos.append(child.lineno)
+
+ return min(linenos) if linenos else None
+
+ def _find_has_import(self):
+ for child in self.ast.body:
+ found_try_except_import = False
+ found_has = False
+ if isinstance(child, TRY_EXCEPT):
+ bodies = child.body
+ for handler in child.handlers:
+ bodies.extend(handler.body)
+ for grandchild in bodies:
+ if isinstance(grandchild, ast.Import):
+ found_try_except_import = True
+ if isinstance(grandchild, ast.Assign):
+ for target in grandchild.targets:
+ if not isinstance(target, ast.Name):
+ continue
+ if target.id.lower().startswith('has_'):
+ found_has = True
+ if found_try_except_import and not found_has:
+ # TODO: Add line/col
+ self.reporter.warning(
+ path=self.object_path,
+ code='try-except-missing-has',
+ msg='Found Try/Except block without HAS_ assignment'
+ )
+
+ def _ensure_imports_below_docs(self, doc_info, first_callable):
+ min_doc_line = min(doc_info[key]['lineno'] for key in doc_info)
+ max_doc_line = max(doc_info[key]['end_lineno'] for key in doc_info)
+
+ import_lines = []
+
+ for child in self.ast.body:
+ if isinstance(child, (ast.Import, ast.ImportFrom)):
+ if isinstance(child, ast.ImportFrom) and child.module == '__future__':
+ # allowed from __future__ imports
+ for future_import in child.names:
+ if future_import.name not in self.ACCEPTLIST_FUTURE_IMPORTS:
+ self.reporter.error(
+ path=self.object_path,
+ code='illegal-future-imports',
+ msg=('Only the following from __future__ imports are allowed: %s'
+ % ', '.join(self.ACCEPTLIST_FUTURE_IMPORTS)),
+ line=child.lineno
+ )
+ break
+ else: # for-else. If we didn't find a problem nad break out of the loop, then this is a legal import
+ continue
+ import_lines.append(child.lineno)
+ if child.lineno < min_doc_line:
+ self.reporter.error(
+ path=self.object_path,
+ code='import-before-documentation',
+ msg=('Import found before documentation variables. '
+ 'All imports must appear below '
+ 'DOCUMENTATION/EXAMPLES/RETURN.'),
+ line=child.lineno
+ )
+ break
+ elif isinstance(child, TRY_EXCEPT):
+ bodies = child.body
+ for handler in child.handlers:
+ bodies.extend(handler.body)
+ for grandchild in bodies:
+ if isinstance(grandchild, (ast.Import, ast.ImportFrom)):
+ import_lines.append(grandchild.lineno)
+ if grandchild.lineno < min_doc_line:
+ self.reporter.error(
+ path=self.object_path,
+ code='import-before-documentation',
+ msg=('Import found before documentation '
+ 'variables. All imports must appear below '
+ 'DOCUMENTATION/EXAMPLES/RETURN.'),
+ line=child.lineno
+ )
+ break
+
+ for import_line in import_lines:
+ if not (max_doc_line < import_line < first_callable):
+ msg = (
+ 'import-placement',
+ ('Imports should be directly below DOCUMENTATION/EXAMPLES/'
+ 'RETURN.')
+ )
+ if self._is_new_module():
+ self.reporter.error(
+ path=self.object_path,
+ code=msg[0],
+ msg=msg[1],
+ line=import_line
+ )
+ else:
+ self.reporter.warning(
+ path=self.object_path,
+ code=msg[0],
+ msg=msg[1],
+ line=import_line
+ )
+
+ def _validate_ps_replacers(self):
+ # loop all (for/else + error)
+ # get module list for each
+ # check "shape" of each module name
+
+ module_requires = r'(?im)^#\s*requires\s+\-module(?:s?)\s*(Ansible\.ModuleUtils\..+)'
+ csharp_requires = r'(?im)^#\s*ansiblerequires\s+\-csharputil\s*(Ansible\..+)'
+ found_requires = False
+
+ for req_stmt in re.finditer(module_requires, self.text):
+ found_requires = True
+ # this will bomb on dictionary format - "don't do that"
+ module_list = [x.strip() for x in req_stmt.group(1).split(',')]
+ if len(module_list) > 1:
+ self.reporter.error(
+ path=self.object_path,
+ code='multiple-utils-per-requires',
+ msg='Ansible.ModuleUtils requirements do not support multiple modules per statement: "%s"' % req_stmt.group(0)
+ )
+ continue
+
+ module_name = module_list[0]
+
+ if module_name.lower().endswith('.psm1'):
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-requires-extension',
+ msg='Module #Requires should not end in .psm1: "%s"' % module_name
+ )
+
+ for req_stmt in re.finditer(csharp_requires, self.text):
+ found_requires = True
+ # this will bomb on dictionary format - "don't do that"
+ module_list = [x.strip() for x in req_stmt.group(1).split(',')]
+ if len(module_list) > 1:
+ self.reporter.error(
+ path=self.object_path,
+ code='multiple-csharp-utils-per-requires',
+ msg='Ansible C# util requirements do not support multiple utils per statement: "%s"' % req_stmt.group(0)
+ )
+ continue
+
+ module_name = module_list[0]
+
+ if module_name.lower().endswith('.cs'):
+ self.reporter.error(
+ path=self.object_path,
+ code='illegal-extension-cs',
+ msg='Module #AnsibleRequires -CSharpUtil should not end in .cs: "%s"' % module_name
+ )
+
+ # also accept the legacy #POWERSHELL_COMMON replacer signal
+ if not found_requires and REPLACER_WINDOWS not in self.text:
+ self.reporter.error(
+ path=self.object_path,
+ code='missing-module-utils-import-csharp-requirements',
+ msg='No Ansible.ModuleUtils or C# Ansible util requirements/imports found'
+ )
+
+ def _find_ps_docs_file(self):
+ sidecar = self._find_sidecar_docs()
+ if sidecar:
+ return sidecar
+
+ py_path = self.path.replace('.ps1', '.py')
+ if not os.path.isfile(py_path):
+ self.reporter.error(
+ path=self.object_path,
+ code='missing-documentation',
+ msg='No DOCUMENTATION provided'
+ )
+ return py_path
+
+ def _find_sidecar_docs(self):
+ base_path = os.path.splitext(self.path)[0]
+ for ext in ('.yml', '.yaml'):
+ doc_path = f"{base_path}{ext}"
+ if os.path.isfile(doc_path):
+ return doc_path
+
+ def _get_py_docs(self):
+ docs = {
+ 'DOCUMENTATION': {
+ 'value': None,
+ 'lineno': 0,
+ 'end_lineno': 0,
+ },
+ 'EXAMPLES': {
+ 'value': None,
+ 'lineno': 0,
+ 'end_lineno': 0,
+ },
+ 'RETURN': {
+ 'value': None,
+ 'lineno': 0,
+ 'end_lineno': 0,
+ },
+ }
+ for child in self.ast.body:
+ if isinstance(child, ast.Assign):
+ for grandchild in child.targets:
+ if not isinstance(grandchild, ast.Name):
+ continue
+
+ if grandchild.id == 'DOCUMENTATION':
+ docs['DOCUMENTATION']['value'] = child.value.s
+ docs['DOCUMENTATION']['lineno'] = child.lineno
+ docs['DOCUMENTATION']['end_lineno'] = (
+ child.lineno + len(child.value.s.splitlines())
+ )
+ elif grandchild.id == 'EXAMPLES':
+ docs['EXAMPLES']['value'] = child.value.s
+ docs['EXAMPLES']['lineno'] = child.lineno
+ docs['EXAMPLES']['end_lineno'] = (
+ child.lineno + len(child.value.s.splitlines())
+ )
+ elif grandchild.id == 'RETURN':
+ docs['RETURN']['value'] = child.value.s
+ docs['RETURN']['lineno'] = child.lineno
+ docs['RETURN']['end_lineno'] = (
+ child.lineno + len(child.value.s.splitlines())
+ )
+
+ return docs
+
+ def _validate_docs_schema(self, doc, schema, name, error_code):
+ # TODO: Add line/col
+ errors = []
+ try:
+ schema(doc)
+ except Exception as e:
+ for error in e.errors:
+ error.data = doc
+ errors.extend(e.errors)
+
+ for error in errors:
+ path = [str(p) for p in error.path]
+
+ local_error_code = getattr(error, 'ansible_error_code', error_code)
+
+ if isinstance(error.data, dict):
+ error_message = humanize_error(error.data, error)
+ else:
+ error_message = error
+
+ if path:
+ combined_path = '%s.%s' % (name, '.'.join(path))
+ else:
+ combined_path = name
+
+ self.reporter.error(
+ path=self.object_path,
+ code=local_error_code,
+ msg='%s: %s' % (combined_path, error_message)
+ )
+
+ def _validate_docs(self):
+ doc = None
+ # We have three ways of marking deprecated/removed files. Have to check each one
+ # individually and then make sure they all agree
+ filename_deprecated_or_removed = False
+ deprecated = False
+ doc_deprecated = None # doc legally might not exist
+ routing_says_deprecated = False
+
+ if self.object_name.startswith('_') and not os.path.islink(self.object_path):
+ filename_deprecated_or_removed = True
+
+ # We are testing a collection
+ if self.routing:
+ routing_deprecation = self.routing.get('plugin_routing', {})
+ routing_deprecation = routing_deprecation.get('modules' if self.plugin_type == 'module' else self.plugin_type, {})
+ routing_deprecation = routing_deprecation.get(self.name, {}).get('deprecation', {})
+ if routing_deprecation:
+ # meta/runtime.yml says this is deprecated
+ routing_says_deprecated = True
+ deprecated = True
+
+ if self._python_module():
+ doc_info = self._get_py_docs()
+ else:
+ doc_info = None
+
+ sidecar_text = None
+ if self._sidecar_doc():
+ sidecar_text = self.text
+ elif sidecar_path := self._find_sidecar_docs():
+ with open(sidecar_path, mode='r', encoding='utf-8') as fd:
+ sidecar_text = fd.read()
+
+ if sidecar_text:
+ sidecar_doc, errors, traces = parse_yaml(sidecar_text, 0, self.name, 'DOCUMENTATION')
+ for error in errors:
+ self.reporter.error(
+ path=self.object_path,
+ code='documentation-syntax-error',
+ **error
+ )
+ for trace in traces:
+ self.reporter.trace(
+ path=self.object_path,
+ tracebk=trace
+ )
+
+ doc = sidecar_doc.get('DOCUMENTATION', None)
+ examples_raw = sidecar_doc.get('EXAMPLES', None)
+ examples_lineno = 1
+ returns = sidecar_doc.get('RETURN', None)
+
+ elif doc_info:
+ if bool(doc_info['DOCUMENTATION']['value']):
+ doc, errors, traces = parse_yaml(
+ doc_info['DOCUMENTATION']['value'],
+ doc_info['DOCUMENTATION']['lineno'],
+ self.name, 'DOCUMENTATION'
+ )
+
+ for error in errors:
+ self.reporter.error(
+ path=self.object_path,
+ code='documentation-syntax-error',
+ **error
+ )
+ for trace in traces:
+ self.reporter.trace(
+ path=self.object_path,
+ tracebk=trace
+ )
+
+ examples_raw = doc_info['EXAMPLES']['value']
+ examples_lineno = doc_info['EXAMPLES']['lineno']
+
+ returns = None
+ if bool(doc_info['RETURN']['value']):
+ returns, errors, traces = parse_yaml(doc_info['RETURN']['value'],
+ doc_info['RETURN']['lineno'],
+ self.name, 'RETURN')
+
+ for error in errors:
+ self.reporter.error(
+ path=self.object_path,
+ code='return-syntax-error',
+ **error
+ )
+ for trace in traces:
+ self.reporter.trace(
+ path=self.object_path,
+ tracebk=trace
+ )
+
+ if doc:
+ add_collection_to_versions_and_dates(doc, self.collection_name,
+ is_module=self.plugin_type == 'module')
+
+ missing_fragment = False
+ with CaptureStd():
+ try:
+ get_docstring(self.path, fragment_loader=fragment_loader,
+ verbose=True,
+ collection_name=self.collection_name,
+ plugin_type=self.plugin_type)
+ except AssertionError:
+ fragment = doc['extends_documentation_fragment']
+ self.reporter.error(
+ path=self.object_path,
+ code='missing-doc-fragment',
+ msg='DOCUMENTATION fragment missing: %s' % fragment
+ )
+ missing_fragment = True
+ except Exception as e:
+ self.reporter.trace(
+ path=self.object_path,
+ tracebk=traceback.format_exc()
+ )
+ self.reporter.error(
+ path=self.object_path,
+ code='documentation-error',
+ msg='Unknown DOCUMENTATION error, see TRACE: %s' % e
+ )
+
+ if not missing_fragment:
+ add_fragments(doc, self.object_path, fragment_loader=fragment_loader,
+ is_module=self.plugin_type == 'module')
+
+ if 'options' in doc and doc['options'] is None:
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-documentation-options',
+ msg='DOCUMENTATION.options must be a dictionary/hash when used',
+ )
+
+ if 'deprecated' in doc and doc.get('deprecated'):
+ doc_deprecated = True
+ doc_deprecation = doc['deprecated']
+ documentation_collection = doc_deprecation.get('removed_from_collection')
+ if documentation_collection != self.collection_name:
+ self.reporter.error(
+ path=self.object_path,
+ code='deprecation-wrong-collection',
+ msg='"DOCUMENTATION.deprecation.removed_from_collection must be the current collection name: %r vs. %r' % (
+ documentation_collection, self.collection_name)
+ )
+ else:
+ doc_deprecated = False
+
+ if os.path.islink(self.object_path):
+ # This module has an alias, which we can tell as it's a symlink
+ # Rather than checking for `module: $filename` we need to check against the true filename
+ self._validate_docs_schema(
+ doc,
+ doc_schema(
+ os.readlink(self.object_path).split('.')[0],
+ for_collection=bool(self.collection),
+ deprecated_module=deprecated,
+ plugin_type=self.plugin_type,
+ ),
+ 'DOCUMENTATION',
+ 'invalid-documentation',
+ )
+ else:
+ # This is the normal case
+ self._validate_docs_schema(
+ doc,
+ doc_schema(
+ self.object_name.split('.')[0],
+ for_collection=bool(self.collection),
+ deprecated_module=deprecated,
+ plugin_type=self.plugin_type,
+ ),
+ 'DOCUMENTATION',
+ 'invalid-documentation',
+ )
+
+ if not self.collection:
+ existing_doc = self._check_for_new_args(doc)
+ self._check_version_added(doc, existing_doc)
+ else:
+ self.reporter.error(
+ path=self.object_path,
+ code='missing-documentation',
+ msg='No DOCUMENTATION provided',
+ )
+
+ if not examples_raw and self.plugin_type in PLUGINS_WITH_EXAMPLES:
+ if self.plugin_type in PLUGINS_WITH_EXAMPLES:
+ self.reporter.error(
+ path=self.object_path,
+ code='missing-examples',
+ msg='No EXAMPLES provided'
+ )
+
+ elif self.plugin_type in PLUGINS_WITH_YAML_EXAMPLES:
+ dummy, errors, traces = parse_yaml(examples_raw,
+ examples_lineno,
+ self.name, 'EXAMPLES',
+ load_all=True,
+ ansible_loader=True)
+ for error in errors:
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-examples',
+ **error
+ )
+ for trace in traces:
+ self.reporter.trace(
+ path=self.object_path,
+ tracebk=trace
+ )
+
+ if returns:
+ if returns:
+ add_collection_to_versions_and_dates(
+ returns,
+ self.collection_name,
+ is_module=self.plugin_type == 'module',
+ return_docs=True)
+ self._validate_docs_schema(
+ returns,
+ return_schema(for_collection=bool(self.collection), plugin_type=self.plugin_type),
+ 'RETURN', 'return-syntax-error')
+
+ elif self.plugin_type in PLUGINS_WITH_RETURN_VALUES:
+ if self._is_new_module():
+ self.reporter.error(
+ path=self.object_path,
+ code='missing-return',
+ msg='No RETURN provided'
+ )
+ else:
+ self.reporter.warning(
+ path=self.object_path,
+ code='missing-return-legacy',
+ msg='No RETURN provided'
+ )
+
+ # Check for mismatched deprecation
+ if not self.collection:
+ mismatched_deprecation = True
+ if not (filename_deprecated_or_removed or deprecated or doc_deprecated):
+ mismatched_deprecation = False
+ else:
+ if (filename_deprecated_or_removed and doc_deprecated):
+ mismatched_deprecation = False
+ if (filename_deprecated_or_removed and not doc):
+ mismatched_deprecation = False
+
+ if mismatched_deprecation:
+ self.reporter.error(
+ path=self.object_path,
+ code='deprecation-mismatch',
+ msg='Module deprecation/removed must agree in documentation, by prepending filename with'
+ ' "_", and setting DOCUMENTATION.deprecated for deprecation or by removing all'
+ ' documentation for removed'
+ )
+ else:
+ # We are testing a collection
+ if self.object_name.startswith('_'):
+ self.reporter.error(
+ path=self.object_path,
+ code='collections-no-underscore-on-deprecation',
+ msg='Deprecated content in collections MUST NOT start with "_", update meta/runtime.yml instead',
+ )
+
+ if not (doc_deprecated == routing_says_deprecated):
+ # DOCUMENTATION.deprecated and meta/runtime.yml disagree
+ self.reporter.error(
+ path=self.object_path,
+ code='deprecation-mismatch',
+ msg='"meta/runtime.yml" and DOCUMENTATION.deprecation do not agree.'
+ )
+ elif routing_says_deprecated:
+ # Both DOCUMENTATION.deprecated and meta/runtime.yml agree that the module is deprecated.
+ # Make sure they give the same version or date.
+ routing_date = routing_deprecation.get('removal_date')
+ routing_version = routing_deprecation.get('removal_version')
+ # The versions and dates in the module documentation are auto-tagged, so remove the tag
+ # to make comparison possible and to avoid confusing the user.
+ documentation_date = doc_deprecation.get('removed_at_date')
+ documentation_version = doc_deprecation.get('removed_in')
+ if not compare_dates(routing_date, documentation_date):
+ self.reporter.error(
+ path=self.object_path,
+ code='deprecation-mismatch',
+ msg='"meta/runtime.yml" and DOCUMENTATION.deprecation do not agree on removal date: %r vs. %r' % (
+ routing_date, documentation_date)
+ )
+ if routing_version != documentation_version:
+ self.reporter.error(
+ path=self.object_path,
+ code='deprecation-mismatch',
+ msg='"meta/runtime.yml" and DOCUMENTATION.deprecation do not agree on removal version: %r vs. %r' % (
+ routing_version, documentation_version)
+ )
+
+ # In the future we should error if ANSIBLE_METADATA exists in a collection
+
+ return doc_info, doc
+
+ def _check_version_added(self, doc, existing_doc):
+ version_added_raw = doc.get('version_added')
+ try:
+ collection_name = doc.get('version_added_collection')
+ version_added = self._create_strict_version(
+ str(version_added_raw or '0.0'),
+ collection_name=collection_name)
+ except ValueError as e:
+ version_added = version_added_raw or '0.0'
+ if self._is_new_module() or version_added != 'historical':
+ # already reported during schema validation, except:
+ if version_added == 'historical':
+ self.reporter.error(
+ path=self.object_path,
+ code='module-invalid-version-added',
+ msg='version_added is not a valid version number: %r. Error: %s' % (version_added, e)
+ )
+ return
+
+ if existing_doc and str(version_added_raw) != str(existing_doc.get('version_added')):
+ self.reporter.error(
+ path=self.object_path,
+ code='module-incorrect-version-added',
+ msg='version_added should be %r. Currently %r' % (existing_doc.get('version_added'), version_added_raw)
+ )
+
+ if not self._is_new_module():
+ return
+
+ should_be = '.'.join(ansible_version.split('.')[:2])
+ strict_ansible_version = self._create_strict_version(should_be, collection_name='ansible.builtin')
+
+ if (version_added < strict_ansible_version or
+ strict_ansible_version < version_added):
+ self.reporter.error(
+ path=self.object_path,
+ code='module-incorrect-version-added',
+ msg='version_added should be %r. Currently %r' % (should_be, version_added_raw)
+ )
+
+ def _validate_ansible_module_call(self, docs):
+ try:
+ spec, kwargs = get_argument_spec(self.path, self.collection)
+ except AnsibleModuleNotInitialized:
+ self.reporter.error(
+ path=self.object_path,
+ code='ansible-module-not-initialized',
+ msg="Execution of the module did not result in initialization of AnsibleModule",
+ )
+ return
+ except AnsibleModuleImportError as e:
+ self.reporter.error(
+ path=self.object_path,
+ code='import-error',
+ msg="Exception attempting to import module for argument_spec introspection, '%s'" % e
+ )
+ self.reporter.trace(
+ path=self.object_path,
+ tracebk=traceback.format_exc()
+ )
+ return
+
+ schema = ansible_module_kwargs_schema(self.object_name.split('.')[0], for_collection=bool(self.collection))
+ self._validate_docs_schema(kwargs, schema, 'AnsibleModule', 'invalid-ansiblemodule-schema')
+
+ self._validate_argument_spec(docs, spec, kwargs)
+
+ def _validate_list_of_module_args(self, name, terms, spec, context):
+ if terms is None:
+ return
+ if not isinstance(terms, (list, tuple)):
+ # This is already reported by schema checking
+ return
+ for check in terms:
+ if not isinstance(check, (list, tuple)):
+ # This is already reported by schema checking
+ continue
+ bad_term = False
+ for term in check:
+ if not isinstance(term, string_types):
+ msg = name
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " must contain strings in the lists or tuples; found value %r" % (term, )
+ self.reporter.error(
+ path=self.object_path,
+ code=name + '-type',
+ msg=msg,
+ )
+ bad_term = True
+ if bad_term:
+ continue
+ if len(set(check)) != len(check):
+ msg = name
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has repeated terms"
+ self.reporter.error(
+ path=self.object_path,
+ code=name + '-collision',
+ msg=msg,
+ )
+ if not set(check) <= set(spec):
+ msg = name
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " contains terms which are not part of argument_spec: %s" % ", ".join(sorted(set(check).difference(set(spec))))
+ self.reporter.error(
+ path=self.object_path,
+ code=name + '-unknown',
+ msg=msg,
+ )
+
+ def _validate_required_if(self, terms, spec, context, module):
+ if terms is None:
+ return
+ if not isinstance(terms, (list, tuple)):
+ # This is already reported by schema checking
+ return
+ for check in terms:
+ if not isinstance(check, (list, tuple)) or len(check) not in [3, 4]:
+ # This is already reported by schema checking
+ continue
+ if len(check) == 4 and not isinstance(check[3], bool):
+ msg = "required_if"
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " must have forth value omitted or of type bool; got %r" % (check[3], )
+ self.reporter.error(
+ path=self.object_path,
+ code='required_if-is_one_of-type',
+ msg=msg,
+ )
+ requirements = check[2]
+ if not isinstance(requirements, (list, tuple)):
+ msg = "required_if"
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " must have third value (requirements) being a list or tuple; got type %r" % (requirements, )
+ self.reporter.error(
+ path=self.object_path,
+ code='required_if-requirements-type',
+ msg=msg,
+ )
+ continue
+ bad_term = False
+ for term in requirements:
+ if not isinstance(term, string_types):
+ msg = "required_if"
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " must have only strings in third value (requirements); got %r" % (term, )
+ self.reporter.error(
+ path=self.object_path,
+ code='required_if-requirements-type',
+ msg=msg,
+ )
+ bad_term = True
+ if bad_term:
+ continue
+ if len(set(requirements)) != len(requirements):
+ msg = "required_if"
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has repeated terms in requirements"
+ self.reporter.error(
+ path=self.object_path,
+ code='required_if-requirements-collision',
+ msg=msg,
+ )
+ if not set(requirements) <= set(spec):
+ msg = "required_if"
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " contains terms in requirements which are not part of argument_spec: %s" % ", ".join(sorted(set(requirements).difference(set(spec))))
+ self.reporter.error(
+ path=self.object_path,
+ code='required_if-requirements-unknown',
+ msg=msg,
+ )
+ key = check[0]
+ if key not in spec:
+ msg = "required_if"
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " must have its key %s in argument_spec" % key
+ self.reporter.error(
+ path=self.object_path,
+ code='required_if-unknown-key',
+ msg=msg,
+ )
+ continue
+ if key in requirements:
+ msg = "required_if"
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " contains its key %s in requirements" % key
+ self.reporter.error(
+ path=self.object_path,
+ code='required_if-key-in-requirements',
+ msg=msg,
+ )
+ value = check[1]
+ if value is not None:
+ _type = spec[key].get('type', 'str')
+ if callable(_type):
+ _type_checker = _type
+ else:
+ _type_checker = DEFAULT_TYPE_VALIDATORS.get(_type)
+ try:
+ with CaptureStd():
+ dummy = _type_checker(value)
+ except (Exception, SystemExit):
+ msg = "required_if"
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has value %r which does not fit to %s's parameter type %r" % (value, key, _type)
+ self.reporter.error(
+ path=self.object_path,
+ code='required_if-value-type',
+ msg=msg,
+ )
+
+ def _validate_required_by(self, terms, spec, context):
+ if terms is None:
+ return
+ if not isinstance(terms, Mapping):
+ # This is already reported by schema checking
+ return
+ for key, value in terms.items():
+ if isinstance(value, string_types):
+ value = [value]
+ if not isinstance(value, (list, tuple)):
+ # This is already reported by schema checking
+ continue
+ for term in value:
+ if not isinstance(term, string_types):
+ # This is already reported by schema checking
+ continue
+ if len(set(value)) != len(value) or key in value:
+ msg = "required_by"
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has repeated terms"
+ self.reporter.error(
+ path=self.object_path,
+ code='required_by-collision',
+ msg=msg,
+ )
+ if not set(value) <= set(spec) or key not in spec:
+ msg = "required_by"
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " contains terms which are not part of argument_spec: %s" % ", ".join(sorted(set(value).difference(set(spec))))
+ self.reporter.error(
+ path=self.object_path,
+ code='required_by-unknown',
+ msg=msg,
+ )
+
+ def _validate_argument_spec(self, docs, spec, kwargs, context=None, last_context_spec=None):
+ if not self.analyze_arg_spec:
+ return
+
+ if docs is None:
+ docs = {}
+
+ if context is None:
+ context = []
+
+ if last_context_spec is None:
+ last_context_spec = kwargs
+
+ try:
+ if not context:
+ add_fragments(docs, self.object_path, fragment_loader=fragment_loader,
+ is_module=self.plugin_type == 'module')
+ except Exception:
+ # Cannot merge fragments
+ return
+
+ # Use this to access type checkers later
+ module = NoArgsAnsibleModule({})
+
+ self._validate_list_of_module_args('mutually_exclusive', last_context_spec.get('mutually_exclusive'), spec, context)
+ self._validate_list_of_module_args('required_together', last_context_spec.get('required_together'), spec, context)
+ self._validate_list_of_module_args('required_one_of', last_context_spec.get('required_one_of'), spec, context)
+ self._validate_required_if(last_context_spec.get('required_if'), spec, context, module)
+ self._validate_required_by(last_context_spec.get('required_by'), spec, context)
+
+ provider_args = set()
+ args_from_argspec = set()
+ deprecated_args_from_argspec = set()
+ doc_options = docs.get('options', {})
+ if doc_options is None:
+ doc_options = {}
+ for arg, data in spec.items():
+ restricted_argument_names = ('message', 'syslog_facility')
+ if arg.lower() in restricted_argument_names:
+ msg = "Argument '%s' in argument_spec " % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += "must not be one of %s as it is used " \
+ "internally by Ansible Core Engine" % (",".join(restricted_argument_names))
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-argument-name',
+ msg=msg,
+ )
+ continue
+ if 'aliases' in data:
+ for al in data['aliases']:
+ if al.lower() in restricted_argument_names:
+ msg = "Argument alias '%s' in argument_spec " % al
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += "must not be one of %s as it is used " \
+ "internally by Ansible Core Engine" % (",".join(restricted_argument_names))
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-argument-name',
+ msg=msg,
+ )
+ continue
+
+ # Could this a place where secrets are leaked?
+ # If it is type: path we know it's not a secret key as it's a file path.
+ # If it is type: bool it is more likely a flag indicating that something is secret, than an actual secret.
+ if all((
+ data.get('no_log') is None, is_potential_secret_option(arg),
+ data.get('type') not in ("path", "bool"), data.get('choices') is None,
+ )):
+ msg = "Argument '%s' in argument_spec could be a secret, though doesn't have `no_log` set" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ self.reporter.error(
+ path=self.object_path,
+ code='no-log-needed',
+ msg=msg,
+ )
+
+ if not isinstance(data, dict):
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " must be a dictionary/hash when used"
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-argument-spec',
+ msg=msg,
+ )
+ continue
+
+ removed_at_date = data.get('removed_at_date', None)
+ if removed_at_date is not None:
+ try:
+ if parse_isodate(removed_at_date, allow_date=False) < datetime.date.today():
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has a removed_at_date '%s' before today" % removed_at_date
+ self.reporter.error(
+ path=self.object_path,
+ code='deprecated-date',
+ msg=msg,
+ )
+ except ValueError:
+ # This should only happen when removed_at_date is not in ISO format. Since schema
+ # validation already reported this as an error, don't report it a second time.
+ pass
+
+ deprecated_aliases = data.get('deprecated_aliases', None)
+ if deprecated_aliases is not None:
+ for deprecated_alias in deprecated_aliases:
+ if 'name' in deprecated_alias and 'date' in deprecated_alias:
+ try:
+ date = deprecated_alias['date']
+ if parse_isodate(date, allow_date=False) < datetime.date.today():
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has deprecated aliases '%s' with removal date '%s' before today" % (
+ deprecated_alias['name'], deprecated_alias['date'])
+ self.reporter.error(
+ path=self.object_path,
+ code='deprecated-date',
+ msg=msg,
+ )
+ except ValueError:
+ # This should only happen when deprecated_alias['date'] is not in ISO format. Since
+ # schema validation already reported this as an error, don't report it a second
+ # time.
+ pass
+
+ has_version = False
+ if self.collection and self.collection_version is not None:
+ compare_version = self.collection_version
+ version_of_what = "this collection (%s)" % self.collection_version_str
+ code_prefix = 'collection'
+ has_version = True
+ elif not self.collection:
+ compare_version = LOOSE_ANSIBLE_VERSION
+ version_of_what = "Ansible (%s)" % ansible_version
+ code_prefix = 'ansible'
+ has_version = True
+
+ removed_in_version = data.get('removed_in_version', None)
+ if removed_in_version is not None:
+ try:
+ collection_name = data.get('removed_from_collection')
+ removed_in = self._create_version(str(removed_in_version), collection_name=collection_name)
+ if has_version and collection_name == self.collection_name and compare_version >= removed_in:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has a deprecated removed_in_version %r," % removed_in_version
+ msg += " i.e. the version is less than or equal to the current version of %s" % version_of_what
+ self.reporter.error(
+ path=self.object_path,
+ code=code_prefix + '-deprecated-version',
+ msg=msg,
+ )
+ except ValueError as e:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has an invalid removed_in_version number %r: %s" % (removed_in_version, e)
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-deprecated-version',
+ msg=msg,
+ )
+ except TypeError:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has an invalid removed_in_version number %r: " % (removed_in_version, )
+ msg += " error while comparing to version of %s" % version_of_what
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-deprecated-version',
+ msg=msg,
+ )
+
+ if deprecated_aliases is not None:
+ for deprecated_alias in deprecated_aliases:
+ if 'name' in deprecated_alias and 'version' in deprecated_alias:
+ try:
+ collection_name = deprecated_alias.get('collection_name')
+ version = self._create_version(str(deprecated_alias['version']), collection_name=collection_name)
+ if has_version and collection_name == self.collection_name and compare_version >= version:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has deprecated aliases '%s' with removal in version %r," % (
+ deprecated_alias['name'], deprecated_alias['version'])
+ msg += " i.e. the version is less than or equal to the current version of %s" % version_of_what
+ self.reporter.error(
+ path=self.object_path,
+ code=code_prefix + '-deprecated-version',
+ msg=msg,
+ )
+ except ValueError as e:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has deprecated aliases '%s' with invalid removal version %r: %s" % (
+ deprecated_alias['name'], deprecated_alias['version'], e)
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-deprecated-version',
+ msg=msg,
+ )
+ except TypeError:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has deprecated aliases '%s' with invalid removal version %r:" % (
+ deprecated_alias['name'], deprecated_alias['version'])
+ msg += " error while comparing to version of %s" % version_of_what
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-deprecated-version',
+ msg=msg,
+ )
+
+ aliases = data.get('aliases', [])
+ if arg in aliases:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " is specified as its own alias"
+ self.reporter.error(
+ path=self.object_path,
+ code='parameter-alias-self',
+ msg=msg
+ )
+ if len(aliases) > len(set(aliases)):
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has at least one alias specified multiple times in aliases"
+ self.reporter.error(
+ path=self.object_path,
+ code='parameter-alias-repeated',
+ msg=msg
+ )
+ if not context and arg == 'state':
+ bad_states = set(['list', 'info', 'get']) & set(data.get('choices', set()))
+ for bad_state in bad_states:
+ self.reporter.error(
+ path=self.object_path,
+ code='parameter-state-invalid-choice',
+ msg="Argument 'state' includes the value '%s' as a choice" % bad_state)
+ if not data.get('removed_in_version', None) and not data.get('removed_at_date', None):
+ args_from_argspec.add(arg)
+ args_from_argspec.update(aliases)
+ else:
+ deprecated_args_from_argspec.add(arg)
+ deprecated_args_from_argspec.update(aliases)
+ if arg == 'provider' and self.object_path.startswith('lib/ansible/modules/network/'):
+ if data.get('options') is not None and not isinstance(data.get('options'), Mapping):
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-argument-spec-options',
+ msg="Argument 'options' in argument_spec['provider'] must be a dictionary/hash when used",
+ )
+ elif data.get('options'):
+ # Record provider options from network modules, for later comparison
+ for provider_arg, provider_data in data.get('options', {}).items():
+ provider_args.add(provider_arg)
+ provider_args.update(provider_data.get('aliases', []))
+
+ if data.get('required') and data.get('default', object) != object:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " is marked as required but specifies a default. Arguments with a" \
+ " default should not be marked as required"
+ self.reporter.error(
+ path=self.object_path,
+ code='no-default-for-required-parameter',
+ msg=msg
+ )
+
+ if arg in provider_args:
+ # Provider args are being removed from network module top level
+ # don't validate docs<->arg_spec checks below
+ continue
+
+ _type = data.get('type', 'str')
+ if callable(_type):
+ _type_checker = _type
+ else:
+ _type_checker = DEFAULT_TYPE_VALIDATORS.get(_type)
+
+ _elements = data.get('elements')
+ if (_type == 'list') and not _elements:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " defines type as list but elements is not defined"
+ self.reporter.error(
+ path=self.object_path,
+ code='parameter-list-no-elements',
+ msg=msg
+ )
+ if _elements:
+ if not callable(_elements):
+ DEFAULT_TYPE_VALIDATORS.get(_elements)
+ if _type != 'list':
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " defines elements as %s but it is valid only when value of parameter type is list" % _elements
+ self.reporter.error(
+ path=self.object_path,
+ code='parameter-invalid-elements',
+ msg=msg
+ )
+
+ arg_default = None
+ if 'default' in data and not is_empty(data['default']):
+ try:
+ with CaptureStd():
+ arg_default = _type_checker(data['default'])
+ except (Exception, SystemExit):
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " defines default as (%r) but this is incompatible with parameter type %r" % (data['default'], _type)
+ self.reporter.error(
+ path=self.object_path,
+ code='incompatible-default-type',
+ msg=msg
+ )
+ continue
+
+ doc_options_args = []
+ for alias in sorted(set([arg] + list(aliases))):
+ if alias in doc_options:
+ doc_options_args.append(alias)
+ if len(doc_options_args) == 0:
+ # Undocumented arguments will be handled later (search for undocumented-parameter)
+ doc_options_arg = {}
+ else:
+ doc_options_arg = doc_options[doc_options_args[0]]
+ if len(doc_options_args) > 1:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " with aliases %s is documented multiple times, namely as %s" % (
+ ", ".join([("'%s'" % alias) for alias in aliases]),
+ ", ".join([("'%s'" % alias) for alias in doc_options_args])
+ )
+ self.reporter.error(
+ path=self.object_path,
+ code='parameter-documented-multiple-times',
+ msg=msg
+ )
+
+ try:
+ doc_default = None
+ if 'default' in doc_options_arg and not is_empty(doc_options_arg['default']):
+ with CaptureStd():
+ doc_default = _type_checker(doc_options_arg['default'])
+ except (Exception, SystemExit):
+ msg = "Argument '%s' in documentation" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " defines default as (%r) but this is incompatible with parameter type %r" % (doc_options_arg.get('default'), _type)
+ self.reporter.error(
+ path=self.object_path,
+ code='doc-default-incompatible-type',
+ msg=msg
+ )
+ continue
+
+ if arg_default != doc_default:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " defines default as (%r) but documentation defines default as (%r)" % (arg_default, doc_default)
+ self.reporter.error(
+ path=self.object_path,
+ code='doc-default-does-not-match-spec',
+ msg=msg
+ )
+
+ doc_type = doc_options_arg.get('type')
+ if 'type' in data and data['type'] is not None:
+ if doc_type is None:
+ if not arg.startswith('_'): # hidden parameter, for example _raw_params
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " defines type as %r but documentation doesn't define type" % (data['type'])
+ self.reporter.error(
+ path=self.object_path,
+ code='parameter-type-not-in-doc',
+ msg=msg
+ )
+ elif data['type'] != doc_type:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " defines type as %r but documentation defines type as %r" % (data['type'], doc_type)
+ self.reporter.error(
+ path=self.object_path,
+ code='doc-type-does-not-match-spec',
+ msg=msg
+ )
+ else:
+ if doc_type is None:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " uses default type ('str') but documentation doesn't define type"
+ self.reporter.error(
+ path=self.object_path,
+ code='doc-missing-type',
+ msg=msg
+ )
+ elif doc_type != 'str':
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " implies type as 'str' but documentation defines as %r" % doc_type
+ self.reporter.error(
+ path=self.object_path,
+ code='implied-parameter-type-mismatch',
+ msg=msg
+ )
+
+ doc_choices = []
+ try:
+ for choice in doc_options_arg.get('choices', []):
+ try:
+ with CaptureStd():
+ doc_choices.append(_type_checker(choice))
+ except (Exception, SystemExit):
+ msg = "Argument '%s' in documentation" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " defines choices as (%r) but this is incompatible with argument type %r" % (choice, _type)
+ self.reporter.error(
+ path=self.object_path,
+ code='doc-choices-incompatible-type',
+ msg=msg
+ )
+ raise StopIteration()
+ except StopIteration:
+ continue
+
+ arg_choices = []
+ try:
+ for choice in data.get('choices', []):
+ try:
+ with CaptureStd():
+ arg_choices.append(_type_checker(choice))
+ except (Exception, SystemExit):
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " defines choices as (%r) but this is incompatible with argument type %r" % (choice, _type)
+ self.reporter.error(
+ path=self.object_path,
+ code='incompatible-choices',
+ msg=msg
+ )
+ raise StopIteration()
+ except StopIteration:
+ continue
+
+ if not compare_unordered_lists(arg_choices, doc_choices):
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " defines choices as (%r) but documentation defines choices as (%r)" % (arg_choices, doc_choices)
+ self.reporter.error(
+ path=self.object_path,
+ code='doc-choices-do-not-match-spec',
+ msg=msg
+ )
+
+ doc_required = doc_options_arg.get('required', False)
+ data_required = data.get('required', False)
+ if (doc_required or data_required) and not (doc_required and data_required):
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ if doc_required:
+ msg += " is not required, but is documented as being required"
+ else:
+ msg += " is required, but is not documented as being required"
+ self.reporter.error(
+ path=self.object_path,
+ code='doc-required-mismatch',
+ msg=msg
+ )
+
+ doc_elements = doc_options_arg.get('elements', None)
+ doc_type = doc_options_arg.get('type', 'str')
+ data_elements = data.get('elements', None)
+ if (doc_elements or data_elements) and not (doc_elements == data_elements):
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ if data_elements:
+ msg += " specifies elements as %s," % data_elements
+ else:
+ msg += " does not specify elements,"
+ if doc_elements:
+ msg += "but elements is documented as being %s" % doc_elements
+ else:
+ msg += "but elements is not documented"
+ self.reporter.error(
+ path=self.object_path,
+ code='doc-elements-mismatch',
+ msg=msg
+ )
+
+ spec_suboptions = data.get('options')
+ doc_suboptions = doc_options_arg.get('suboptions', {})
+ if spec_suboptions:
+ if not doc_suboptions:
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " has sub-options but documentation does not define it"
+ self.reporter.error(
+ path=self.object_path,
+ code='missing-suboption-docs',
+ msg=msg
+ )
+ self._validate_argument_spec({'options': doc_suboptions}, spec_suboptions, kwargs,
+ context=context + [arg], last_context_spec=data)
+
+ for arg in args_from_argspec:
+ if not str(arg).isidentifier():
+ msg = "Argument '%s' in argument_spec" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " is not a valid python identifier"
+ self.reporter.error(
+ path=self.object_path,
+ code='parameter-invalid',
+ msg=msg
+ )
+
+ if docs:
+ args_from_docs = set()
+ for arg, data in doc_options.items():
+ args_from_docs.add(arg)
+ args_from_docs.update(data.get('aliases', []))
+
+ args_missing_from_docs = args_from_argspec.difference(args_from_docs)
+ docs_missing_from_args = args_from_docs.difference(args_from_argspec | deprecated_args_from_argspec)
+ for arg in args_missing_from_docs:
+ if arg in provider_args:
+ # Provider args are being removed from network module top level
+ # So they are likely not documented on purpose
+ continue
+ msg = "Argument '%s'" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " is listed in the argument_spec, but not documented in the module documentation"
+ self.reporter.error(
+ path=self.object_path,
+ code='undocumented-parameter',
+ msg=msg
+ )
+ for arg in docs_missing_from_args:
+ msg = "Argument '%s'" % arg
+ if context:
+ msg += " found in %s" % " -> ".join(context)
+ msg += " is listed in DOCUMENTATION.options, but not accepted by the module argument_spec"
+ self.reporter.error(
+ path=self.object_path,
+ code='nonexistent-parameter-documented',
+ msg=msg
+ )
+
+ def _check_for_new_args(self, doc):
+ if not self.base_branch or self._is_new_module():
+ return
+
+ with CaptureStd():
+ try:
+ existing_doc, dummy_examples, dummy_return, existing_metadata = get_docstring(
+ self.base_module, fragment_loader, verbose=True, collection_name=self.collection_name,
+ is_module=self.plugin_type == 'module')
+ existing_options = existing_doc.get('options', {}) or {}
+ except AssertionError:
+ fragment = doc['extends_documentation_fragment']
+ self.reporter.warning(
+ path=self.object_path,
+ code='missing-existing-doc-fragment',
+ msg='Pre-existing DOCUMENTATION fragment missing: %s' % fragment
+ )
+ return
+ except Exception as e:
+ self.reporter.warning_trace(
+ path=self.object_path,
+ tracebk=e
+ )
+ self.reporter.warning(
+ path=self.object_path,
+ code='unknown-doc-fragment',
+ msg=('Unknown pre-existing DOCUMENTATION error, see TRACE. Submodule refs may need updated')
+ )
+ return
+
+ try:
+ mod_collection_name = existing_doc.get('version_added_collection')
+ mod_version_added = self._create_strict_version(
+ str(existing_doc.get('version_added', '0.0')),
+ collection_name=mod_collection_name)
+ except ValueError:
+ mod_collection_name = self.collection_name
+ mod_version_added = self._create_strict_version('0.0')
+
+ options = doc.get('options', {}) or {}
+
+ should_be = '.'.join(ansible_version.split('.')[:2])
+ strict_ansible_version = self._create_strict_version(should_be, collection_name='ansible.builtin')
+
+ for option, details in options.items():
+ try:
+ names = [option] + details.get('aliases', [])
+ except (TypeError, AttributeError):
+ # Reporting of this syntax error will be handled by schema validation.
+ continue
+
+ if any(name in existing_options for name in names):
+ # The option already existed. Make sure version_added didn't change.
+ for name in names:
+ existing_collection_name = existing_options.get(name, {}).get('version_added_collection')
+ existing_version = existing_options.get(name, {}).get('version_added')
+ if existing_version:
+ break
+ current_collection_name = details.get('version_added_collection')
+ current_version = details.get('version_added')
+ if current_collection_name != existing_collection_name:
+ self.reporter.error(
+ path=self.object_path,
+ code='option-incorrect-version-added-collection',
+ msg=('version_added for existing option (%s) should '
+ 'belong to collection %r. Currently belongs to %r' %
+ (option, current_collection_name, existing_collection_name))
+ )
+ elif str(current_version) != str(existing_version):
+ self.reporter.error(
+ path=self.object_path,
+ code='option-incorrect-version-added',
+ msg=('version_added for existing option (%s) should '
+ 'be %r. Currently %r' %
+ (option, existing_version, current_version))
+ )
+ continue
+
+ try:
+ collection_name = details.get('version_added_collection')
+ version_added = self._create_strict_version(
+ str(details.get('version_added', '0.0')),
+ collection_name=collection_name)
+ except ValueError as e:
+ # already reported during schema validation
+ continue
+
+ builtin = self.collection_name == 'ansible.builtin' and collection_name in ('ansible.builtin', None)
+ if not builtin and collection_name != self.collection_name:
+ continue
+ if (strict_ansible_version != mod_version_added and
+ (version_added < strict_ansible_version or
+ strict_ansible_version < version_added)):
+ self.reporter.error(
+ path=self.object_path,
+ code='option-incorrect-version-added',
+ msg=('version_added for new option (%s) should '
+ 'be %r. Currently %r' %
+ (option, should_be, version_added))
+ )
+
+ return existing_doc
+
+ @staticmethod
+ def is_on_rejectlist(path):
+ base_name = os.path.basename(path)
+ file_name = os.path.splitext(base_name)[0]
+
+ if file_name.startswith('_') and os.path.islink(path):
+ return True
+
+ if not frozenset((base_name, file_name)).isdisjoint(ModuleValidator.REJECTLIST):
+ return True
+
+ for pat in ModuleValidator.REJECTLIST_PATTERNS:
+ if fnmatch(base_name, pat):
+ return True
+
+ return False
+
+ def validate(self):
+ super(ModuleValidator, self).validate()
+ if not self._python_module() and not self._powershell_module() and not self._sidecar_doc():
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-extension',
+ msg=('Official Ansible modules must have a .py '
+ 'extension for python modules or a .ps1 '
+ 'for powershell modules')
+ )
+ self._python_module_override = True
+
+ if self._python_module() and self.ast is None:
+ self.reporter.error(
+ path=self.object_path,
+ code='python-syntax-error',
+ msg='Python SyntaxError while parsing module'
+ )
+ try:
+ compile(self.text, self.path, 'exec')
+ except Exception:
+ self.reporter.trace(
+ path=self.object_path,
+ tracebk=traceback.format_exc()
+ )
+ return
+
+ end_of_deprecation_should_be_removed_only = False
+ doc_info = None
+ if self._python_module() or self._sidecar_doc():
+ doc_info, docs = self._validate_docs()
+
+ # See if current version => deprecated.removed_in, ie, should be docs only
+ if docs and docs.get('deprecated', False):
+
+ if 'removed_in' in docs['deprecated']:
+ removed_in = None
+ collection_name = docs['deprecated'].get('removed_from_collection')
+ version = docs['deprecated']['removed_in']
+ if collection_name != self.collection_name:
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-module-deprecation-source',
+ msg=('The deprecation version for a module must be added in this collection')
+ )
+ else:
+ try:
+ removed_in = self._create_strict_version(str(version), collection_name=collection_name)
+ except ValueError as e:
+ self.reporter.error(
+ path=self.object_path,
+ code='invalid-module-deprecation-version',
+ msg=('The deprecation version %r cannot be parsed: %s' % (version, e))
+ )
+
+ if removed_in:
+ if not self.collection:
+ strict_ansible_version = self._create_strict_version(
+ '.'.join(ansible_version.split('.')[:2]), self.collection_name)
+ end_of_deprecation_should_be_removed_only = strict_ansible_version >= removed_in
+
+ if end_of_deprecation_should_be_removed_only:
+ self.reporter.error(
+ path=self.object_path,
+ code='ansible-deprecated-module',
+ msg='Module is marked for removal in version %s of Ansible when the current version is %s' % (
+ version, ansible_version),
+ )
+ elif self.collection_version:
+ strict_ansible_version = self.collection_version
+ end_of_deprecation_should_be_removed_only = strict_ansible_version >= removed_in
+
+ if end_of_deprecation_should_be_removed_only:
+ self.reporter.error(
+ path=self.object_path,
+ code='collection-deprecated-module',
+ msg='Module is marked for removal in version %s of this collection when the current version is %s' % (
+ version, self.collection_version_str),
+ )
+
+ # handle deprecation by date
+ if 'removed_at_date' in docs['deprecated']:
+ try:
+ removed_at_date = docs['deprecated']['removed_at_date']
+ if parse_isodate(removed_at_date, allow_date=True) < datetime.date.today():
+ msg = "Module's deprecated.removed_at_date date '%s' is before today" % removed_at_date
+ self.reporter.error(path=self.object_path, code='deprecated-date', msg=msg)
+ except ValueError:
+ # This happens if the date cannot be parsed. This is already checked by the schema.
+ pass
+
+ if self._python_module() and not self._just_docs() and not end_of_deprecation_should_be_removed_only:
+ if self.plugin_type == 'module':
+ self._validate_ansible_module_call(docs)
+ self._check_for_sys_exit()
+ self._find_rejectlist_imports()
+ if self.plugin_type == 'module':
+ self._find_module_utils()
+ self._find_has_import()
+
+ if doc_info:
+ first_callable = self._get_first_callable() or 1000000 # use a bogus "high" line number if no callable exists
+ self._ensure_imports_below_docs(doc_info, first_callable)
+
+ if self.plugin_type == 'module':
+ self._check_for_subprocess()
+ self._check_for_os_call()
+
+ if self._powershell_module():
+ self._validate_ps_replacers()
+ docs_path = self._find_ps_docs_file()
+
+ # We can only validate PowerShell arg spec if it is using the new Ansible.Basic.AnsibleModule util
+ pattern = r'(?im)^#\s*ansiblerequires\s+\-csharputil\s*Ansible\.Basic'
+ if re.search(pattern, self.text) and self.object_name not in self.PS_ARG_VALIDATE_REJECTLIST:
+ with ModuleValidator(docs_path, base_branch=self.base_branch, git_cache=self.git_cache) as docs_mv:
+ docs = docs_mv._validate_docs()[1]
+ self._validate_ansible_module_call(docs)
+
+ self._check_gpl3_header()
+ if not self._just_docs() and not self._sidecar_doc() and not end_of_deprecation_should_be_removed_only:
+ if self.plugin_type == 'module':
+ self._check_interpreter(powershell=self._powershell_module())
+ self._check_type_instead_of_isinstance(
+ powershell=self._powershell_module()
+ )
+
+
+class PythonPackageValidator(Validator):
+ REJECTLIST_FILES = frozenset(('__pycache__',))
+
+ def __init__(self, path, reporter=None):
+ super(PythonPackageValidator, self).__init__(reporter=reporter or Reporter())
+
+ self.path = path
+ self.basename = os.path.basename(path)
+
+ @property
+ def object_name(self):
+ return self.basename
+
+ @property
+ def object_path(self):
+ return self.path
+
+ def validate(self):
+ super(PythonPackageValidator, self).validate()
+
+ if self.basename in self.REJECTLIST_FILES:
+ return
+
+ init_file = os.path.join(self.path, '__init__.py')
+ if not os.path.exists(init_file):
+ self.reporter.error(
+ path=self.object_path,
+ code='subdirectory-missing-init',
+ msg='Ansible module subdirectories must contain an __init__.py'
+ )
+
+
+def re_compile(value):
+ """
+ Argparse expects things to raise TypeError, re.compile raises an re.error
+ exception
+
+ This function is a shorthand to convert the re.error exception to a
+ TypeError
+ """
+
+ try:
+ return re.compile(value)
+ except re.error as e:
+ raise TypeError(e)
+
+
+def run():
+ parser = argparse.ArgumentParser(prog="validate-modules")
+ parser.add_argument('plugins', nargs='+',
+ help='Path to module/plugin or module/plugin directory')
+ parser.add_argument('-w', '--warnings', help='Show warnings',
+ action='store_true')
+ parser.add_argument('--exclude', help='RegEx exclusion pattern',
+ type=re_compile)
+ parser.add_argument('--arg-spec', help='Analyze module argument spec',
+ action='store_true', default=False)
+ parser.add_argument('--base-branch', default=None,
+ help='Used in determining if new options were added')
+ parser.add_argument('--format', choices=['json', 'plain'], default='plain',
+ help='Output format. Default: "%(default)s"')
+ parser.add_argument('--output', default='-',
+ help='Output location, use "-" for stdout. '
+ 'Default "%(default)s"')
+ parser.add_argument('--collection',
+ help='Specifies the path to the collection, when '
+ 'validating files within a collection. Ensure '
+ 'that ANSIBLE_COLLECTIONS_PATH is set so the '
+ 'contents of the collection can be located')
+ parser.add_argument('--collection-version',
+ help='The collection\'s version number used to check '
+ 'deprecations')
+ parser.add_argument('--plugin-type',
+ default='module',
+ help='The plugin type to validate. Defaults to %(default)s')
+
+ args = parser.parse_args()
+
+ args.plugins = [m.rstrip('/') for m in args.plugins]
+
+ reporter = Reporter()
+ git_cache = GitCache(args.base_branch, args.plugin_type)
+
+ check_dirs = set()
+
+ routing = None
+ if args.collection:
+ routing_file = 'meta/runtime.yml'
+ # Load meta/runtime.yml if it exists, as it may contain deprecation information
+ if os.path.isfile(routing_file):
+ try:
+ with open(routing_file) as f:
+ routing = yaml.safe_load(f)
+ except yaml.error.MarkedYAMLError as ex:
+ print('%s:%d:%d: YAML load failed: %s' % (routing_file, ex.context_mark.line + 1, ex.context_mark.column + 1, re.sub(r'\s+', ' ', str(ex))))
+ except Exception as ex: # pylint: disable=broad-except
+ print('%s:%d:%d: YAML load failed: %s' % (routing_file, 0, 0, re.sub(r'\s+', ' ', str(ex))))
+
+ for plugin in args.plugins:
+ if os.path.isfile(plugin):
+ path = plugin
+ if args.exclude and args.exclude.search(path):
+ continue
+ if ModuleValidator.is_on_rejectlist(path):
+ continue
+ with ModuleValidator(path, collection=args.collection, collection_version=args.collection_version,
+ analyze_arg_spec=args.arg_spec, base_branch=args.base_branch,
+ git_cache=git_cache, reporter=reporter, routing=routing,
+ plugin_type=args.plugin_type) as mv1:
+ mv1.validate()
+ check_dirs.add(os.path.dirname(path))
+
+ for root, dirs, files in os.walk(plugin):
+ basedir = root[len(plugin) + 1:].split('/', 1)[0]
+ if basedir in REJECTLIST_DIRS:
+ continue
+ for dirname in dirs:
+ if root == plugin and dirname in REJECTLIST_DIRS:
+ continue
+ path = os.path.join(root, dirname)
+ if args.exclude and args.exclude.search(path):
+ continue
+ check_dirs.add(path)
+
+ for filename in files:
+ path = os.path.join(root, filename)
+ if args.exclude and args.exclude.search(path):
+ continue
+ if ModuleValidator.is_on_rejectlist(path):
+ continue
+ with ModuleValidator(path, collection=args.collection, collection_version=args.collection_version,
+ analyze_arg_spec=args.arg_spec, base_branch=args.base_branch,
+ git_cache=git_cache, reporter=reporter, routing=routing,
+ plugin_type=args.plugin_type) as mv2:
+ mv2.validate()
+
+ if not args.collection and args.plugin_type == 'module':
+ for path in sorted(check_dirs):
+ pv = PythonPackageValidator(path, reporter=reporter)
+ pv.validate()
+
+ if args.format == 'plain':
+ sys.exit(reporter.plain(warnings=args.warnings, output=args.output))
+ else:
+ sys.exit(reporter.json(warnings=args.warnings, output=args.output))
+
+
+class GitCache:
+ def __init__(self, base_branch, plugin_type):
+ self.base_branch = base_branch
+ self.plugin_type = plugin_type
+
+ self.rel_path = 'lib/ansible/modules/'
+ if plugin_type != 'module':
+ self.rel_path = 'lib/ansible/plugins/%s/' % plugin_type
+
+ if self.base_branch:
+ self.base_tree = self._git(['ls-tree', '-r', '--name-only', self.base_branch, self.rel_path])
+ else:
+ self.base_tree = []
+
+ try:
+ self.head_tree = self._git(['ls-tree', '-r', '--name-only', 'HEAD', self.rel_path])
+ except GitError as ex:
+ if ex.status == 128:
+ # fallback when there is no .git directory
+ self.head_tree = self._get_module_files()
+ else:
+ raise
+ except FileNotFoundError:
+ # fallback when git is not installed
+ self.head_tree = self._get_module_files()
+
+ allowed_exts = ('.py', '.ps1')
+ if plugin_type != 'module':
+ allowed_exts = ('.py', )
+ self.base_module_paths = dict((os.path.basename(p), p) for p in self.base_tree if os.path.splitext(p)[1] in allowed_exts)
+
+ self.base_module_paths.pop('__init__.py', None)
+
+ self.head_aliased_modules = set()
+
+ for path in self.head_tree:
+ filename = os.path.basename(path)
+
+ if filename.startswith('_') and filename != '__init__.py':
+ if os.path.islink(path):
+ self.head_aliased_modules.add(os.path.basename(os.path.realpath(path)))
+
+ def _get_module_files(self):
+ module_files = []
+
+ for (dir_path, dir_names, file_names) in os.walk(self.rel_path):
+ for file_name in file_names:
+ module_files.append(os.path.join(dir_path, file_name))
+
+ return module_files
+
+ @staticmethod
+ def _git(args):
+ cmd = ['git'] + args
+ p = subprocess.run(cmd, stdin=subprocess.DEVNULL, capture_output=True, text=True, check=False)
+
+ if p.returncode != 0:
+ raise GitError(p.stderr, p.returncode)
+
+ return p.stdout.splitlines()
+
+
+class GitError(Exception):
+ def __init__(self, message, status):
+ super(GitError, self).__init__(message)
+
+ self.status = status
+
+
+def main():
+ try:
+ run()
+ except KeyboardInterrupt:
+ pass
diff --git a/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/module_args.py b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/module_args.py
new file mode 100644
index 0000000..03a1401
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/module_args.py
@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2016 Matt Martz <matt@sivel.net>
+# Copyright (C) 2016 Rackspace US, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import annotations
+
+import runpy
+import inspect
+import json
+import os
+import subprocess
+import sys
+
+from contextlib import contextmanager
+
+from ansible.executor.powershell.module_manifest import PSModuleDepFinder
+from ansible.module_utils.basic import FILE_COMMON_ARGUMENTS, AnsibleModule
+from ansible.module_utils.six import reraise
+from ansible.module_utils._text import to_bytes, to_text
+
+from .utils import CaptureStd, find_executable, get_module_name_from_filename
+
+
+ANSIBLE_MODULE_CONSTRUCTOR_ARGS = tuple(list(inspect.signature(AnsibleModule.__init__).parameters)[1:])
+
+
+class AnsibleModuleCallError(RuntimeError):
+ pass
+
+
+class AnsibleModuleImportError(ImportError):
+ pass
+
+
+class AnsibleModuleNotInitialized(Exception):
+ pass
+
+
+class _FakeAnsibleModuleInit:
+ def __init__(self):
+ self.args = tuple()
+ self.kwargs = {}
+ self.called = False
+
+ def __call__(self, *args, **kwargs):
+ if args and isinstance(args[0], AnsibleModule):
+ # Make sure, due to creative calling, that we didn't end up with
+ # ``self`` in ``args``
+ self.args = args[1:]
+ else:
+ self.args = args
+ self.kwargs = kwargs
+ self.called = True
+ raise AnsibleModuleCallError('AnsibleModuleCallError')
+
+
+def _fake_load_params():
+ pass
+
+
+@contextmanager
+def setup_env(filename):
+ # Used to clean up imports later
+ pre_sys_modules = list(sys.modules.keys())
+
+ fake = _FakeAnsibleModuleInit()
+ module = __import__('ansible.module_utils.basic').module_utils.basic
+ _original_init = module.AnsibleModule.__init__
+ _original_load_params = module._load_params
+ setattr(module.AnsibleModule, '__init__', fake)
+ setattr(module, '_load_params', _fake_load_params)
+
+ try:
+ yield fake
+ finally:
+ setattr(module.AnsibleModule, '__init__', _original_init)
+ setattr(module, '_load_params', _original_load_params)
+
+ # Clean up imports to prevent issues with mutable data being used in modules
+ for k in list(sys.modules.keys()):
+ # It's faster if we limit to items in ansible.module_utils
+ # But if this causes problems later, we should remove it
+ if k not in pre_sys_modules and k.startswith('ansible.module_utils.'):
+ del sys.modules[k]
+
+
+def get_ps_argument_spec(filename, collection):
+ fqc_name = get_module_name_from_filename(filename, collection)
+
+ pwsh = find_executable('pwsh')
+ if not pwsh:
+ raise FileNotFoundError('Required program for PowerShell arg spec inspection "pwsh" not found.')
+
+ module_path = os.path.join(os.getcwd(), filename)
+ b_module_path = to_bytes(module_path, errors='surrogate_or_strict')
+ with open(b_module_path, mode='rb') as module_fd:
+ b_module_data = module_fd.read()
+
+ ps_dep_finder = PSModuleDepFinder()
+ ps_dep_finder.scan_module(b_module_data, fqn=fqc_name)
+
+ # For ps_argspec.ps1 to compile Ansible.Basic it also needs the AddType module_util.
+ ps_dep_finder._add_module(name=b"Ansible.ModuleUtils.AddType", ext=".psm1", fqn=None, optional=False, wrapper=False)
+
+ util_manifest = json.dumps({
+ 'module_path': to_text(module_path, errors='surrogate_or_strict'),
+ 'ansible_basic': ps_dep_finder.cs_utils_module["Ansible.Basic"]['path'],
+ 'ps_utils': {name: info['path'] for name, info in ps_dep_finder.ps_modules.items()}
+ })
+
+ script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'ps_argspec.ps1')
+ proc = subprocess.run(['pwsh', script_path, util_manifest], stdin=subprocess.DEVNULL, capture_output=True, text=True, check=False)
+
+ if proc.returncode != 0:
+ raise AnsibleModuleImportError("STDOUT:\n%s\nSTDERR:\n%s" % (proc.stdout, proc.stderr))
+
+ kwargs = json.loads(proc.stdout)
+
+ # the validate-modules code expects the options spec to be under the argument_spec key not options as set in PS
+ kwargs['argument_spec'] = kwargs.pop('options', {})
+
+ return kwargs['argument_spec'], kwargs
+
+
+def get_py_argument_spec(filename, collection):
+ name = get_module_name_from_filename(filename, collection)
+
+ with setup_env(filename) as fake:
+ try:
+ with CaptureStd():
+ runpy.run_module(name, run_name='__main__', alter_sys=True)
+ except AnsibleModuleCallError:
+ pass
+ except BaseException as e:
+ # we want to catch all exceptions here, including sys.exit
+ reraise(AnsibleModuleImportError, AnsibleModuleImportError('%s' % e), sys.exc_info()[2])
+
+ if not fake.called:
+ raise AnsibleModuleNotInitialized()
+
+ try:
+ # Convert positional arguments to kwargs to make sure that all parameters are actually checked
+ for arg, arg_name in zip(fake.args, ANSIBLE_MODULE_CONSTRUCTOR_ARGS):
+ fake.kwargs[arg_name] = arg
+ # for ping kwargs == {'argument_spec':{'data':{'type':'str','default':'pong'}}, 'supports_check_mode':True}
+ argument_spec = fake.kwargs.get('argument_spec') or {}
+ # If add_file_common_args is truish, add options from FILE_COMMON_ARGUMENTS when not present.
+ # This is the only modification to argument_spec done by AnsibleModule itself, and which is
+ # not caught by setup_env's AnsibleModule replacement
+ if fake.kwargs.get('add_file_common_args'):
+ for k, v in FILE_COMMON_ARGUMENTS.items():
+ if k not in argument_spec:
+ argument_spec[k] = v
+ return argument_spec, fake.kwargs
+ except (TypeError, IndexError):
+ return {}, {}
+
+
+def get_argument_spec(filename, collection):
+ if filename.endswith('.py'):
+ return get_py_argument_spec(filename, collection)
+ else:
+ return get_ps_argument_spec(filename, collection)
diff --git a/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/ps_argspec.ps1 b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/ps_argspec.ps1
new file mode 100644
index 0000000..4183b2b
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/ps_argspec.ps1
@@ -0,0 +1,121 @@
+#Requires -Version 6
+
+Set-StrictMode -Version 2.0
+$ErrorActionPreference = "Stop"
+$WarningPreference = "Stop"
+
+Function Resolve-CircularReference {
+ <#
+ .SYNOPSIS
+ Removes known types that cause a circular reference in their json serialization.
+
+ .PARAMETER Hash
+ The hash to scan for circular references
+ #>
+ [CmdletBinding()]
+ param (
+ [Parameter(Mandatory = $true)]
+ [System.Collections.IDictionary]
+ $Hash
+ )
+
+ foreach ($key in [String[]]$Hash.Keys) {
+ $value = $Hash[$key]
+ if ($value -is [System.Collections.IDictionary]) {
+ Resolve-CircularReference -Hash $value
+ }
+ elseif ($value -is [Array] -or $value -is [System.Collections.IList]) {
+ $values = @(foreach ($v in $value) {
+ if ($v -is [System.Collections.IDictionary]) {
+ Resolve-CircularReference -Hash $v
+ }
+ , $v
+ })
+ $Hash[$key] = $values
+ }
+ elseif ($value -is [DateTime]) {
+ $Hash[$key] = $value.ToString("yyyy-MM-dd")
+ }
+ elseif ($value -is [delegate]) {
+ # Type can be set to a delegate function which defines it's own type. For the documentation we just
+ # reflection that as raw
+ if ($key -eq 'type') {
+ $Hash[$key] = 'raw'
+ }
+ else {
+ $Hash[$key] = $value.ToString() # Shouldn't ever happen but just in case.
+ }
+ }
+ }
+}
+
+$manifest = ConvertFrom-Json -InputObject $args[0] -AsHashtable
+if (-not $manifest.Contains('module_path') -or -not $manifest.module_path) {
+ Write-Error -Message "No module specified."
+ exit 1
+}
+$module_path = $manifest.module_path
+
+# Check if the path is relative and get the full path to the module
+if (-not ([System.IO.Path]::IsPathRooted($module_path))) {
+ $module_path = $ExecutionContext.SessionState.Path.GetUnresolvedProviderPathFromPSPath($module_path)
+}
+
+if (-not (Test-Path -LiteralPath $module_path -PathType Leaf)) {
+ Write-Error -Message "The module at '$module_path' does not exist."
+ exit 1
+}
+
+$module_code = Get-Content -LiteralPath $module_path -Raw
+
+$powershell = [PowerShell]::Create()
+$powershell.Runspace.SessionStateProxy.SetVariable("ErrorActionPreference", "Stop")
+
+# Load the PowerShell module utils as the module may be using them to refer to shared module options. Currently we
+# can only load the PowerShell utils due to cross platform compatibility issues.
+if ($manifest.Contains('ps_utils')) {
+ foreach ($util_info in $manifest.ps_utils.GetEnumerator()) {
+ $util_name = $util_info.Key
+ $util_path = $util_info.Value
+
+ if (-not (Test-Path -LiteralPath $util_path -PathType Leaf)) {
+ # Failed to find the util path, just silently ignore for now and hope for the best.
+ continue
+ }
+
+ $util_sb = [ScriptBlock]::Create((Get-Content -LiteralPath $util_path -Raw))
+ $powershell.AddCommand('New-Module').AddParameters(@{
+ Name = $util_name
+ ScriptBlock = $util_sb
+ }) > $null
+ $powershell.AddCommand('Import-Module').AddParameter('WarningAction', 'SilentlyContinue') > $null
+ $powershell.AddCommand('Out-Null').AddStatement() > $null
+
+ # Also import it into the current runspace in case ps_argspec.ps1 needs to use it.
+ $null = New-Module -Name $util_name -ScriptBlock $util_sb | Import-Module -WarningAction SilentlyContinue
+ }
+}
+
+Add-CSharpType -References @(Get-Content -LiteralPath $manifest.ansible_basic -Raw)
+[Ansible.Basic.AnsibleModule]::_DebugArgSpec = $true
+
+$powershell.AddScript($module_code) > $null
+$powershell.Invoke() > $null
+$arg_spec = $powershell.Runspace.SessionStateProxy.GetVariable('ansibleTestArgSpec')
+
+if (-not $arg_spec) {
+ $err = $powershell.Streams.Error
+ if ($err) {
+ $err
+ }
+ else {
+ "Unknown error trying to get PowerShell arg spec"
+ }
+
+ exit 1
+}
+
+
+Resolve-CircularReference -Hash $arg_spec
+
+ConvertTo-Json -InputObject $arg_spec -Compress -Depth 99
diff --git a/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/schema.py b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/schema.py
new file mode 100644
index 0000000..b2623ff
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/schema.py
@@ -0,0 +1,899 @@
+# -*- coding: utf-8 -*-
+
+# Copyright: (c) 2015, Matt Martz <matt@sivel.net>
+# Copyright: (c) 2015, Rackspace US, Inc.
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import annotations
+
+import re
+
+from ansible.module_utils.compat.version import StrictVersion
+from functools import partial
+from urllib.parse import urlparse
+
+from voluptuous import ALLOW_EXTRA, PREVENT_EXTRA, All, Any, Invalid, Length, Required, Schema, Self, ValueInvalid, Exclusive
+from ansible.module_utils.six import string_types
+from ansible.module_utils.common.collections import is_iterable
+from ansible.module_utils.parsing.convert_bool import boolean
+from ansible.parsing.quoting import unquote
+from ansible.utils.version import SemanticVersion
+from ansible.release import __version__
+
+from .utils import parse_isodate
+
+list_string_types = list(string_types)
+tuple_string_types = tuple(string_types)
+any_string_types = Any(*string_types)
+
+# Valid DOCUMENTATION.author lines
+# Based on Ansibulbot's extract_github_id()
+# author: First Last (@name) [optional anything]
+# "Ansible Core Team" - Used by the Bot
+# "Michael DeHaan" - nop
+# "OpenStack Ansible SIG" - OpenStack does not use GitHub
+# "Name (!UNKNOWN)" - For the few untraceable authors
+author_line = re.compile(r'^\w.*(\(@([\w-]+)\)|!UNKNOWN)(?![\w.])|^Ansible Core Team$|^Michael DeHaan$|^OpenStack Ansible SIG$')
+
+
+def _add_ansible_error_code(exception, error_code):
+ setattr(exception, 'ansible_error_code', error_code)
+ return exception
+
+
+def isodate(v, error_code=None):
+ try:
+ parse_isodate(v, allow_date=True)
+ except ValueError as e:
+ raise _add_ansible_error_code(Invalid(str(e)), error_code or 'ansible-invalid-date')
+ return v
+
+
+COLLECTION_NAME_RE = re.compile(r'^\w+(?:\.\w+)+$')
+FULLY_QUALIFIED_COLLECTION_RESOURCE_RE = re.compile(r'^\w+(?:\.\w+){2,}$')
+
+
+def collection_name(v, error_code=None):
+ if not isinstance(v, string_types):
+ raise _add_ansible_error_code(
+ Invalid('Collection name must be a string'), error_code or 'collection-invalid-name')
+ m = COLLECTION_NAME_RE.match(v)
+ if not m:
+ raise _add_ansible_error_code(
+ Invalid('Collection name must be of format `<namespace>.<name>`'), error_code or 'collection-invalid-name')
+ return v
+
+
+def deprecation_versions():
+ """Create a list of valid version for deprecation entries, current+4"""
+ major, minor = [int(version) for version in __version__.split('.')[0:2]]
+ return Any(*['{0}.{1}'.format(major, minor + increment) for increment in range(0, 5)])
+
+
+def version(for_collection=False):
+ if for_collection:
+ # We do not accept floats for versions in collections
+ return Any(*string_types)
+ return Any(float, *string_types)
+
+
+def date(error_code=None):
+ return Any(isodate, error_code=error_code)
+
+
+_MODULE = re.compile(r"\bM\(([^)]+)\)")
+_LINK = re.compile(r"\bL\(([^)]+)\)")
+_URL = re.compile(r"\bU\(([^)]+)\)")
+_REF = re.compile(r"\bR\(([^)]+)\)")
+
+
+def _check_module_link(directive, content):
+ if not FULLY_QUALIFIED_COLLECTION_RESOURCE_RE.match(content):
+ raise _add_ansible_error_code(
+ Invalid('Directive "%s" must contain a FQCN' % directive), 'invalid-documentation-markup')
+
+
+def _check_link(directive, content):
+ if ',' not in content:
+ raise _add_ansible_error_code(
+ Invalid('Directive "%s" must contain a comma' % directive), 'invalid-documentation-markup')
+ idx = content.rindex(',')
+ title = content[:idx]
+ url = content[idx + 1:].lstrip(' ')
+ _check_url(directive, url)
+
+
+def _check_url(directive, content):
+ try:
+ parsed_url = urlparse(content)
+ if parsed_url.scheme not in ('', 'http', 'https'):
+ raise ValueError('Schema must be HTTP, HTTPS, or not specified')
+ except ValueError as exc:
+ raise _add_ansible_error_code(
+ Invalid('Directive "%s" must contain an URL' % directive), 'invalid-documentation-markup')
+
+
+def _check_ref(directive, content):
+ if ',' not in content:
+ raise _add_ansible_error_code(
+ Invalid('Directive "%s" must contain a comma' % directive), 'invalid-documentation-markup')
+
+
+def doc_string(v):
+ """Match a documentation string."""
+ if not isinstance(v, string_types):
+ raise _add_ansible_error_code(
+ Invalid('Must be a string'), 'invalid-documentation')
+ for m in _MODULE.finditer(v):
+ _check_module_link(m.group(0), m.group(1))
+ for m in _LINK.finditer(v):
+ _check_link(m.group(0), m.group(1))
+ for m in _URL.finditer(v):
+ _check_url(m.group(0), m.group(1))
+ for m in _REF.finditer(v):
+ _check_ref(m.group(0), m.group(1))
+ return v
+
+
+def doc_string_or_strings(v):
+ """Match a documentation string, or list of strings."""
+ if isinstance(v, string_types):
+ return doc_string(v)
+ if isinstance(v, (list, tuple)):
+ return [doc_string(vv) for vv in v]
+ raise _add_ansible_error_code(
+ Invalid('Must be a string or list of strings'), 'invalid-documentation')
+
+
+def is_callable(v):
+ if not callable(v):
+ raise ValueInvalid('not a valid value')
+ return v
+
+
+def sequence_of_sequences(min=None, max=None):
+ return All(
+ Any(
+ None,
+ [Any(list, tuple)],
+ tuple([Any(list, tuple)]),
+ ),
+ Any(
+ None,
+ [Length(min=min, max=max)],
+ tuple([Length(min=min, max=max)]),
+ ),
+ )
+
+
+seealso_schema = Schema(
+ [
+ Any(
+ {
+ Required('module'): Any(*string_types),
+ 'description': doc_string,
+ },
+ {
+ Required('ref'): Any(*string_types),
+ Required('description'): doc_string,
+ },
+ {
+ Required('name'): Any(*string_types),
+ Required('link'): Any(*string_types),
+ Required('description'): doc_string,
+ },
+ ),
+ ]
+)
+
+
+argument_spec_types = ['bits', 'bool', 'bytes', 'dict', 'float', 'int', 'json', 'jsonarg', 'list', 'path', 'raw',
+ 'sid', 'str']
+
+
+argument_spec_modifiers = {
+ 'mutually_exclusive': sequence_of_sequences(min=2),
+ 'required_together': sequence_of_sequences(min=2),
+ 'required_one_of': sequence_of_sequences(min=2),
+ 'required_if': sequence_of_sequences(min=3, max=4),
+ 'required_by': Schema({str: Any(list_string_types, tuple_string_types, *string_types)}),
+}
+
+
+def no_required_with_default(v):
+ if v.get('default') and v.get('required'):
+ raise Invalid('required=True cannot be supplied with a default')
+ return v
+
+
+def elements_with_list(v):
+ if v.get('elements') and v.get('type') != 'list':
+ raise Invalid('type must be list to use elements')
+ return v
+
+
+def options_with_apply_defaults(v):
+ if v.get('apply_defaults') and not v.get('options'):
+ raise Invalid('apply_defaults=True requires options to be set')
+ return v
+
+
+def check_removal_version(v, version_field, collection_name_field, error_code='invalid-removal-version'):
+ version = v.get(version_field)
+ collection_name = v.get(collection_name_field)
+ if not isinstance(version, string_types) or not isinstance(collection_name, string_types):
+ # If they are not strings, schema validation will have already complained.
+ return v
+ if collection_name == 'ansible.builtin':
+ try:
+ parsed_version = StrictVersion()
+ parsed_version.parse(version)
+ except ValueError as exc:
+ raise _add_ansible_error_code(
+ Invalid('%s (%r) is not a valid ansible-core version: %s' % (version_field, version, exc)),
+ error_code=error_code)
+ return v
+ try:
+ parsed_version = SemanticVersion()
+ parsed_version.parse(version)
+ if parsed_version.major != 0 and (parsed_version.minor != 0 or parsed_version.patch != 0):
+ raise _add_ansible_error_code(
+ Invalid('%s (%r) must be a major release, not a minor or patch release (see specification at '
+ 'https://semver.org/)' % (version_field, version)),
+ error_code='removal-version-must-be-major')
+ except ValueError as exc:
+ raise _add_ansible_error_code(
+ Invalid('%s (%r) is not a valid collection version (see specification at https://semver.org/): '
+ '%s' % (version_field, version, exc)),
+ error_code=error_code)
+ return v
+
+
+def option_deprecation(v):
+ if v.get('removed_in_version') or v.get('removed_at_date'):
+ if v.get('removed_in_version') and v.get('removed_at_date'):
+ raise _add_ansible_error_code(
+ Invalid('Only one of removed_in_version and removed_at_date must be specified'),
+ error_code='deprecation-either-date-or-version')
+ if not v.get('removed_from_collection'):
+ raise _add_ansible_error_code(
+ Invalid('If removed_in_version or removed_at_date is specified, '
+ 'removed_from_collection must be specified as well'),
+ error_code='deprecation-collection-missing')
+ check_removal_version(v,
+ version_field='removed_in_version',
+ collection_name_field='removed_from_collection',
+ error_code='invalid-removal-version')
+ return
+ if v.get('removed_from_collection'):
+ raise Invalid('removed_from_collection cannot be specified without either '
+ 'removed_in_version or removed_at_date')
+
+
+def argument_spec_schema(for_collection):
+ any_string_types = Any(*string_types)
+ schema = {
+ any_string_types: {
+ 'type': Any(is_callable, *argument_spec_types),
+ 'elements': Any(*argument_spec_types),
+ 'default': object,
+ 'fallback': Any(
+ (is_callable, list_string_types),
+ [is_callable, list_string_types],
+ ),
+ 'choices': Any([object], (object,)),
+ 'required': bool,
+ 'no_log': bool,
+ 'aliases': Any(list_string_types, tuple(list_string_types)),
+ 'apply_defaults': bool,
+ 'removed_in_version': version(for_collection),
+ 'removed_at_date': date(),
+ 'removed_from_collection': collection_name,
+ 'options': Self,
+ 'deprecated_aliases': Any([All(
+ Any(
+ {
+ Required('name'): Any(*string_types),
+ Required('date'): date(),
+ Required('collection_name'): collection_name,
+ },
+ {
+ Required('name'): Any(*string_types),
+ Required('version'): version(for_collection),
+ Required('collection_name'): collection_name,
+ },
+ ),
+ partial(check_removal_version,
+ version_field='version',
+ collection_name_field='collection_name',
+ error_code='invalid-removal-version')
+ )]),
+ }
+ }
+ schema[any_string_types].update(argument_spec_modifiers)
+ schemas = All(
+ schema,
+ Schema({any_string_types: no_required_with_default}),
+ Schema({any_string_types: elements_with_list}),
+ Schema({any_string_types: options_with_apply_defaults}),
+ Schema({any_string_types: option_deprecation}),
+ )
+ return Schema(schemas)
+
+
+def ansible_module_kwargs_schema(module_name, for_collection):
+ schema = {
+ 'argument_spec': argument_spec_schema(for_collection),
+ 'bypass_checks': bool,
+ 'no_log': bool,
+ 'check_invalid_arguments': Any(None, bool),
+ 'add_file_common_args': bool,
+ 'supports_check_mode': bool,
+ }
+ if module_name.endswith(('_info', '_facts')):
+ del schema['supports_check_mode']
+ schema[Required('supports_check_mode')] = True
+ schema.update(argument_spec_modifiers)
+ return Schema(schema)
+
+
+json_value = Schema(Any(
+ None,
+ int,
+ float,
+ [Self],
+ *(list({str_type: Self} for str_type in string_types) + list(string_types))
+))
+
+
+def version_added(v, error_code='version-added-invalid', accept_historical=False):
+ if 'version_added' in v:
+ version_added = v.get('version_added')
+ if isinstance(version_added, string_types):
+ # If it is not a string, schema validation will have already complained
+ # - or we have a float and we are in ansible/ansible, in which case we're
+ # also happy.
+ if v.get('version_added_collection') == 'ansible.builtin':
+ if version_added == 'historical' and accept_historical:
+ return v
+ try:
+ version = StrictVersion()
+ version.parse(version_added)
+ except ValueError as exc:
+ raise _add_ansible_error_code(
+ Invalid('version_added (%r) is not a valid ansible-core version: '
+ '%s' % (version_added, exc)),
+ error_code=error_code)
+ else:
+ try:
+ version = SemanticVersion()
+ version.parse(version_added)
+ if version.major != 0 and version.patch != 0:
+ raise _add_ansible_error_code(
+ Invalid('version_added (%r) must be a major or minor release, '
+ 'not a patch release (see specification at '
+ 'https://semver.org/)' % (version_added, )),
+ error_code='version-added-must-be-major-or-minor')
+ except ValueError as exc:
+ raise _add_ansible_error_code(
+ Invalid('version_added (%r) is not a valid collection version '
+ '(see specification at https://semver.org/): '
+ '%s' % (version_added, exc)),
+ error_code=error_code)
+ elif 'version_added_collection' in v:
+ # Must have been manual intervention, since version_added_collection is only
+ # added automatically when version_added is present
+ raise Invalid('version_added_collection cannot be specified without version_added')
+ return v
+
+
+def check_option_elements(v):
+ # Check whether elements is there iff type == 'list'
+ v_type = v.get('type')
+ v_elements = v.get('elements')
+ if v_type == 'list' and v_elements is None:
+ raise _add_ansible_error_code(
+ Invalid('Argument defines type as list but elements is not defined'),
+ error_code='parameter-list-no-elements') # FIXME: adjust error code?
+ if v_type != 'list' and v_elements is not None:
+ raise _add_ansible_error_code(
+ Invalid('Argument defines parameter elements as %s but it is valid only when value of parameter type is list' % (v_elements, )),
+ error_code='doc-elements-invalid')
+ return v
+
+
+def get_type_checker(v):
+ v_type = v.get('type')
+ if v_type == 'list':
+ elt_checker, elt_name = get_type_checker({'type': v.get('elements')})
+
+ def list_checker(value):
+ if isinstance(value, string_types):
+ value = [unquote(x.strip()) for x in value.split(',')]
+ if not isinstance(value, list):
+ raise ValueError('Value must be a list')
+ if elt_checker:
+ for elt in value:
+ try:
+ elt_checker(elt)
+ except Exception as exc:
+ raise ValueError('Entry %r is not of type %s: %s' % (elt, elt_name, exc))
+
+ return list_checker, ('list of %s' % elt_name) if elt_checker else 'list'
+
+ if v_type in ('boolean', 'bool'):
+ return partial(boolean, strict=False), v_type
+
+ if v_type in ('integer', 'int'):
+ return int, v_type
+
+ if v_type == 'float':
+ return float, v_type
+
+ if v_type == 'none':
+ def none_checker(value):
+ if value not in ('None', None):
+ raise ValueError('Value must be "None" or none')
+
+ return none_checker, v_type
+
+ if v_type in ('str', 'string', 'path', 'tmp', 'temppath', 'tmppath'):
+ def str_checker(value):
+ if not isinstance(value, string_types):
+ raise ValueError('Value must be string')
+
+ return str_checker, v_type
+
+ if v_type in ('pathspec', 'pathlist'):
+ def path_list_checker(value):
+ if not isinstance(value, string_types) and not is_iterable(value):
+ raise ValueError('Value must be string or list of strings')
+
+ return path_list_checker, v_type
+
+ if v_type in ('dict', 'dictionary'):
+ def dict_checker(value):
+ if not isinstance(value, dict):
+ raise ValueError('Value must be dictionary')
+
+ return dict_checker, v_type
+
+ return None, 'unknown'
+
+
+def check_option_choices(v):
+ # Check whether choices have the correct type
+ v_choices = v.get('choices')
+ if not is_iterable(v_choices):
+ return v
+
+ if v.get('type') == 'list':
+ # choices for a list type means that every list element must be one of these choices
+ type_checker, type_name = get_type_checker({'type': v.get('elements')})
+ else:
+ type_checker, type_name = get_type_checker(v)
+ if type_checker is None:
+ return v
+
+ for value in v_choices:
+ try:
+ type_checker(value)
+ except Exception as exc:
+ raise _add_ansible_error_code(
+ Invalid(
+ 'Argument defines choices as (%r) but this is incompatible with argument type %s: %s' % (value, type_name, exc)),
+ error_code='doc-choices-incompatible-type')
+
+ return v
+
+
+def check_option_default(v):
+ # Check whether default is only present if required=False, and whether default has correct type
+ v_default = v.get('default')
+ if v.get('required') and v_default is not None:
+ raise _add_ansible_error_code(
+ Invalid(
+ 'Argument is marked as required but specifies a default.'
+ ' Arguments with a default should not be marked as required'),
+ error_code='no-default-for-required-parameter') # FIXME: adjust error code?
+
+ if v_default is None:
+ return v
+
+ type_checker, type_name = get_type_checker(v)
+ if type_checker is None:
+ return v
+
+ try:
+ type_checker(v_default)
+ except Exception as exc:
+ raise _add_ansible_error_code(
+ Invalid(
+ 'Argument defines default as (%r) but this is incompatible with parameter type %s: %s' % (v_default, type_name, exc)),
+ error_code='incompatible-default-type')
+
+ return v
+
+
+def list_dict_option_schema(for_collection, plugin_type):
+ if plugin_type == 'module':
+ option_types = Any(None, 'bits', 'bool', 'bytes', 'dict', 'float', 'int', 'json', 'jsonarg', 'list', 'path', 'raw', 'sid', 'str')
+ element_types = option_types
+ else:
+ option_types = Any(None, 'boolean', 'bool', 'integer', 'int', 'float', 'list', 'dict', 'dictionary', 'none',
+ 'path', 'tmp', 'temppath', 'tmppath', 'pathspec', 'pathlist', 'str', 'string', 'raw')
+ element_types = Any(None, 'boolean', 'bool', 'integer', 'int', 'float', 'list', 'dict', 'dictionary', 'path', 'str', 'string', 'raw')
+
+ basic_option_schema = {
+ Required('description'): doc_string_or_strings,
+ 'required': bool,
+ 'choices': list,
+ 'aliases': Any(list_string_types),
+ 'version_added': version(for_collection),
+ 'version_added_collection': collection_name,
+ 'default': json_value,
+ # Note: Types are strings, not literal bools, such as True or False
+ 'type': option_types,
+ # in case of type='list' elements define type of individual item in list
+ 'elements': element_types,
+ }
+ if plugin_type != 'module':
+ basic_option_schema['name'] = Any(*string_types)
+ deprecated_schema = All(
+ Schema(
+ All(
+ {
+ # This definition makes sure everything has the correct types/values
+ 'why': doc_string,
+ 'alternatives': doc_string,
+ # vod stands for 'version or date'; this is the name of the exclusive group
+ Exclusive('removed_at_date', 'vod'): date(),
+ Exclusive('version', 'vod'): version(for_collection),
+ 'collection_name': collection_name,
+ },
+ {
+ # This definition makes sure that everything we require is there
+ Required('why'): Any(*string_types),
+ 'alternatives': Any(*string_types),
+ Required(Any('removed_at_date', 'version')): Any(*string_types),
+ Required('collection_name'): Any(*string_types),
+ },
+ ),
+ extra=PREVENT_EXTRA
+ ),
+ partial(check_removal_version,
+ version_field='version',
+ collection_name_field='collection_name',
+ error_code='invalid-removal-version'),
+ )
+ env_schema = All(
+ Schema({
+ Required('name'): Any(*string_types),
+ 'deprecated': deprecated_schema,
+ 'version_added': version(for_collection),
+ 'version_added_collection': collection_name,
+ }, extra=PREVENT_EXTRA),
+ partial(version_added, error_code='option-invalid-version-added')
+ )
+ ini_schema = All(
+ Schema({
+ Required('key'): Any(*string_types),
+ Required('section'): Any(*string_types),
+ 'deprecated': deprecated_schema,
+ 'version_added': version(for_collection),
+ 'version_added_collection': collection_name,
+ }, extra=PREVENT_EXTRA),
+ partial(version_added, error_code='option-invalid-version-added')
+ )
+ vars_schema = All(
+ Schema({
+ Required('name'): Any(*string_types),
+ 'deprecated': deprecated_schema,
+ 'version_added': version(for_collection),
+ 'version_added_collection': collection_name,
+ }, extra=PREVENT_EXTRA),
+ partial(version_added, error_code='option-invalid-version-added')
+ )
+ cli_schema = All(
+ Schema({
+ Required('name'): Any(*string_types),
+ 'option': Any(*string_types),
+ 'deprecated': deprecated_schema,
+ 'version_added': version(for_collection),
+ 'version_added_collection': collection_name,
+ }, extra=PREVENT_EXTRA),
+ partial(version_added, error_code='option-invalid-version-added')
+ )
+ keyword_schema = All(
+ Schema({
+ Required('name'): Any(*string_types),
+ 'deprecated': deprecated_schema,
+ 'version_added': version(for_collection),
+ 'version_added_collection': collection_name,
+ }, extra=PREVENT_EXTRA),
+ partial(version_added, error_code='option-invalid-version-added')
+ )
+ basic_option_schema.update({
+ 'env': [env_schema],
+ 'ini': [ini_schema],
+ 'vars': [vars_schema],
+ 'cli': [cli_schema],
+ 'keyword': [keyword_schema],
+ 'deprecated': deprecated_schema,
+ })
+
+ suboption_schema = dict(basic_option_schema)
+ suboption_schema.update({
+ # Recursive suboptions
+ 'suboptions': Any(None, *list({str_type: Self} for str_type in string_types)),
+ })
+ suboption_schema = Schema(All(
+ suboption_schema,
+ check_option_elements,
+ check_option_choices,
+ check_option_default,
+ ), extra=PREVENT_EXTRA)
+
+ # This generates list of dicts with keys from string_types and suboption_schema value
+ # for example in Python 3: {str: suboption_schema}
+ list_dict_suboption_schema = [{str_type: suboption_schema} for str_type in string_types]
+
+ option_schema = dict(basic_option_schema)
+ option_schema.update({
+ 'suboptions': Any(None, *list_dict_suboption_schema),
+ })
+ option_schema = Schema(All(
+ option_schema,
+ check_option_elements,
+ check_option_choices,
+ check_option_default,
+ ), extra=PREVENT_EXTRA)
+
+ option_version_added = Schema(
+ All({
+ 'suboptions': Any(None, *[{str_type: Self} for str_type in string_types]),
+ }, partial(version_added, error_code='option-invalid-version-added')),
+ extra=ALLOW_EXTRA
+ )
+
+ # This generates list of dicts with keys from string_types and option_schema value
+ # for example in Python 3: {str: option_schema}
+ return [{str_type: All(option_schema, option_version_added)} for str_type in string_types]
+
+
+def return_contains(v):
+ schema = Schema(
+ {
+ Required('contains'): Any(dict, list, *string_types)
+ },
+ extra=ALLOW_EXTRA
+ )
+ if v.get('type') == 'complex':
+ return schema(v)
+ return v
+
+
+def return_schema(for_collection, plugin_type='module'):
+ if plugin_type == 'module':
+ return_types = Any('bool', 'complex', 'dict', 'float', 'int', 'list', 'raw', 'str')
+ element_types = Any(None, 'bits', 'bool', 'bytes', 'dict', 'float', 'int', 'json', 'jsonarg', 'list', 'path', 'raw', 'sid', 'str')
+ else:
+ return_types = Any(None, 'boolean', 'bool', 'integer', 'int', 'float', 'list', 'dict', 'dictionary', 'path', 'str', 'string', 'raw')
+ element_types = return_types
+
+ basic_return_option_schema = {
+ Required('description'): doc_string_or_strings,
+ 'returned': doc_string,
+ 'version_added': version(for_collection),
+ 'version_added_collection': collection_name,
+ 'sample': json_value,
+ 'example': json_value,
+ # in case of type='list' elements define type of individual item in list
+ 'elements': element_types,
+ 'choices': Any([object], (object,)),
+ }
+ if plugin_type == 'module':
+ # type is only required for modules right now
+ basic_return_option_schema[Required('type')] = return_types
+ else:
+ basic_return_option_schema['type'] = return_types
+
+ inner_return_option_schema = dict(basic_return_option_schema)
+ inner_return_option_schema.update({
+ 'contains': Any(None, *list({str_type: Self} for str_type in string_types)),
+ })
+ return_contains_schema = Any(
+ All(
+ Schema(inner_return_option_schema),
+ Schema(return_contains),
+ Schema(partial(version_added, error_code='option-invalid-version-added')),
+ ),
+ Schema(type(None)),
+ )
+
+ # This generates list of dicts with keys from string_types and return_contains_schema value
+ # for example in Python 3: {str: return_contains_schema}
+ list_dict_return_contains_schema = [{str_type: return_contains_schema} for str_type in string_types]
+
+ return_option_schema = dict(basic_return_option_schema)
+ return_option_schema.update({
+ 'contains': Any(None, *list_dict_return_contains_schema),
+ })
+ if plugin_type == 'module':
+ # 'returned' is required on top-level
+ del return_option_schema['returned']
+ return_option_schema[Required('returned')] = Any(*string_types)
+ return Any(
+ All(
+ Schema(
+ {
+ any_string_types: return_option_schema
+ }
+ ),
+ Schema({any_string_types: return_contains}),
+ Schema({any_string_types: partial(version_added, error_code='option-invalid-version-added')}),
+ ),
+ Schema(type(None)),
+ )
+
+
+def deprecation_schema(for_collection):
+ main_fields = {
+ Required('why'): doc_string,
+ Required('alternative'): doc_string,
+ Required('removed_from_collection'): collection_name,
+ 'removed': Any(True),
+ }
+
+ date_schema = {
+ Required('removed_at_date'): date(),
+ }
+ date_schema.update(main_fields)
+
+ if for_collection:
+ version_schema = {
+ Required('removed_in'): version(for_collection),
+ }
+ else:
+ version_schema = {
+ Required('removed_in'): deprecation_versions(),
+ }
+ version_schema.update(main_fields)
+
+ result = Any(
+ Schema(version_schema, extra=PREVENT_EXTRA),
+ Schema(date_schema, extra=PREVENT_EXTRA),
+ )
+
+ if for_collection:
+ result = All(
+ result,
+ partial(check_removal_version,
+ version_field='removed_in',
+ collection_name_field='removed_from_collection',
+ error_code='invalid-removal-version'))
+
+ return result
+
+
+def author(value):
+ if value is None:
+ return value # let schema checks handle
+
+ if not is_iterable(value):
+ value = [value]
+
+ for line in value:
+ if not isinstance(line, string_types):
+ continue # let schema checks handle
+ m = author_line.search(line)
+ if not m:
+ raise Invalid("Invalid author")
+
+ return value
+
+
+def doc_schema(module_name, for_collection=False, deprecated_module=False, plugin_type='module'):
+
+ if module_name.startswith('_'):
+ module_name = module_name[1:]
+ deprecated_module = True
+ if for_collection is False and plugin_type == 'connection' and module_name == 'paramiko_ssh':
+ # The plugin loader has a hard-coded exception: when the builtin connection 'paramiko' is
+ # referenced, it loads 'paramiko_ssh' instead. That's why in this plugin, the name must be
+ # 'paramiko' and not 'paramiko_ssh'.
+ module_name = 'paramiko'
+ doc_schema_dict = {
+ Required('module' if plugin_type == 'module' else 'name'): module_name,
+ Required('short_description'): doc_string,
+ Required('description'): doc_string_or_strings,
+ 'notes': Any(None, [doc_string]),
+ 'seealso': Any(None, seealso_schema),
+ 'requirements': [doc_string],
+ 'todo': Any(None, doc_string_or_strings),
+ 'options': Any(None, *list_dict_option_schema(for_collection, plugin_type)),
+ 'extends_documentation_fragment': Any(list_string_types, *string_types),
+ 'version_added_collection': collection_name,
+ }
+ if plugin_type == 'module':
+ doc_schema_dict[Required('author')] = All(Any(None, list_string_types, *string_types), author)
+ else:
+ # author is optional for plugins (for now)
+ doc_schema_dict['author'] = All(Any(None, list_string_types, *string_types), author)
+ if plugin_type == 'callback':
+ doc_schema_dict[Required('type')] = Any('aggregate', 'notification', 'stdout')
+
+ if for_collection:
+ # Optional
+ doc_schema_dict['version_added'] = version(for_collection=True)
+ else:
+ doc_schema_dict[Required('version_added')] = version(for_collection=False)
+
+ if deprecated_module:
+ deprecation_required_scheme = {
+ Required('deprecated'): Any(deprecation_schema(for_collection=for_collection)),
+ }
+
+ doc_schema_dict.update(deprecation_required_scheme)
+
+ def add_default_attributes(more=None):
+ schema = {
+ 'description': doc_string_or_strings,
+ 'details': doc_string_or_strings,
+ 'support': any_string_types,
+ 'version_added_collection': any_string_types,
+ 'version_added': any_string_types,
+ }
+ if more:
+ schema.update(more)
+ return schema
+
+ doc_schema_dict['attributes'] = Schema(
+ All(
+ Schema({
+ any_string_types: {
+ Required('description'): doc_string_or_strings,
+ Required('support'): Any('full', 'partial', 'none', 'N/A'),
+ 'details': doc_string_or_strings,
+ 'version_added_collection': collection_name,
+ 'version_added': version(for_collection=for_collection),
+ },
+ }, extra=ALLOW_EXTRA),
+ partial(version_added, error_code='attribute-invalid-version-added', accept_historical=False),
+ Schema({
+ any_string_types: add_default_attributes(),
+ 'action_group': add_default_attributes({
+ Required('membership'): list_string_types,
+ }),
+ 'forced_action_plugin': add_default_attributes({
+ Required('action_plugin'): any_string_types,
+ }),
+ 'platform': add_default_attributes({
+ Required('platforms'): Any(list_string_types, *string_types)
+ }),
+ }, extra=PREVENT_EXTRA),
+ )
+ )
+ return Schema(
+ All(
+ Schema(
+ doc_schema_dict,
+ extra=PREVENT_EXTRA
+ ),
+ partial(version_added, error_code='module-invalid-version-added', accept_historical=not for_collection),
+ )
+ )
+
+
+# Things to add soon
+####################
+# 1) Recursively validate `type: complex` fields
+# This will improve documentation, though require fair amount of module tidyup
+
+# Possible Future Enhancements
+##############################
+
+# 1) Don't allow empty options for choices, aliases, etc
+# 2) If type: bool ensure choices isn't set - perhaps use Exclusive
+# 3) both version_added should be quoted floats
+
+# Tool that takes JSON and generates RETURN skeleton (needs to support complex structures)
diff --git a/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/utils.py b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/utils.py
new file mode 100644
index 0000000..88d5b01
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/utils.py
@@ -0,0 +1,222 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2015 Matt Martz <matt@sivel.net>
+# Copyright (C) 2015 Rackspace US, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import annotations
+
+import ast
+import datetime
+import os
+import re
+import sys
+
+from io import BytesIO, TextIOWrapper
+
+import yaml
+import yaml.reader
+
+from ansible.module_utils._text import to_text
+from ansible.module_utils.basic import AnsibleModule
+from ansible.module_utils.common.yaml import SafeLoader
+from ansible.module_utils.six import string_types
+from ansible.parsing.yaml.loader import AnsibleLoader
+
+
+class AnsibleTextIOWrapper(TextIOWrapper):
+ def write(self, s):
+ super(AnsibleTextIOWrapper, self).write(to_text(s, self.encoding, errors='replace'))
+
+
+def find_executable(executable, cwd=None, path=None):
+ """Finds the full path to the executable specified"""
+ match = None
+ real_cwd = os.getcwd()
+
+ if not cwd:
+ cwd = real_cwd
+
+ if os.path.dirname(executable):
+ target = os.path.join(cwd, executable)
+ if os.path.exists(target) and os.access(target, os.F_OK | os.X_OK):
+ match = executable
+ else:
+ path = os.environ.get('PATH', os.path.defpath)
+
+ path_dirs = path.split(os.path.pathsep)
+ seen_dirs = set()
+
+ for path_dir in path_dirs:
+ if path_dir in seen_dirs:
+ continue
+
+ seen_dirs.add(path_dir)
+
+ if os.path.abspath(path_dir) == real_cwd:
+ path_dir = cwd
+
+ candidate = os.path.join(path_dir, executable)
+
+ if os.path.exists(candidate) and os.access(candidate, os.F_OK | os.X_OK):
+ match = candidate
+ break
+
+ return match
+
+
+def find_globals(g, tree):
+ """Uses AST to find globals in an ast tree"""
+ for child in tree:
+ if hasattr(child, 'body') and isinstance(child.body, list):
+ find_globals(g, child.body)
+ elif isinstance(child, (ast.FunctionDef, ast.ClassDef)):
+ g.add(child.name)
+ continue
+ elif isinstance(child, ast.Assign):
+ try:
+ g.add(child.targets[0].id)
+ except (IndexError, AttributeError):
+ pass
+ elif isinstance(child, ast.Import):
+ g.add(child.names[0].name)
+ elif isinstance(child, ast.ImportFrom):
+ for name in child.names:
+ g_name = name.asname or name.name
+ if g_name == '*':
+ continue
+ g.add(g_name)
+
+
+class CaptureStd():
+ """Context manager to handle capturing stderr and stdout"""
+
+ def __enter__(self):
+ self.sys_stdout = sys.stdout
+ self.sys_stderr = sys.stderr
+ sys.stdout = self.stdout = AnsibleTextIOWrapper(BytesIO(), encoding=self.sys_stdout.encoding)
+ sys.stderr = self.stderr = AnsibleTextIOWrapper(BytesIO(), encoding=self.sys_stderr.encoding)
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ sys.stdout = self.sys_stdout
+ sys.stderr = self.sys_stderr
+
+ def get(self):
+ """Return ``(stdout, stderr)``"""
+
+ return self.stdout.buffer.getvalue(), self.stderr.buffer.getvalue()
+
+
+def get_module_name_from_filename(filename, collection):
+ # Calculate the module's name so that relative imports work correctly
+ if collection:
+ # collection is a relative path, example: ansible_collections/my_namespace/my_collection
+ # filename is a relative path, example: plugins/modules/my_module.py
+ path = os.path.join(collection, filename)
+ else:
+ # filename is a relative path, example: lib/ansible/modules/system/ping.py
+ path = os.path.relpath(filename, 'lib')
+
+ name = os.path.splitext(path)[0].replace(os.path.sep, '.')
+
+ return name
+
+
+def parse_yaml(value, lineno, module, name, load_all=False, ansible_loader=False):
+ traces = []
+ errors = []
+ data = None
+
+ if load_all:
+ yaml_load = yaml.load_all
+ else:
+ yaml_load = yaml.load
+
+ if ansible_loader:
+ loader = AnsibleLoader
+ else:
+ loader = SafeLoader
+
+ try:
+ data = yaml_load(value, Loader=loader)
+ if load_all:
+ data = list(data)
+ except yaml.MarkedYAMLError as e:
+ errors.append({
+ 'msg': '%s is not valid YAML' % name,
+ 'line': e.problem_mark.line + lineno,
+ 'column': e.problem_mark.column + 1
+ })
+ traces.append(e)
+ except yaml.reader.ReaderError as e:
+ traces.append(e)
+ # TODO: Better line/column detection
+ errors.append({
+ 'msg': ('%s is not valid YAML. Character '
+ '0x%x at position %d.' % (name, e.character, e.position)),
+ 'line': lineno
+ })
+ except yaml.YAMLError as e:
+ traces.append(e)
+ errors.append({
+ 'msg': '%s is not valid YAML: %s: %s' % (name, type(e), e),
+ 'line': lineno
+ })
+
+ return data, errors, traces
+
+
+def is_empty(value):
+ """Evaluate null like values excluding False"""
+ if value is False:
+ return False
+ return not bool(value)
+
+
+def compare_unordered_lists(a, b):
+ """Safe list comparisons
+
+ Supports:
+ - unordered lists
+ - unhashable elements
+ """
+ return len(a) == len(b) and all(x in b for x in a)
+
+
+class NoArgsAnsibleModule(AnsibleModule):
+ """AnsibleModule that does not actually load params. This is used to get access to the
+ methods within AnsibleModule without having to fake a bunch of data
+ """
+ def _load_params(self):
+ self.params = {'_ansible_selinux_special_fs': [], '_ansible_remote_tmp': '/tmp', '_ansible_keep_remote_files': False, '_ansible_check_mode': False}
+
+
+def parse_isodate(v, allow_date):
+ if allow_date:
+ if isinstance(v, datetime.date):
+ return v
+ msg = 'Expected ISO 8601 date string (YYYY-MM-DD) or YAML date'
+ else:
+ msg = 'Expected ISO 8601 date string (YYYY-MM-DD)'
+ if not isinstance(v, string_types):
+ raise ValueError(msg)
+ # From Python 3.7 in, there is datetime.date.fromisoformat(). For older versions,
+ # we have to do things manually.
+ if not re.match('^[0-9]{4}-[0-9]{2}-[0-9]{2}$', v):
+ raise ValueError(msg)
+ try:
+ return datetime.datetime.strptime(v, '%Y-%m-%d').date()
+ except ValueError:
+ raise ValueError(msg)
diff --git a/test/lib/ansible_test/_util/controller/sanity/yamllint/config/default.yml b/test/lib/ansible_test/_util/controller/sanity/yamllint/config/default.yml
new file mode 100644
index 0000000..45d8b7a
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/yamllint/config/default.yml
@@ -0,0 +1,19 @@
+extends: default
+
+rules:
+ braces: {max-spaces-inside: 1, level: error}
+ brackets: {max-spaces-inside: 1, level: error}
+ colons: {max-spaces-after: -1, level: error}
+ commas: {max-spaces-after: -1, level: error}
+ comments: disable
+ comments-indentation: disable
+ document-start: disable
+ empty-lines: {max: 3, level: error}
+ hyphens: {level: error}
+ indentation: disable
+ key-duplicates: enable
+ line-length: disable
+ new-line-at-end-of-file: disable
+ new-lines: {type: unix}
+ trailing-spaces: disable
+ truthy: disable
diff --git a/test/lib/ansible_test/_util/controller/sanity/yamllint/config/modules.yml b/test/lib/ansible_test/_util/controller/sanity/yamllint/config/modules.yml
new file mode 100644
index 0000000..da7e604
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/yamllint/config/modules.yml
@@ -0,0 +1,19 @@
+extends: default
+
+rules:
+ braces: disable
+ brackets: disable
+ colons: disable
+ commas: disable
+ comments: disable
+ comments-indentation: disable
+ document-start: disable
+ empty-lines: disable
+ hyphens: disable
+ indentation: disable
+ key-duplicates: enable
+ line-length: disable
+ new-line-at-end-of-file: disable
+ new-lines: {type: unix}
+ trailing-spaces: disable
+ truthy: disable
diff --git a/test/lib/ansible_test/_util/controller/sanity/yamllint/config/plugins.yml b/test/lib/ansible_test/_util/controller/sanity/yamllint/config/plugins.yml
new file mode 100644
index 0000000..6d41813
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/yamllint/config/plugins.yml
@@ -0,0 +1,19 @@
+extends: default
+
+rules:
+ braces: disable
+ brackets: disable
+ colons: disable
+ commas: disable
+ comments: disable
+ comments-indentation: disable
+ document-start: disable
+ empty-lines: disable
+ hyphens: disable
+ indentation: disable
+ key-duplicates: disable
+ line-length: disable
+ new-line-at-end-of-file: disable
+ new-lines: {type: unix}
+ trailing-spaces: disable
+ truthy: disable
diff --git a/test/lib/ansible_test/_util/controller/sanity/yamllint/yamllinter.py b/test/lib/ansible_test/_util/controller/sanity/yamllint/yamllinter.py
new file mode 100644
index 0000000..d6de611
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/sanity/yamllint/yamllinter.py
@@ -0,0 +1,246 @@
+"""Wrapper around yamllint that supports YAML embedded in Ansible modules."""
+from __future__ import annotations
+
+import ast
+import json
+import os
+import re
+import sys
+import typing as t
+
+import yaml
+from yaml.resolver import Resolver
+from yaml.constructor import SafeConstructor
+from yaml.error import MarkedYAMLError
+from yaml.cyaml import CParser
+
+from yamllint import linter
+from yamllint.config import YamlLintConfig
+
+
+def main():
+ """Main program body."""
+ paths = sys.argv[1:] or sys.stdin.read().splitlines()
+
+ checker = YamlChecker()
+ checker.check(paths)
+ checker.report()
+
+
+class TestConstructor(SafeConstructor):
+ """Yaml Safe Constructor that knows about Ansible tags."""
+ def construct_yaml_unsafe(self, node):
+ """Construct an unsafe tag."""
+ try:
+ constructor = getattr(node, 'id', 'object')
+ if constructor is not None:
+ constructor = getattr(self, 'construct_%s' % constructor)
+ except AttributeError:
+ constructor = self.construct_object
+
+ value = constructor(node)
+
+ return value
+
+
+TestConstructor.add_constructor(
+ '!unsafe',
+ TestConstructor.construct_yaml_unsafe)
+
+
+TestConstructor.add_constructor(
+ '!vault',
+ TestConstructor.construct_yaml_str)
+
+
+TestConstructor.add_constructor(
+ '!vault-encrypted',
+ TestConstructor.construct_yaml_str)
+
+
+class TestLoader(CParser, TestConstructor, Resolver):
+ """Custom YAML loader that recognizes custom Ansible tags."""
+ def __init__(self, stream):
+ CParser.__init__(self, stream)
+ TestConstructor.__init__(self)
+ Resolver.__init__(self)
+
+
+class YamlChecker:
+ """Wrapper around yamllint that supports YAML embedded in Ansible modules."""
+ def __init__(self):
+ self.messages = []
+
+ def report(self):
+ """Print yamllint report to stdout."""
+ report = dict(
+ messages=self.messages,
+ )
+
+ print(json.dumps(report, indent=4, sort_keys=True))
+
+ def check(self, paths): # type: (t.List[str]) -> None
+ """Check the specified paths."""
+ config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config')
+
+ yaml_conf = YamlLintConfig(file=os.path.join(config_path, 'default.yml'))
+ module_conf = YamlLintConfig(file=os.path.join(config_path, 'modules.yml'))
+ plugin_conf = YamlLintConfig(file=os.path.join(config_path, 'plugins.yml'))
+
+ for path in paths:
+ extension = os.path.splitext(path)[1]
+
+ with open(path, encoding='utf-8') as file:
+ contents = file.read()
+
+ if extension in ('.yml', '.yaml'):
+ self.check_yaml(yaml_conf, path, contents)
+ elif extension == '.py':
+ if path.startswith('lib/ansible/modules/') or path.startswith('plugins/modules/'):
+ conf = module_conf
+ else:
+ conf = plugin_conf
+
+ self.check_module(conf, path, contents)
+ else:
+ raise Exception('unsupported extension: %s' % extension)
+
+ def check_yaml(self, conf, path, contents): # type: (YamlLintConfig, str, str) -> None
+ """Check the given YAML."""
+ self.check_parsable(path, contents)
+ self.messages += [self.result_to_message(r, path) for r in linter.run(contents, conf, path)]
+
+ def check_module(self, conf, path, contents): # type: (YamlLintConfig, str, str) -> None
+ """Check the given module."""
+ docs = self.get_module_docs(path, contents)
+
+ for key, value in docs.items():
+ yaml_data = value['yaml']
+ lineno = value['lineno']
+ fmt = value['fmt']
+
+ if fmt != 'yaml':
+ continue
+
+ if yaml_data.startswith('\n'):
+ yaml_data = yaml_data[1:]
+ lineno += 1
+
+ self.check_parsable(path, yaml_data, lineno)
+
+ messages = list(linter.run(yaml_data, conf, path))
+
+ self.messages += [self.result_to_message(r, path, lineno - 1, key) for r in messages]
+
+ def check_parsable(self, path, contents, lineno=1): # type: (str, str, int) -> None
+ """Check the given contents to verify they can be parsed as YAML."""
+ try:
+ yaml.load(contents, Loader=TestLoader)
+ except MarkedYAMLError as ex:
+ self.messages += [{'code': 'unparsable-with-libyaml',
+ 'message': '%s - %s' % (ex.args[0], ex.args[2]),
+ 'path': path,
+ 'line': ex.problem_mark.line + lineno,
+ 'column': ex.problem_mark.column + 1,
+ 'level': 'error',
+ }]
+
+ @staticmethod
+ def result_to_message(result, path, line_offset=0, prefix=''): # type: (t.Any, str, int, str) -> t.Dict[str, t.Any]
+ """Convert the given result to a dictionary and return it."""
+ if prefix:
+ prefix = '%s: ' % prefix
+
+ return dict(
+ code=result.rule or result.level,
+ message=prefix + result.desc,
+ path=path,
+ line=result.line + line_offset,
+ column=result.column,
+ level=result.level,
+ )
+
+ def get_module_docs(self, path, contents): # type: (str, str) -> t.Dict[str, t.Any]
+ """Return the module documentation for the given module contents."""
+ module_doc_types = [
+ 'DOCUMENTATION',
+ 'EXAMPLES',
+ 'RETURN',
+ ]
+
+ docs = {}
+
+ fmt_re = re.compile(r'^# fmt:\s+(\S+)')
+
+ def check_assignment(statement, doc_types=None):
+ """Check the given statement for a documentation assignment."""
+ for target in statement.targets:
+ if not isinstance(target, ast.Name):
+ continue
+
+ if doc_types and target.id not in doc_types:
+ continue
+
+ fmt_match = fmt_re.match(statement.value.s.lstrip())
+ fmt = 'yaml'
+ if fmt_match:
+ fmt = fmt_match.group(1)
+
+ docs[target.id] = dict(
+ yaml=statement.value.s,
+ lineno=statement.lineno,
+ end_lineno=statement.lineno + len(statement.value.s.splitlines()),
+ fmt=fmt.lower(),
+ )
+
+ module_ast = self.parse_module(path, contents)
+
+ if not module_ast:
+ return {}
+
+ is_plugin = path.startswith('lib/ansible/modules/') or path.startswith('lib/ansible/plugins/') or path.startswith('plugins/')
+ is_doc_fragment = path.startswith('lib/ansible/plugins/doc_fragments/') or path.startswith('plugins/doc_fragments/')
+
+ if is_plugin and not is_doc_fragment:
+ for body_statement in module_ast.body:
+ if isinstance(body_statement, ast.Assign):
+ check_assignment(body_statement, module_doc_types)
+ elif is_doc_fragment:
+ for body_statement in module_ast.body:
+ if isinstance(body_statement, ast.ClassDef):
+ for class_statement in body_statement.body:
+ if isinstance(class_statement, ast.Assign):
+ check_assignment(class_statement)
+ else:
+ raise Exception('unsupported path: %s' % path)
+
+ return docs
+
+ def parse_module(self, path, contents): # type: (str, str) -> t.Optional[ast.Module]
+ """Parse the given contents and return a module if successful, otherwise return None."""
+ try:
+ return ast.parse(contents)
+ except SyntaxError as ex:
+ self.messages.append(dict(
+ code='python-syntax-error',
+ message=str(ex),
+ path=path,
+ line=ex.lineno,
+ column=ex.offset,
+ level='error',
+ ))
+ except Exception as ex: # pylint: disable=broad-except
+ self.messages.append(dict(
+ code='python-parse-error',
+ message=str(ex),
+ path=path,
+ line=0,
+ column=0,
+ level='error',
+ ))
+
+ return None
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/tools/collection_detail.py b/test/lib/ansible_test/_util/controller/tools/collection_detail.py
new file mode 100644
index 0000000..870ea59
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/tools/collection_detail.py
@@ -0,0 +1,94 @@
+"""Retrieve collection detail."""
+from __future__ import annotations
+
+import json
+import os
+import re
+import sys
+
+import yaml
+
+
+# See semantic versioning specification (https://semver.org/)
+NUMERIC_IDENTIFIER = r'(?:0|[1-9][0-9]*)'
+ALPHANUMERIC_IDENTIFIER = r'(?:[0-9]*[a-zA-Z-][a-zA-Z0-9-]*)'
+
+PRE_RELEASE_IDENTIFIER = r'(?:' + NUMERIC_IDENTIFIER + r'|' + ALPHANUMERIC_IDENTIFIER + r')'
+BUILD_IDENTIFIER = r'[a-zA-Z0-9-]+' # equivalent to r'(?:[0-9]+|' + ALPHANUMERIC_IDENTIFIER + r')'
+
+VERSION_CORE = NUMERIC_IDENTIFIER + r'\.' + NUMERIC_IDENTIFIER + r'\.' + NUMERIC_IDENTIFIER
+PRE_RELEASE = r'(?:-' + PRE_RELEASE_IDENTIFIER + r'(?:\.' + PRE_RELEASE_IDENTIFIER + r')*)?'
+BUILD = r'(?:\+' + BUILD_IDENTIFIER + r'(?:\.' + BUILD_IDENTIFIER + r')*)?'
+
+SEMVER_REGULAR_EXPRESSION = r'^' + VERSION_CORE + PRE_RELEASE + BUILD + r'$'
+
+
+def validate_version(version):
+ """Raise exception if the provided version is not None or a valid semantic version."""
+ if version is None:
+ return
+ if not re.match(SEMVER_REGULAR_EXPRESSION, version):
+ raise Exception('Invalid version number "{0}". Collection version numbers must '
+ 'follow semantic versioning (https://semver.org/).'.format(version))
+
+
+def read_manifest_json(collection_path):
+ """Return collection information from the MANIFEST.json file."""
+ manifest_path = os.path.join(collection_path, 'MANIFEST.json')
+
+ if not os.path.exists(manifest_path):
+ return None
+
+ try:
+ with open(manifest_path, encoding='utf-8') as manifest_file:
+ manifest = json.load(manifest_file)
+
+ collection_info = manifest.get('collection_info') or {}
+
+ result = dict(
+ version=collection_info.get('version'),
+ )
+ validate_version(result['version'])
+ except Exception as ex: # pylint: disable=broad-except
+ raise Exception('{0}: {1}'.format(os.path.basename(manifest_path), ex))
+
+ return result
+
+
+def read_galaxy_yml(collection_path):
+ """Return collection information from the galaxy.yml file."""
+ galaxy_path = os.path.join(collection_path, 'galaxy.yml')
+
+ if not os.path.exists(galaxy_path):
+ return None
+
+ try:
+ with open(galaxy_path, encoding='utf-8') as galaxy_file:
+ galaxy = yaml.safe_load(galaxy_file)
+
+ result = dict(
+ version=galaxy.get('version'),
+ )
+ validate_version(result['version'])
+ except Exception as ex: # pylint: disable=broad-except
+ raise Exception('{0}: {1}'.format(os.path.basename(galaxy_path), ex))
+
+ return result
+
+
+def main():
+ """Retrieve collection detail."""
+ collection_path = sys.argv[1]
+
+ try:
+ result = read_manifest_json(collection_path) or read_galaxy_yml(collection_path) or {}
+ except Exception as ex: # pylint: disable=broad-except
+ result = dict(
+ error='{0}'.format(ex),
+ )
+
+ print(json.dumps(result))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/tools/coverage_stub.ps1 b/test/lib/ansible_test/_util/controller/tools/coverage_stub.ps1
new file mode 100644
index 0000000..fcc4570
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/tools/coverage_stub.ps1
@@ -0,0 +1,40 @@
+<#
+.SYNOPSIS
+Gets the lines to hit from a sourcefile for coverage stubs.
+#>
+[CmdletBinding()]
+param (
+ [Parameter(Mandatory, ValueFromRemainingArguments)]
+ [String[]]
+ $Path
+)
+
+$stubInfo = @(
+ foreach ($sourcePath in $Path) {
+ # Default is to just no lines for missing files
+ [Collections.Generic.HashSet[int]]$lines = @()
+
+ if (Test-Path -LiteralPath $sourcePath) {
+ $code = [ScriptBlock]::Create([IO.File]::ReadAllText($sourcePath))
+
+ # We set our breakpoints with this predicate so our stubs should match
+ # that logic.
+ $predicate = {
+ $args[0] -is [System.Management.Automation.Language.CommandBaseAst]
+ }
+ $cmds = $code.Ast.FindAll($predicate, $true)
+
+ # We only care about unique lines not multiple commands on 1 line.
+ $lines = @(foreach ($cmd in $cmds) {
+ $cmd.Extent.StartLineNumber
+ })
+ }
+
+ [PSCustomObject]@{
+ Path = $sourcePath
+ Lines = $lines
+ }
+ }
+)
+
+ConvertTo-Json -InputObject $stubInfo -Depth 2 -Compress
diff --git a/test/lib/ansible_test/_util/controller/tools/sslcheck.py b/test/lib/ansible_test/_util/controller/tools/sslcheck.py
new file mode 100644
index 0000000..c25fed6
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/tools/sslcheck.py
@@ -0,0 +1,22 @@
+"""Show openssl version."""
+from __future__ import annotations
+
+import json
+
+# noinspection PyBroadException
+try:
+ from ssl import OPENSSL_VERSION_INFO
+ VERSION = list(OPENSSL_VERSION_INFO[:3])
+except Exception: # pylint: disable=broad-except
+ VERSION = None
+
+
+def main():
+ """Main program entry point."""
+ print(json.dumps(dict(
+ version=VERSION,
+ )))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/lib/ansible_test/_util/controller/tools/yaml_to_json.py b/test/lib/ansible_test/_util/controller/tools/yaml_to_json.py
new file mode 100644
index 0000000..e2a15bf
--- /dev/null
+++ b/test/lib/ansible_test/_util/controller/tools/yaml_to_json.py
@@ -0,0 +1,27 @@
+"""Read YAML from stdin and write JSON to stdout."""
+from __future__ import annotations
+
+import datetime
+import json
+import sys
+
+from yaml import load
+
+try:
+ from yaml import CSafeLoader as SafeLoader
+except ImportError:
+ from yaml import SafeLoader
+
+# unique ISO date marker matching the one present in importer.py
+ISO_DATE_MARKER = 'isodate:f23983df-f3df-453c-9904-bcd08af468cc:'
+
+
+def default(value):
+ """Custom default serializer which supports datetime.date types."""
+ if isinstance(value, datetime.date):
+ return '%s%s' % (ISO_DATE_MARKER, value.isoformat())
+
+ raise TypeError('cannot serialize type: %s' % type(value))
+
+
+json.dump(load(sys.stdin, Loader=SafeLoader), sys.stdout, default=default)