diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/commands')
54 files changed, 9434 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/commands/__init__.py b/test/lib/ansible_test/_internal/commands/__init__.py new file mode 100644 index 0000000..e9cb681 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/__init__.py @@ -0,0 +1,2 @@ +"""Nearly empty __init__.py to keep pylint happy.""" +from __future__ import annotations diff --git a/test/lib/ansible_test/_internal/commands/coverage/__init__.py b/test/lib/ansible_test/_internal/commands/coverage/__init__.py new file mode 100644 index 0000000..139cf3c --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/__init__.py @@ -0,0 +1,370 @@ +"""Common logic for the coverage subcommand.""" +from __future__ import annotations + +import collections.abc as c +import json +import os +import re +import typing as t + +from ...encoding import ( + to_bytes, +) + +from ...io import ( + read_text_file, + read_json_file, +) + +from ...util import ( + ApplicationError, + common_environment, + display, + ANSIBLE_TEST_DATA_ROOT, +) + +from ...util_common import ( + intercept_python, + ResultType, +) + +from ...config import ( + EnvironmentConfig, +) + +from ...python_requirements import ( + install_requirements, +) + +from ...target import ( + walk_module_targets, +) + +from ...data import ( + data_context, +) + +from ...pypi_proxy import ( + configure_pypi_proxy, +) + +from ...provisioning import ( + HostState, +) + +from ...coverage_util import ( + get_coverage_file_schema_version, + CoverageError, + CONTROLLER_COVERAGE_VERSION, +) + +if t.TYPE_CHECKING: + import coverage as coverage_module + +COVERAGE_GROUPS = ('command', 'target', 'environment', 'version') +COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc') +COVERAGE_OUTPUT_FILE_NAME = 'coverage' + + +class CoverageConfig(EnvironmentConfig): + """Configuration for the coverage command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args, 'coverage') + + +def initialize_coverage(args: CoverageConfig, host_state: HostState) -> coverage_module: + """Delegate execution if requested, install requirements, then import and return the coverage module. Raises an exception if coverage is not available.""" + configure_pypi_proxy(args, host_state.controller_profile) # coverage + install_requirements(args, host_state.controller_profile.python, coverage=True) # coverage + + try: + import coverage + except ImportError: + coverage = None + + coverage_required_version = CONTROLLER_COVERAGE_VERSION.coverage_version + + if not coverage: + raise ApplicationError(f'Version {coverage_required_version} of the Python "coverage" module must be installed to use this command.') + + if coverage.__version__ != coverage_required_version: + raise ApplicationError(f'Version {coverage_required_version} of the Python "coverage" module is required. Version {coverage.__version__} was found.') + + return coverage + + +def run_coverage(args: CoverageConfig, host_state: HostState, output_file: str, command: str, cmd: list[str]) -> None: + """Run the coverage cli tool with the specified options.""" + env = common_environment() + env.update(dict(COVERAGE_FILE=output_file)) + + cmd = ['python', '-m', 'coverage.__main__', command, '--rcfile', COVERAGE_CONFIG_PATH] + cmd + + stdout, stderr = intercept_python(args, host_state.controller_profile.python, cmd, env, capture=True) + + stdout = (stdout or '').strip() + stderr = (stderr or '').strip() + + if stdout: + display.info(stdout) + + if stderr: + display.warning(stderr) + + +def get_all_coverage_files() -> list[str]: + """Return a list of all coverage file paths.""" + return get_python_coverage_files() + get_powershell_coverage_files() + + +def get_python_coverage_files(path: t.Optional[str] = None) -> list[str]: + """Return the list of Python coverage file paths.""" + return get_coverage_files('python', path) + + +def get_powershell_coverage_files(path: t.Optional[str] = None) -> list[str]: + """Return the list of PowerShell coverage file paths.""" + return get_coverage_files('powershell', path) + + +def get_coverage_files(language: str, path: t.Optional[str] = None) -> list[str]: + """Return the list of coverage file paths for the given language.""" + coverage_dir = path or ResultType.COVERAGE.path + + try: + coverage_files = [os.path.join(coverage_dir, f) for f in os.listdir(coverage_dir) + if '=coverage.' in f and '=%s' % language in f] + except FileNotFoundError: + return [] + + return coverage_files + + +def get_collection_path_regexes() -> tuple[t.Optional[t.Pattern], t.Optional[t.Pattern]]: + """Return a pair of regexes used for identifying and manipulating collection paths.""" + if data_context().content.collection: + collection_search_re = re.compile(r'/%s/' % data_context().content.collection.directory) + collection_sub_re = re.compile(r'^.*?/%s/' % data_context().content.collection.directory) + else: + collection_search_re = None + collection_sub_re = None + + return collection_search_re, collection_sub_re + + +def get_python_modules() -> dict[str, str]: + """Return a dictionary of Ansible module names and their paths.""" + return dict((target.module, target.path) for target in list(walk_module_targets()) if target.path.endswith('.py')) + + +def enumerate_python_arcs( + path: str, + coverage: coverage_module, + modules: dict[str, str], + collection_search_re: t.Optional[t.Pattern], + collection_sub_re: t.Optional[t.Pattern], +) -> c.Generator[tuple[str, set[tuple[int, int]]], None, None]: + """Enumerate Python code coverage arcs in the given file.""" + if os.path.getsize(path) == 0: + display.warning('Empty coverage file: %s' % path, verbosity=2) + return + + try: + arc_data = read_python_coverage(path, coverage) + except CoverageError as ex: + display.error(str(ex)) + return + + for filename, arcs in arc_data.items(): + if not arcs: + # This is most likely due to using an unsupported version of coverage. + display.warning('No arcs found for "%s" in coverage file: %s' % (filename, path)) + continue + + filename = sanitize_filename(filename, modules=modules, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re) + + if not filename: + continue + + yield filename, set(arcs) + + +PythonArcs = dict[str, list[tuple[int, int]]] +"""Python coverage arcs.""" + + +def read_python_coverage(path: str, coverage: coverage_module) -> PythonArcs: + """Return coverage arcs from the specified coverage file. Raises a CoverageError exception if coverage cannot be read.""" + try: + return read_python_coverage_native(path, coverage) + except CoverageError as ex: + schema_version = get_coverage_file_schema_version(path) + + if schema_version == CONTROLLER_COVERAGE_VERSION.schema_version: + raise CoverageError(path, f'Unexpected failure reading supported schema version {schema_version}.') from ex + + if schema_version == 0: + return read_python_coverage_legacy(path) + + raise CoverageError(path, f'Unsupported schema version: {schema_version}') + + +def read_python_coverage_native(path: str, coverage: coverage_module) -> PythonArcs: + """Return coverage arcs from the specified coverage file using the coverage API.""" + try: + data = coverage.CoverageData(path) + data.read() + arcs = {filename: data.arcs(filename) for filename in data.measured_files()} + except Exception as ex: + raise CoverageError(path, f'Error reading coverage file using coverage API: {ex}') from ex + + return arcs + + +def read_python_coverage_legacy(path: str) -> PythonArcs: + """Return coverage arcs from the specified coverage file, which must be in the legacy JSON format.""" + try: + contents = read_text_file(path) + contents = re.sub(r'''^!coverage.py: This is a private format, don't read it directly!''', '', contents) + data = json.loads(contents) + arcs: PythonArcs = {filename: [t.cast(tuple[int, int], tuple(arc)) for arc in arc_list] for filename, arc_list in data['arcs'].items()} + except Exception as ex: + raise CoverageError(path, f'Error reading JSON coverage file: {ex}') from ex + + return arcs + + +def enumerate_powershell_lines( + path: str, + collection_search_re: t.Optional[t.Pattern], + collection_sub_re: t.Optional[t.Pattern], +) -> c.Generator[tuple[str, dict[int, int]], None, None]: + """Enumerate PowerShell code coverage lines in the given file.""" + if os.path.getsize(path) == 0: + display.warning('Empty coverage file: %s' % path, verbosity=2) + return + + try: + coverage_run = read_json_file(path) + except Exception as ex: # pylint: disable=locally-disabled, broad-except + display.error('%s' % ex) + return + + for filename, hits in coverage_run.items(): + filename = sanitize_filename(filename, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re) + + if not filename: + continue + + if isinstance(hits, dict) and not hits.get('Line'): + # Input data was previously aggregated and thus uses the standard ansible-test output format for PowerShell coverage. + # This format differs from the more verbose format of raw coverage data from the remote Windows hosts. + hits = dict((int(key), value) for key, value in hits.items()) + + yield filename, hits + continue + + # PowerShell unpacks arrays if there's only a single entry so this is a defensive check on that + if not isinstance(hits, list): + hits = [hits] + + hits = dict((hit['Line'], hit['HitCount']) for hit in hits if hit) + + yield filename, hits + + +def sanitize_filename( + filename: str, + modules: t.Optional[dict[str, str]] = None, + collection_search_re: t.Optional[t.Pattern] = None, + collection_sub_re: t.Optional[t.Pattern] = None, +) -> t.Optional[str]: + """Convert the given code coverage path to a local absolute path and return its, or None if the path is not valid.""" + ansible_path = os.path.abspath('lib/ansible/') + '/' + root_path = data_context().content.root + '/' + integration_temp_path = os.path.sep + os.path.join(ResultType.TMP.relative_path, 'integration') + os.path.sep + + if modules is None: + modules = {} + + if '/ansible_modlib.zip/ansible/' in filename: + # Rewrite the module_utils path from the remote host to match the controller. Ansible 2.6 and earlier. + new_name = re.sub('^.*/ansible_modlib.zip/ansible/', ansible_path, filename) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif collection_search_re and collection_search_re.search(filename): + new_name = os.path.abspath(collection_sub_re.sub('', filename)) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename): + # Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later. + new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif '/ansible_module_' in filename: + # Rewrite the module path from the remote host to match the controller. Ansible 2.6 and earlier. + module_name = re.sub('^.*/ansible_module_(?P<module>.*).py$', '\\g<module>', filename) + if module_name not in modules: + display.warning('Skipping coverage of unknown module: %s' % module_name) + return None + new_name = os.path.abspath(modules[module_name]) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif re.search(r'/ansible_[^/]+_payload(_[^/]+|\.zip)/__main__\.py$', filename): + # Rewrite the module path from the remote host to match the controller. Ansible 2.7 and later. + # AnsiballZ versions using zipimporter will match the `.zip` portion of the regex. + # AnsiballZ versions not using zipimporter will match the `_[^/]+` portion of the regex. + module_name = re.sub(r'^.*/ansible_(?P<module>[^/]+)_payload(_[^/]+|\.zip)/__main__\.py$', + '\\g<module>', filename).rstrip('_') + if module_name not in modules: + display.warning('Skipping coverage of unknown module: %s' % module_name) + return None + new_name = os.path.abspath(modules[module_name]) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif re.search('^(/.*?)?/root/ansible/', filename): + # Rewrite the path of code running on a remote host or in a docker container as root. + new_name = re.sub('^(/.*?)?/root/ansible/', root_path, filename) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + elif integration_temp_path in filename: + # Rewrite the path of code running from an integration test temporary directory. + new_name = re.sub(r'^.*' + re.escape(integration_temp_path) + '[^/]+/', root_path, filename) + display.info('%s -> %s' % (filename, new_name), verbosity=3) + filename = new_name + + filename = os.path.abspath(filename) # make sure path is absolute (will be relative if previously exported) + + return filename + + +class PathChecker: + """Checks code coverage paths to verify they are valid and reports on the findings.""" + def __init__(self, args: CoverageConfig, collection_search_re: t.Optional[t.Pattern] = None) -> None: + self.args = args + self.collection_search_re = collection_search_re + self.invalid_paths: list[str] = [] + self.invalid_path_chars = 0 + + def check_path(self, path: str) -> bool: + """Return True if the given coverage path is valid, otherwise display a warning and return False.""" + if os.path.isfile(to_bytes(path)): + return True + + if self.collection_search_re and self.collection_search_re.search(path) and os.path.basename(path) == '__init__.py': + # the collection loader uses implicit namespace packages, so __init__.py does not need to exist on disk + # coverage is still reported for these non-existent files, but warnings are not needed + return False + + self.invalid_paths.append(path) + self.invalid_path_chars += len(path) + + if self.args.verbosity > 1: + display.warning('Invalid coverage path: %s' % path) + + return False + + def report(self) -> None: + """Display a warning regarding invalid paths if any were found.""" + if self.invalid_paths: + display.warning('Ignored %d characters from %d invalid coverage path(s).' % (self.invalid_path_chars, len(self.invalid_paths))) diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/__init__.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/__init__.py new file mode 100644 index 0000000..37859e8 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/__init__.py @@ -0,0 +1,17 @@ +"""Common logic for the `coverage analyze` subcommand.""" +from __future__ import annotations +import typing as t + +from .. import ( + CoverageConfig, +) + + +class CoverageAnalyzeConfig(CoverageConfig): + """Configuration for the `coverage analyze` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + # avoid mixing log messages with file output when using `/dev/stdout` for the output file on commands + # this may be worth considering as the default behavior in the future, instead of being dependent on the command or options used + self.display_stderr = True diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/__init__.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/__init__.py new file mode 100644 index 0000000..ad6cf86 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/__init__.py @@ -0,0 +1,154 @@ +"""Analyze integration test target code coverage.""" +from __future__ import annotations + +import collections.abc as c +import os +import typing as t + +from .....io import ( + read_json_file, + write_json_file, +) + +from .....util import ( + ApplicationError, + display, +) + +from .. import ( + CoverageAnalyzeConfig, +) + +TargetKey = t.TypeVar('TargetKey', int, tuple[int, int]) +TFlexKey = t.TypeVar('TFlexKey', int, tuple[int, int], str) +NamedPoints = dict[str, dict[TargetKey, set[str]]] +IndexedPoints = dict[str, dict[TargetKey, set[int]]] +Arcs = dict[str, dict[tuple[int, int], set[int]]] +Lines = dict[str, dict[int, set[int]]] +TargetIndexes = dict[str, int] +TargetSetIndexes = dict[frozenset[int], int] + + +class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig): + """Configuration for the `coverage analyze targets` command.""" + + +def make_report(target_indexes: TargetIndexes, arcs: Arcs, lines: Lines) -> dict[str, t.Any]: + """Condense target indexes, arcs and lines into a compact report.""" + set_indexes: TargetSetIndexes = {} + arc_refs = dict((path, dict((format_arc(arc), get_target_set_index(indexes, set_indexes)) for arc, indexes in data.items())) for path, data in arcs.items()) + line_refs = dict((path, dict((line, get_target_set_index(indexes, set_indexes)) for line, indexes in data.items())) for path, data in lines.items()) + + report = dict( + targets=[name for name, index in sorted(target_indexes.items(), key=lambda kvp: kvp[1])], + target_sets=[sorted(data) for data, index in sorted(set_indexes.items(), key=lambda kvp: kvp[1])], + arcs=arc_refs, + lines=line_refs, + ) + + return report + + +def load_report(report: dict[str, t.Any]) -> tuple[list[str], Arcs, Lines]: + """Extract target indexes, arcs and lines from an existing report.""" + try: + target_indexes: list[str] = report['targets'] + target_sets: list[list[int]] = report['target_sets'] + arc_data: dict[str, dict[str, int]] = report['arcs'] + line_data: dict[str, dict[int, int]] = report['lines'] + except KeyError as ex: + raise ApplicationError('Document is missing key "%s".' % ex.args) + except TypeError: + raise ApplicationError('Document is type "%s" instead of "dict".' % type(report).__name__) + + arcs = dict((path, dict((parse_arc(arc), set(target_sets[index])) for arc, index in data.items())) for path, data in arc_data.items()) + lines = dict((path, dict((int(line), set(target_sets[index])) for line, index in data.items())) for path, data in line_data.items()) + + return target_indexes, arcs, lines + + +def read_report(path: str) -> tuple[list[str], Arcs, Lines]: + """Read a JSON report from disk.""" + try: + report = read_json_file(path) + except Exception as ex: + raise ApplicationError('File "%s" is not valid JSON: %s' % (path, ex)) + + try: + return load_report(report) + except ApplicationError as ex: + raise ApplicationError('File "%s" is not an aggregated coverage data file. %s' % (path, ex)) + + +def write_report(args: CoverageAnalyzeTargetsConfig, report: dict[str, t.Any], path: str) -> None: + """Write a JSON report to disk.""" + if args.explain: + return + + write_json_file(path, report, formatted=False) + + display.info('Generated %d byte report with %d targets covering %d files.' % ( + os.path.getsize(path), len(report['targets']), len(set(report['arcs'].keys()) | set(report['lines'].keys())), + ), verbosity=1) + + +def format_line(value: int) -> str: + """Format line as a string.""" + return str(value) # putting this in a function keeps both pylint and mypy happy + + +def format_arc(value: tuple[int, int]) -> str: + """Format an arc tuple as a string.""" + return '%d:%d' % value + + +def parse_arc(value: str) -> tuple[int, int]: + """Parse an arc string into a tuple.""" + first, last = tuple(map(int, value.split(':'))) + return first, last + + +def get_target_set_index(data: set[int], target_set_indexes: TargetSetIndexes) -> int: + """Find or add the target set in the result set and return the target set index.""" + return target_set_indexes.setdefault(frozenset(data), len(target_set_indexes)) + + +def get_target_index(name: str, target_indexes: TargetIndexes) -> int: + """Find or add the target in the result set and return the target index.""" + return target_indexes.setdefault(name, len(target_indexes)) + + +def expand_indexes( + source_data: IndexedPoints, + source_index: list[str], + format_func: c.Callable[[TargetKey], TFlexKey], +) -> dict[str, dict[TFlexKey, set[str]]]: + """Expand indexes from the source into target names for easier processing of the data (arcs or lines).""" + combined_data: dict[str, dict[TFlexKey, set[str]]] = {} + + for covered_path, covered_points in source_data.items(): + combined_points = combined_data.setdefault(covered_path, {}) + + for covered_point, covered_target_indexes in covered_points.items(): + combined_point = combined_points.setdefault(format_func(covered_point), set()) + + for covered_target_index in covered_target_indexes: + combined_point.add(source_index[covered_target_index]) + + return combined_data + + +def generate_indexes(target_indexes: TargetIndexes, data: NamedPoints) -> IndexedPoints: + """Return an indexed version of the given data (arcs or points).""" + results: IndexedPoints = {} + + for path, points in data.items(): + result_points = results[path] = {} + + for point, target_names in points.items(): + result_point = result_points[point] = set() + + for target_name in target_names: + result_point.add(get_target_index(target_name, target_indexes)) + + return results diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/combine.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/combine.py new file mode 100644 index 0000000..e3782ce --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/combine.py @@ -0,0 +1,74 @@ +"""Combine integration test target code coverage reports.""" +from __future__ import annotations +import typing as t + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + get_target_index, + make_report, + read_report, + write_report, +) + +from . import ( + Arcs, + IndexedPoints, + Lines, + TargetIndexes, +) + + +class CoverageAnalyzeTargetsCombineConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets combine` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_files: list[str] = args.input_file + self.output_file: str = args.output_file + + +def command_coverage_analyze_targets_combine(args: CoverageAnalyzeTargetsCombineConfig) -> None: + """Combine integration test target code coverage reports.""" + host_state = prepare_profiles(args) # coverage analyze targets combine + + if args.delegate: + raise Delegate(host_state=host_state) + + combined_target_indexes: TargetIndexes = {} + combined_path_arcs: Arcs = {} + combined_path_lines: Lines = {} + + for report_path in args.input_files: + covered_targets, covered_path_arcs, covered_path_lines = read_report(report_path) + + merge_indexes(covered_path_arcs, covered_targets, combined_path_arcs, combined_target_indexes) + merge_indexes(covered_path_lines, covered_targets, combined_path_lines, combined_target_indexes) + + report = make_report(combined_target_indexes, combined_path_arcs, combined_path_lines) + + write_report(args, report, args.output_file) + + +def merge_indexes( + source_data: IndexedPoints, + source_index: list[str], + combined_data: IndexedPoints, + combined_index: TargetIndexes, +) -> None: + """Merge indexes from the source into the combined data set (arcs or lines).""" + for covered_path, covered_points in source_data.items(): + combined_points = combined_data.setdefault(covered_path, {}) + + for covered_point, covered_target_indexes in covered_points.items(): + combined_point = combined_points.setdefault(covered_point, set()) + + for covered_target_index in covered_target_indexes: + combined_point.add(get_target_index(source_index[covered_target_index], combined_index)) diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/expand.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/expand.py new file mode 100644 index 0000000..ba90387 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/expand.py @@ -0,0 +1,51 @@ +"""Expand target names in an aggregated coverage file.""" +from __future__ import annotations +import typing as t + +from .....io import ( + SortedSetEncoder, + write_json_file, +) + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + expand_indexes, + format_arc, + format_line, + read_report, +) + + +class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets expand` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_file: str = args.input_file + self.output_file: str = args.output_file + + +def command_coverage_analyze_targets_expand(args: CoverageAnalyzeTargetsExpandConfig) -> None: + """Expand target names in an aggregated coverage file.""" + host_state = prepare_profiles(args) # coverage analyze targets expand + + if args.delegate: + raise Delegate(host_state=host_state) + + covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file) + + report = dict( + arcs=expand_indexes(covered_path_arcs, covered_targets, format_arc), + lines=expand_indexes(covered_path_lines, covered_targets, format_line), + ) + + if not args.explain: + write_json_file(args.output_file, report, encoder=SortedSetEncoder) diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py new file mode 100644 index 0000000..29a8ee5 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/filter.py @@ -0,0 +1,122 @@ +"""Filter an aggregated coverage file, keeping only the specified targets.""" +from __future__ import annotations + +import collections.abc as c +import re +import typing as t + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + expand_indexes, + generate_indexes, + make_report, + read_report, + write_report, +) + +from . import ( + NamedPoints, + TargetKey, + TargetIndexes, +) + + +class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets filter` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_file: str = args.input_file + self.output_file: str = args.output_file + self.include_targets: list[str] = args.include_targets + self.exclude_targets: list[str] = args.exclude_targets + self.include_path: t.Optional[str] = args.include_path + self.exclude_path: t.Optional[str] = args.exclude_path + + +def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterConfig) -> None: + """Filter target names in an aggregated coverage file.""" + host_state = prepare_profiles(args) # coverage analyze targets filter + + if args.delegate: + raise Delegate(host_state=host_state) + + covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file) + + def pass_target_key(value: TargetKey) -> TargetKey: + """Return the given target key unmodified.""" + return value + + filtered_path_arcs = expand_indexes(covered_path_arcs, covered_targets, pass_target_key) + filtered_path_lines = expand_indexes(covered_path_lines, covered_targets, pass_target_key) + + include_targets = set(args.include_targets) if args.include_targets else None + exclude_targets = set(args.exclude_targets) if args.exclude_targets else None + + include_path = re.compile(args.include_path) if args.include_path else None + exclude_path = re.compile(args.exclude_path) if args.exclude_path else None + + def path_filter_func(path: str) -> bool: + """Return True if the given path should be included, otherwise return False.""" + if include_path and not re.search(include_path, path): + return False + + if exclude_path and re.search(exclude_path, path): + return False + + return True + + def target_filter_func(targets: set[str]) -> set[str]: + """Filter the given targets and return the result based on the defined includes and excludes.""" + if include_targets: + targets &= include_targets + + if exclude_targets: + targets -= exclude_targets + + return targets + + filtered_path_arcs = filter_data(filtered_path_arcs, path_filter_func, target_filter_func) + filtered_path_lines = filter_data(filtered_path_lines, path_filter_func, target_filter_func) + + target_indexes: TargetIndexes = {} + indexed_path_arcs = generate_indexes(target_indexes, filtered_path_arcs) + indexed_path_lines = generate_indexes(target_indexes, filtered_path_lines) + + report = make_report(target_indexes, indexed_path_arcs, indexed_path_lines) + + write_report(args, report, args.output_file) + + +def filter_data( + data: NamedPoints, + path_filter_func: c.Callable[[str], bool], + target_filter_func: c.Callable[[set[str]], set[str]], +) -> NamedPoints: + """Filter the data set using the specified filter function.""" + result: NamedPoints = {} + + for src_path, src_points in data.items(): + if not path_filter_func(src_path): + continue + + dst_points = {} + + for src_point, src_targets in src_points.items(): + dst_targets = target_filter_func(src_targets) + + if dst_targets: + dst_points[src_point] = dst_targets + + if dst_points: + result[src_path] = dst_points + + return result diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/generate.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/generate.py new file mode 100644 index 0000000..127b5b7 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/generate.py @@ -0,0 +1,158 @@ +"""Analyze code coverage data to determine which integration test targets provide coverage for each arc or line.""" +from __future__ import annotations + +import os +import typing as t + +from .....encoding import ( + to_text, +) + +from .....data import ( + data_context, +) + +from .....util_common import ( + ResultType, +) + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, + HostState, +) + +from ... import ( + enumerate_powershell_lines, + enumerate_python_arcs, + get_collection_path_regexes, + get_powershell_coverage_files, + get_python_coverage_files, + get_python_modules, + initialize_coverage, + PathChecker, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + get_target_index, + make_report, + write_report, +) + +from . import ( + Arcs, + Lines, + TargetIndexes, +) + + +class CoverageAnalyzeTargetsGenerateConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets generate` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.input_dir: str = args.input_dir or ResultType.COVERAGE.path + self.output_file: str = args.output_file + + +def command_coverage_analyze_targets_generate(args: CoverageAnalyzeTargetsGenerateConfig) -> None: + """Analyze code coverage data to determine which integration test targets provide coverage for each arc or line.""" + host_state = prepare_profiles(args) # coverage analyze targets generate + + if args.delegate: + raise Delegate(host_state) + + root = data_context().content.root + target_indexes: TargetIndexes = {} + arcs = dict((os.path.relpath(path, root), data) for path, data in analyze_python_coverage(args, host_state, args.input_dir, target_indexes).items()) + lines = dict((os.path.relpath(path, root), data) for path, data in analyze_powershell_coverage(args, args.input_dir, target_indexes).items()) + report = make_report(target_indexes, arcs, lines) + write_report(args, report, args.output_file) + + +def analyze_python_coverage( + args: CoverageAnalyzeTargetsGenerateConfig, + host_state: HostState, + path: str, + target_indexes: TargetIndexes, +) -> Arcs: + """Analyze Python code coverage.""" + results: Arcs = {} + collection_search_re, collection_sub_re = get_collection_path_regexes() + modules = get_python_modules() + python_files = get_python_coverage_files(path) + coverage = initialize_coverage(args, host_state) + + for python_file in python_files: + if not is_integration_coverage_file(python_file): + continue + + target_name = get_target_name(python_file) + target_index = get_target_index(target_name, target_indexes) + + for filename, covered_arcs in enumerate_python_arcs(python_file, coverage, modules, collection_search_re, collection_sub_re): + arcs = results.setdefault(filename, {}) + + for covered_arc in covered_arcs: + arc = arcs.setdefault(covered_arc, set()) + arc.add(target_index) + + prune_invalid_filenames(args, results, collection_search_re=collection_search_re) + + return results + + +def analyze_powershell_coverage( + args: CoverageAnalyzeTargetsGenerateConfig, + path: str, + target_indexes: TargetIndexes, +) -> Lines: + """Analyze PowerShell code coverage""" + results: Lines = {} + collection_search_re, collection_sub_re = get_collection_path_regexes() + powershell_files = get_powershell_coverage_files(path) + + for powershell_file in powershell_files: + if not is_integration_coverage_file(powershell_file): + continue + + target_name = get_target_name(powershell_file) + target_index = get_target_index(target_name, target_indexes) + + for filename, hits in enumerate_powershell_lines(powershell_file, collection_search_re, collection_sub_re): + lines = results.setdefault(filename, {}) + + for covered_line in hits: + line = lines.setdefault(covered_line, set()) + line.add(target_index) + + prune_invalid_filenames(args, results) + + return results + + +def prune_invalid_filenames( + args: CoverageAnalyzeTargetsGenerateConfig, + results: dict[str, t.Any], + collection_search_re: t.Optional[t.Pattern] = None, +) -> None: + """Remove invalid filenames from the given result set.""" + path_checker = PathChecker(args, collection_search_re) + + for path in list(results.keys()): + if not path_checker.check_path(path): + del results[path] + + +def get_target_name(path: str) -> str: + """Extract the test target name from the given coverage path.""" + return to_text(os.path.basename(path).split('=')[1]) + + +def is_integration_coverage_file(path: str) -> bool: + """Returns True if the coverage file came from integration tests, otherwise False.""" + return os.path.basename(path).split('=')[0] in ('integration', 'windows-integration', 'network-integration') diff --git a/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/missing.py b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/missing.py new file mode 100644 index 0000000..c1c77e7 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/analyze/targets/missing.py @@ -0,0 +1,119 @@ +"""Identify aggregated coverage in one file missing from another.""" +from __future__ import annotations + +import os +import typing as t + +from .....encoding import ( + to_bytes, +) + +from .....executor import ( + Delegate, +) + +from .....provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageAnalyzeTargetsConfig, + get_target_index, + make_report, + read_report, + write_report, +) + +from . import ( + TargetIndexes, + IndexedPoints, +) + + +class CoverageAnalyzeTargetsMissingConfig(CoverageAnalyzeTargetsConfig): + """Configuration for the `coverage analyze targets missing` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.from_file: str = args.from_file + self.to_file: str = args.to_file + self.output_file: str = args.output_file + + self.only_gaps: bool = args.only_gaps + self.only_exists: bool = args.only_exists + + +def command_coverage_analyze_targets_missing(args: CoverageAnalyzeTargetsMissingConfig) -> None: + """Identify aggregated coverage in one file missing from another.""" + host_state = prepare_profiles(args) # coverage analyze targets missing + + if args.delegate: + raise Delegate(host_state=host_state) + + from_targets, from_path_arcs, from_path_lines = read_report(args.from_file) + to_targets, to_path_arcs, to_path_lines = read_report(args.to_file) + target_indexes: TargetIndexes = {} + + if args.only_gaps: + arcs = find_gaps(from_path_arcs, from_targets, to_path_arcs, target_indexes, args.only_exists) + lines = find_gaps(from_path_lines, from_targets, to_path_lines, target_indexes, args.only_exists) + else: + arcs = find_missing(from_path_arcs, from_targets, to_path_arcs, to_targets, target_indexes, args.only_exists) + lines = find_missing(from_path_lines, from_targets, to_path_lines, to_targets, target_indexes, args.only_exists) + + report = make_report(target_indexes, arcs, lines) + write_report(args, report, args.output_file) + + +def find_gaps( + from_data: IndexedPoints, + from_index: list[str], + to_data: IndexedPoints, + target_indexes: TargetIndexes, + only_exists: bool, +) -> IndexedPoints: + """Find gaps in coverage between the from and to data sets.""" + target_data: IndexedPoints = {} + + for from_path, from_points in from_data.items(): + if only_exists and not os.path.isfile(to_bytes(from_path)): + continue + + to_points = to_data.get(from_path, {}) + + gaps = set(from_points.keys()) - set(to_points.keys()) + + if gaps: + gap_points = dict((key, value) for key, value in from_points.items() if key in gaps) + target_data[from_path] = dict((gap, set(get_target_index(from_index[i], target_indexes) for i in indexes)) for gap, indexes in gap_points.items()) + + return target_data + + +def find_missing( + from_data: IndexedPoints, + from_index: list[str], + to_data: IndexedPoints, + to_index: list[str], + target_indexes: TargetIndexes, + only_exists: bool, +) -> IndexedPoints: + """Find coverage in from_data not present in to_data (arcs or lines).""" + target_data: IndexedPoints = {} + + for from_path, from_points in from_data.items(): + if only_exists and not os.path.isfile(to_bytes(from_path)): + continue + + to_points = to_data.get(from_path, {}) + + for from_point, from_target_indexes in from_points.items(): + to_target_indexes = to_points.get(from_point, set()) + + remaining_targets = set(from_index[i] for i in from_target_indexes) - set(to_index[i] for i in to_target_indexes) + + if remaining_targets: + target_index = target_data.setdefault(from_path, {}).setdefault(from_point, set()) + target_index.update(get_target_index(name, target_indexes) for name in remaining_targets) + + return target_data diff --git a/test/lib/ansible_test/_internal/commands/coverage/combine.py b/test/lib/ansible_test/_internal/commands/coverage/combine.py new file mode 100644 index 0000000..66210c7 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/combine.py @@ -0,0 +1,362 @@ +"""Combine code coverage files.""" +from __future__ import annotations + +import collections.abc as c +import os +import json +import typing as t + +from ...target import ( + walk_compile_targets, + walk_powershell_targets, +) + +from ...io import ( + read_text_file, +) + +from ...util import ( + ANSIBLE_TEST_TOOLS_ROOT, + display, + ApplicationError, + raw_command, +) + +from ...util_common import ( + ResultType, + write_json_file, + write_json_test_results, +) + +from ...executor import ( + Delegate, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + DockerConfig, + RemoteConfig, +) + +from ...provisioning import ( + HostState, + prepare_profiles, +) + +from . import ( + enumerate_python_arcs, + enumerate_powershell_lines, + get_collection_path_regexes, + get_all_coverage_files, + get_python_coverage_files, + get_python_modules, + get_powershell_coverage_files, + initialize_coverage, + COVERAGE_OUTPUT_FILE_NAME, + COVERAGE_GROUPS, + CoverageConfig, + PathChecker, +) + +TValue = t.TypeVar('TValue') + + +def command_coverage_combine(args: CoverageCombineConfig) -> None: + """Patch paths in coverage files and merge into a single file.""" + host_state = prepare_profiles(args) # coverage combine + combine_coverage_files(args, host_state) + + +def combine_coverage_files(args: CoverageCombineConfig, host_state: HostState) -> list[str]: + """Combine coverage and return a list of the resulting files.""" + if args.delegate: + if isinstance(args.controller, (DockerConfig, RemoteConfig)): + paths = get_all_coverage_files() + exported_paths = [path for path in paths if os.path.basename(path).split('=')[-1].split('.')[:2] == ['coverage', 'combined']] + + if not exported_paths: + raise ExportedCoverageDataNotFound() + + pairs = [(path, os.path.relpath(path, data_context().content.root)) for path in exported_paths] + + def coverage_callback(files: list[tuple[str, str]]) -> None: + """Add the coverage files to the payload file list.""" + display.info('Including %d exported coverage file(s) in payload.' % len(pairs), verbosity=1) + files.extend(pairs) + + data_context().register_payload_callback(coverage_callback) + + raise Delegate(host_state=host_state) + + paths = _command_coverage_combine_powershell(args) + _command_coverage_combine_python(args, host_state) + + for path in paths: + display.info('Generated combined output: %s' % path, verbosity=1) + + return paths + + +class ExportedCoverageDataNotFound(ApplicationError): + """Exception when no combined coverage data is present yet is required.""" + def __init__(self) -> None: + super().__init__( + 'Coverage data must be exported before processing with the `--docker` or `--remote` option.\n' + 'Export coverage with `ansible-test coverage combine` using the `--export` option.\n' + 'The exported files must be in the directory: %s/' % ResultType.COVERAGE.relative_path) + + +def _command_coverage_combine_python(args: CoverageCombineConfig, host_state: HostState) -> list[str]: + """Combine Python coverage files and return a list of the output files.""" + coverage = initialize_coverage(args, host_state) + + modules = get_python_modules() + + coverage_files = get_python_coverage_files() + + def _default_stub_value(source_paths: list[str]) -> dict[str, set[tuple[int, int]]]: + return {path: set() for path in source_paths} + + counter = 0 + sources = _get_coverage_targets(args, walk_compile_targets) + groups = _build_stub_groups(args, sources, _default_stub_value) + + collection_search_re, collection_sub_re = get_collection_path_regexes() + + for coverage_file in coverage_files: + counter += 1 + display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2) + + group = get_coverage_group(args, coverage_file) + + if group is None: + display.warning('Unexpected name for coverage file: %s' % coverage_file) + continue + + for filename, arcs in enumerate_python_arcs(coverage_file, coverage, modules, collection_search_re, collection_sub_re): + if args.export: + filename = os.path.relpath(filename) # exported paths must be relative since absolute paths may differ between systems + + if group not in groups: + groups[group] = {} + + arc_data = groups[group] + + if filename not in arc_data: + arc_data[filename] = set() + + arc_data[filename].update(arcs) + + output_files = [] + + if args.export: + coverage_file = os.path.join(args.export, '') + suffix = '=coverage.combined' + else: + coverage_file = os.path.join(ResultType.COVERAGE.path, COVERAGE_OUTPUT_FILE_NAME) + suffix = '' + + path_checker = PathChecker(args, collection_search_re) + + for group in sorted(groups): + arc_data = groups[group] + output_file = coverage_file + group + suffix + + if args.explain: + continue + + updated = coverage.CoverageData(output_file) + + for filename in arc_data: + if not path_checker.check_path(filename): + continue + + updated.add_arcs({filename: list(arc_data[filename])}) + + if args.all: + updated.add_arcs(dict((source[0], []) for source in sources)) + + updated.write() # always write files to make sure stale files do not exist + + if updated: + # only report files which are non-empty to prevent coverage from reporting errors + output_files.append(output_file) + + path_checker.report() + + return sorted(output_files) + + +def _command_coverage_combine_powershell(args: CoverageCombineConfig) -> list[str]: + """Combine PowerShell coverage files and return a list of the output files.""" + coverage_files = get_powershell_coverage_files() + + def _default_stub_value(source_paths: list[str]) -> dict[str, dict[int, int]]: + cmd = ['pwsh', os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'coverage_stub.ps1')] + cmd.extend(source_paths) + + stubs = json.loads(raw_command(cmd, capture=True)[0]) + + return dict((d['Path'], dict((line, 0) for line in d['Lines'])) for d in stubs) + + counter = 0 + sources = _get_coverage_targets(args, walk_powershell_targets) + groups = _build_stub_groups(args, sources, _default_stub_value) + + collection_search_re, collection_sub_re = get_collection_path_regexes() + + for coverage_file in coverage_files: + counter += 1 + display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2) + + group = get_coverage_group(args, coverage_file) + + if group is None: + display.warning('Unexpected name for coverage file: %s' % coverage_file) + continue + + for filename, hits in enumerate_powershell_lines(coverage_file, collection_search_re, collection_sub_re): + if args.export: + filename = os.path.relpath(filename) # exported paths must be relative since absolute paths may differ between systems + + if group not in groups: + groups[group] = {} + + coverage_data = groups[group] + + if filename not in coverage_data: + coverage_data[filename] = {} + + file_coverage = coverage_data[filename] + + for line_no, hit_count in hits.items(): + file_coverage[line_no] = file_coverage.get(line_no, 0) + hit_count + + output_files = [] + + path_checker = PathChecker(args) + + for group in sorted(groups): + coverage_data = dict((filename, data) for filename, data in groups[group].items() if path_checker.check_path(filename)) + + if args.all: + missing_sources = [source for source, _source_line_count in sources if source not in coverage_data] + coverage_data.update(_default_stub_value(missing_sources)) + + if not args.explain: + if args.export: + output_file = os.path.join(args.export, group + '=coverage.combined') + write_json_file(output_file, coverage_data, formatted=False) + output_files.append(output_file) + continue + + output_file = COVERAGE_OUTPUT_FILE_NAME + group + '-powershell' + + write_json_test_results(ResultType.COVERAGE, output_file, coverage_data, formatted=False) + + output_files.append(os.path.join(ResultType.COVERAGE.path, output_file)) + + path_checker.report() + + return sorted(output_files) + + +def _get_coverage_targets(args: CoverageCombineConfig, walk_func: c.Callable) -> list[tuple[str, int]]: + """Return a list of files to cover and the number of lines in each file, using the given function as the source of the files.""" + sources = [] + + if args.all or args.stub: + # excludes symlinks of regular files to avoid reporting on the same file multiple times + # in the future it would be nice to merge any coverage for symlinks into the real files + for target in walk_func(include_symlinks=False): + target_path = os.path.abspath(target.path) + + target_lines = len(read_text_file(target_path).splitlines()) + + sources.append((target_path, target_lines)) + + sources.sort() + + return sources + + +def _build_stub_groups( + args: CoverageCombineConfig, + sources: list[tuple[str, int]], + default_stub_value: c.Callable[[list[str]], dict[str, TValue]], +) -> dict[str, dict[str, TValue]]: + """ + Split the given list of sources with line counts into groups, maintaining a maximum line count for each group. + Each group consists of a dictionary of sources and default coverage stubs generated by the provided default_stub_value function. + """ + groups = {} + + if args.stub: + stub_group: list[str] = [] + stub_groups = [stub_group] + stub_line_limit = 500000 + stub_line_count = 0 + + for source, source_line_count in sources: + stub_group.append(source) + stub_line_count += source_line_count + + if stub_line_count > stub_line_limit: + stub_line_count = 0 + stub_group = [] + stub_groups.append(stub_group) + + for stub_index, stub_group in enumerate(stub_groups): + if not stub_group: + continue + + groups['=stub-%02d' % (stub_index + 1)] = default_stub_value(stub_group) + + return groups + + +def get_coverage_group(args: CoverageCombineConfig, coverage_file: str) -> t.Optional[str]: + """Return the name of the coverage group for the specified coverage file, or None if no group was found.""" + parts = os.path.basename(coverage_file).split('=', 4) + + if len(parts) != 5 or not parts[4].startswith('coverage.'): + return None + + names = dict( + command=parts[0], + target=parts[1], + environment=parts[2], + version=parts[3], + ) + + export_names = dict( + version=parts[3], + ) + + group = '' + + for part in COVERAGE_GROUPS: + if part in args.group_by: + group += '=%s' % names[part] + elif args.export: + group += '=%s' % export_names.get(part, 'various') + + if args.export: + group = group.lstrip('=') + + return group + + +class CoverageCombineConfig(CoverageConfig): + """Configuration for the coverage combine command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.group_by: frozenset[str] = frozenset(args.group_by) if args.group_by else frozenset() + self.all: bool = args.all + self.stub: bool = args.stub + + # only available to coverage combine + self.export: str = args.export if 'export' in args else False diff --git a/test/lib/ansible_test/_internal/commands/coverage/erase.py b/test/lib/ansible_test/_internal/commands/coverage/erase.py new file mode 100644 index 0000000..70b685c --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/erase.py @@ -0,0 +1,43 @@ +"""Erase code coverage files.""" +from __future__ import annotations + +import os + +from ...util_common import ( + ResultType, +) + +from ...executor import ( + Delegate, +) + +from ...provisioning import ( + prepare_profiles, +) + +from . import ( + CoverageConfig, +) + + +def command_coverage_erase(args: CoverageEraseConfig) -> None: + """Erase code coverage data files collected during test runs.""" + host_state = prepare_profiles(args) # coverage erase + + if args.delegate: + raise Delegate(host_state=host_state) + + coverage_dir = ResultType.COVERAGE.path + + for name in os.listdir(coverage_dir): + if not name.startswith('coverage') and '=coverage.' not in name: + continue + + path = os.path.join(coverage_dir, name) + + if not args.explain: + os.remove(path) + + +class CoverageEraseConfig(CoverageConfig): + """Configuration for the coverage erase command.""" diff --git a/test/lib/ansible_test/_internal/commands/coverage/html.py b/test/lib/ansible_test/_internal/commands/coverage/html.py new file mode 100644 index 0000000..e3063c0 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/html.py @@ -0,0 +1,51 @@ +"""Generate HTML code coverage reports.""" +from __future__ import annotations + +import os + +from ...io import ( + make_dirs, +) + +from ...util import ( + display, +) + +from ...util_common import ( + ResultType, +) + +from ...provisioning import ( + prepare_profiles, +) + +from .combine import ( + combine_coverage_files, + CoverageCombineConfig, +) + +from . import ( + run_coverage, +) + + +def command_coverage_html(args: CoverageHtmlConfig) -> None: + """Generate an HTML coverage report.""" + host_state = prepare_profiles(args) # coverage html + output_files = combine_coverage_files(args, host_state) + + for output_file in output_files: + if output_file.endswith('-powershell'): + # coverage.py does not support non-Python files so we just skip the local html report. + display.info("Skipping output file %s in html generation" % output_file, verbosity=3) + continue + + dir_name = os.path.join(ResultType.REPORTS.path, os.path.basename(output_file)) + make_dirs(dir_name) + run_coverage(args, host_state, output_file, 'html', ['-i', '-d', dir_name]) + + display.info('HTML report generated: file:///%s' % os.path.join(dir_name, 'index.html')) + + +class CoverageHtmlConfig(CoverageCombineConfig): + """Configuration for the coverage html command.""" diff --git a/test/lib/ansible_test/_internal/commands/coverage/report.py b/test/lib/ansible_test/_internal/commands/coverage/report.py new file mode 100644 index 0000000..fadc13f --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/report.py @@ -0,0 +1,152 @@ +"""Generate console code coverage reports.""" +from __future__ import annotations + +import os +import typing as t + +from ...io import ( + read_json_file, +) + +from ...util import ( + display, +) + +from ...data import ( + data_context, +) + +from ...provisioning import ( + prepare_profiles, +) + +from .combine import ( + combine_coverage_files, + CoverageCombineConfig, +) + +from . import ( + run_coverage, +) + + +def command_coverage_report(args: CoverageReportConfig) -> None: + """Generate a console coverage report.""" + host_state = prepare_profiles(args) # coverage report + output_files = combine_coverage_files(args, host_state) + + for output_file in output_files: + if args.group_by or args.stub: + display.info('>>> Coverage Group: %s' % ' '.join(os.path.basename(output_file).split('=')[1:])) + + if output_file.endswith('-powershell'): + display.info(_generate_powershell_output_report(args, output_file)) + else: + options = [] + + if args.show_missing: + options.append('--show-missing') + + if args.include: + options.extend(['--include', args.include]) + + if args.omit: + options.extend(['--omit', args.omit]) + + run_coverage(args, host_state, output_file, 'report', options) + + +def _generate_powershell_output_report(args: CoverageReportConfig, coverage_file: str) -> str: + """Generate and return a PowerShell coverage report for the given coverage file.""" + coverage_info = read_json_file(coverage_file) + + root_path = data_context().content.root + '/' + + name_padding = 7 + cover_padding = 8 + + file_report = [] + total_stmts = 0 + total_miss = 0 + + for filename in sorted(coverage_info.keys()): + hit_info = coverage_info[filename] + + if filename.startswith(root_path): + filename = filename[len(root_path):] + + if args.omit and filename in args.omit: + continue + if args.include and filename not in args.include: + continue + + stmts = len(hit_info) + miss = len([hit for hit in hit_info.values() if hit == 0]) + + name_padding = max(name_padding, len(filename) + 3) + + total_stmts += stmts + total_miss += miss + + cover = "{0}%".format(int((stmts - miss) / stmts * 100)) + + missing = [] + current_missing = None + sorted_lines = sorted([int(x) for x in hit_info.keys()]) + for idx, line in enumerate(sorted_lines): + hit = hit_info[str(line)] + if hit == 0 and current_missing is None: + current_missing = line + elif hit != 0 and current_missing is not None: + end_line = sorted_lines[idx - 1] + if current_missing == end_line: + missing.append(str(current_missing)) + else: + missing.append('%s-%s' % (current_missing, end_line)) + current_missing = None + + if current_missing is not None: + end_line = sorted_lines[-1] + if current_missing == end_line: + missing.append(str(current_missing)) + else: + missing.append('%s-%s' % (current_missing, end_line)) + + file_report.append({'name': filename, 'stmts': stmts, 'miss': miss, 'cover': cover, 'missing': missing}) + + if total_stmts == 0: + return '' + + total_percent = '{0}%'.format(int((total_stmts - total_miss) / total_stmts * 100)) + stmts_padding = max(8, len(str(total_stmts))) + miss_padding = max(7, len(str(total_miss))) + + line_length = name_padding + stmts_padding + miss_padding + cover_padding + + header = 'Name'.ljust(name_padding) + 'Stmts'.rjust(stmts_padding) + 'Miss'.rjust(miss_padding) + \ + 'Cover'.rjust(cover_padding) + + if args.show_missing: + header += 'Lines Missing'.rjust(16) + line_length += 16 + + line_break = '-' * line_length + lines = ['%s%s%s%s%s' % (f['name'].ljust(name_padding), str(f['stmts']).rjust(stmts_padding), + str(f['miss']).rjust(miss_padding), f['cover'].rjust(cover_padding), + ' ' + ', '.join(f['missing']) if args.show_missing else '') + for f in file_report] + totals = 'TOTAL'.ljust(name_padding) + str(total_stmts).rjust(stmts_padding) + \ + str(total_miss).rjust(miss_padding) + total_percent.rjust(cover_padding) + + report = '{0}\n{1}\n{2}\n{1}\n{3}'.format(header, line_break, "\n".join(lines), totals) + return report + + +class CoverageReportConfig(CoverageCombineConfig): + """Configuration for the coverage report command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args) + + self.show_missing: bool = args.show_missing + self.include: str = args.include + self.omit: str = args.omit diff --git a/test/lib/ansible_test/_internal/commands/coverage/xml.py b/test/lib/ansible_test/_internal/commands/coverage/xml.py new file mode 100644 index 0000000..243c9a9 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/coverage/xml.py @@ -0,0 +1,189 @@ +"""Generate XML code coverage reports.""" +from __future__ import annotations + +import os +import time + +from xml.etree.ElementTree import ( + Comment, + Element, + SubElement, + tostring, +) + +from xml.dom import ( + minidom, +) + +from ...io import ( + make_dirs, + read_json_file, +) + +from ...util_common import ( + ResultType, + write_text_test_results, +) + +from ...util import ( + get_ansible_version, +) + +from ...data import ( + data_context, +) + +from ...provisioning import ( + prepare_profiles, +) + +from .combine import ( + combine_coverage_files, + CoverageCombineConfig, +) + +from . import ( + run_coverage, +) + + +def command_coverage_xml(args: CoverageXmlConfig) -> None: + """Generate an XML coverage report.""" + host_state = prepare_profiles(args) # coverage xml + output_files = combine_coverage_files(args, host_state) + + for output_file in output_files: + xml_name = '%s.xml' % os.path.basename(output_file) + if output_file.endswith('-powershell'): + report = _generate_powershell_xml(output_file) + + rough_string = tostring(report, 'utf-8') + reparsed = minidom.parseString(rough_string) + pretty = reparsed.toprettyxml(indent=' ') + + write_text_test_results(ResultType.REPORTS, xml_name, pretty) + else: + xml_path = os.path.join(ResultType.REPORTS.path, xml_name) + make_dirs(ResultType.REPORTS.path) + run_coverage(args, host_state, output_file, 'xml', ['-i', '-o', xml_path]) + + +def _generate_powershell_xml(coverage_file: str) -> Element: + """Generate a PowerShell coverage report XML element from the specified coverage file and return it.""" + coverage_info = read_json_file(coverage_file) + + content_root = data_context().content.root + is_ansible = data_context().content.is_ansible + + packages: dict[str, dict[str, dict[str, int]]] = {} + for path, results in coverage_info.items(): + filename = os.path.splitext(os.path.basename(path))[0] + + if filename.startswith('Ansible.ModuleUtils'): + package = 'ansible.module_utils' + elif is_ansible: + package = 'ansible.modules' + else: + rel_path = path[len(content_root) + 1:] + plugin_type = "modules" if rel_path.startswith("plugins/modules") else "module_utils" + package = 'ansible_collections.%splugins.%s' % (data_context().content.collection.prefix, plugin_type) + + if package not in packages: + packages[package] = {} + + packages[package][path] = results + + elem_coverage = Element('coverage') + elem_coverage.append( + Comment(' Generated by ansible-test from the Ansible project: https://www.ansible.com/ ')) + elem_coverage.append( + Comment(' Based on https://raw.githubusercontent.com/cobertura/web/master/htdocs/xml/coverage-04.dtd ')) + + elem_sources = SubElement(elem_coverage, 'sources') + + elem_source = SubElement(elem_sources, 'source') + elem_source.text = data_context().content.root + + elem_packages = SubElement(elem_coverage, 'packages') + + total_lines_hit = 0 + total_line_count = 0 + + for package_name, package_data in packages.items(): + lines_hit, line_count = _add_cobertura_package(elem_packages, package_name, package_data) + + total_lines_hit += lines_hit + total_line_count += line_count + + elem_coverage.attrib.update({ + 'branch-rate': '0', + 'branches-covered': '0', + 'branches-valid': '0', + 'complexity': '0', + 'line-rate': str(round(total_lines_hit / total_line_count, 4)) if total_line_count else "0", + 'lines-covered': str(total_line_count), + 'lines-valid': str(total_lines_hit), + 'timestamp': str(int(time.time())), + 'version': get_ansible_version(), + }) + + return elem_coverage + + +def _add_cobertura_package(packages: Element, package_name: str, package_data: dict[str, dict[str, int]]) -> tuple[int, int]: + """Add a package element to the given packages element.""" + elem_package = SubElement(packages, 'package') + elem_classes = SubElement(elem_package, 'classes') + + total_lines_hit = 0 + total_line_count = 0 + + for path, results in package_data.items(): + lines_hit = len([True for hits in results.values() if hits]) + line_count = len(results) + + total_lines_hit += lines_hit + total_line_count += line_count + + elem_class = SubElement(elem_classes, 'class') + + class_name = os.path.splitext(os.path.basename(path))[0] + if class_name.startswith("Ansible.ModuleUtils"): + class_name = class_name[20:] + + content_root = data_context().content.root + filename = path + if filename.startswith(content_root): + filename = filename[len(content_root) + 1:] + + elem_class.attrib.update({ + 'branch-rate': '0', + 'complexity': '0', + 'filename': filename, + 'line-rate': str(round(lines_hit / line_count, 4)) if line_count else "0", + 'name': class_name, + }) + + SubElement(elem_class, 'methods') + + elem_lines = SubElement(elem_class, 'lines') + + for number, hits in results.items(): + elem_line = SubElement(elem_lines, 'line') + elem_line.attrib.update( + hits=str(hits), + number=str(number), + ) + + elem_package.attrib.update({ + 'branch-rate': '0', + 'complexity': '0', + 'line-rate': str(round(total_lines_hit / total_line_count, 4)) if total_line_count else "0", + 'name': package_name, + }) + + return total_lines_hit, total_line_count + + +class CoverageXmlConfig(CoverageCombineConfig): + """Configuration for the coverage xml command.""" diff --git a/test/lib/ansible_test/_internal/commands/env/__init__.py b/test/lib/ansible_test/_internal/commands/env/__init__.py new file mode 100644 index 0000000..44f229f --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/env/__init__.py @@ -0,0 +1,197 @@ +"""Show information about the test environment.""" +from __future__ import annotations + +import datetime +import os +import platform +import sys +import typing as t + +from ...config import ( + CommonConfig, +) + +from ...io import ( + write_json_file, +) + +from ...util import ( + display, + get_ansible_version, + get_available_python_versions, + ApplicationError, +) + +from ...util_common import ( + data_context, + write_json_test_results, + ResultType, +) + +from ...docker_util import ( + get_docker_command, + get_docker_info, + get_docker_container_id, +) + +from ...constants import ( + TIMEOUT_PATH, +) + +from ...ci import ( + get_ci_provider, +) + + +class EnvConfig(CommonConfig): + """Configuration for the `env` command.""" + def __init__(self, args: t.Any) -> None: + super().__init__(args, 'env') + + self.show = args.show + self.dump = args.dump + self.timeout = args.timeout + self.list_files = args.list_files + + if not self.show and not self.dump and self.timeout is None and not self.list_files: + # default to --show if no options were given + self.show = True + + +def command_env(args: EnvConfig) -> None: + """Entry point for the `env` command.""" + show_dump_env(args) + list_files_env(args) + set_timeout(args) + + +def show_dump_env(args: EnvConfig) -> None: + """Show information about the current environment and/or write the information to disk.""" + if not args.show and not args.dump: + return + + container_id = get_docker_container_id() + + data = dict( + ansible=dict( + version=get_ansible_version(), + ), + docker=get_docker_details(args), + container_id=container_id, + environ=os.environ.copy(), + location=dict( + pwd=os.environ.get('PWD', None), + cwd=os.getcwd(), + ), + git=get_ci_provider().get_git_details(args), + platform=dict( + datetime=datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'), + platform=platform.platform(), + uname=platform.uname(), + ), + python=dict( + executable=sys.executable, + version=platform.python_version(), + ), + interpreters=get_available_python_versions(), + ) + + if args.show: + verbose = { + 'docker': 3, + 'docker.executable': 0, + 'environ': 2, + 'platform.uname': 1, + } + + show_dict(data, verbose) + + if args.dump and not args.explain: + write_json_test_results(ResultType.BOT, 'data-environment.json', data) + + +def list_files_env(args: EnvConfig) -> None: + """List files on stdout.""" + if not args.list_files: + return + + for path in data_context().content.all_files(): + display.info(path) + + +def set_timeout(args: EnvConfig) -> None: + """Set an execution timeout for subsequent ansible-test invocations.""" + if args.timeout is None: + return + + if args.timeout: + deadline = (datetime.datetime.utcnow() + datetime.timedelta(minutes=args.timeout)).strftime('%Y-%m-%dT%H:%M:%SZ') + + display.info('Setting a %d minute test timeout which will end at: %s' % (args.timeout, deadline), verbosity=1) + else: + deadline = None + + display.info('Clearing existing test timeout.', verbosity=1) + + if args.explain: + return + + if deadline: + data = dict( + duration=args.timeout, + deadline=deadline, + ) + + write_json_file(TIMEOUT_PATH, data) + elif os.path.exists(TIMEOUT_PATH): + os.remove(TIMEOUT_PATH) + + +def show_dict(data: dict[str, t.Any], verbose: dict[str, int], root_verbosity: int = 0, path: t.Optional[list[str]] = None) -> None: + """Show a dict with varying levels of verbosity.""" + path = path if path else [] + + for key, value in sorted(data.items()): + indent = ' ' * len(path) + key_path = path + [key] + key_name = '.'.join(key_path) + verbosity = verbose.get(key_name, root_verbosity) + + if isinstance(value, (tuple, list)): + display.info(indent + '%s:' % key, verbosity=verbosity) + for item in value: + display.info(indent + ' - %s' % item, verbosity=verbosity) + elif isinstance(value, dict): + min_verbosity = min([verbosity] + [v for k, v in verbose.items() if k.startswith('%s.' % key)]) + display.info(indent + '%s:' % key, verbosity=min_verbosity) + show_dict(value, verbose, verbosity, key_path) + else: + display.info(indent + '%s: %s' % (key, value), verbosity=verbosity) + + +def get_docker_details(args: EnvConfig) -> dict[str, t.Any]: + """Return details about docker.""" + docker = get_docker_command() + + executable = None + info = None + version = None + + if docker: + executable = docker.executable + + try: + docker_info = get_docker_info(args) + except ApplicationError as ex: + display.warning(str(ex)) + else: + info = docker_info.info + version = docker_info.version + + docker_details = dict( + executable=executable, + info=info, + version=version, + ) + + return docker_details diff --git a/test/lib/ansible_test/_internal/commands/integration/__init__.py b/test/lib/ansible_test/_internal/commands/integration/__init__.py new file mode 100644 index 0000000..8864d2e --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/__init__.py @@ -0,0 +1,967 @@ +"""Ansible integration test infrastructure.""" +from __future__ import annotations + +import collections.abc as c +import contextlib +import datetime +import json +import os +import re +import shutil +import tempfile +import time +import typing as t + +from ...encoding import ( + to_bytes, +) + +from ...ansible_util import ( + ansible_environment, +) + +from ...executor import ( + get_changes_filter, + AllTargetsSkipped, + Delegate, + ListTargets, +) + +from ...python_requirements import ( + install_requirements, +) + +from ...ci import ( + get_ci_provider, +) + +from ...target import ( + analyze_integration_target_dependencies, + walk_integration_targets, + IntegrationTarget, + walk_internal_targets, + TIntegrationTarget, + IntegrationTargetType, +) + +from ...config import ( + IntegrationConfig, + NetworkIntegrationConfig, + PosixIntegrationConfig, + WindowsIntegrationConfig, + TIntegrationConfig, +) + +from ...io import ( + make_dirs, + read_text_file, +) + +from ...util import ( + ApplicationError, + display, + SubprocessError, + remove_tree, +) + +from ...util_common import ( + named_temporary_file, + ResultType, + run_command, + write_json_test_results, + check_pyyaml, +) + +from ...coverage_util import ( + cover_python, +) + +from ...cache import ( + CommonCache, +) + +from .cloud import ( + CloudEnvironmentConfig, + cloud_filter, + cloud_init, + get_cloud_environment, + get_cloud_platforms, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + InventoryConfig, + OriginConfig, +) + +from ...host_profiles import ( + ControllerProfile, + ControllerHostProfile, + HostProfile, + PosixProfile, + SshTargetHostProfile, +) + +from ...provisioning import ( + HostState, + prepare_profiles, +) + +from ...pypi_proxy import ( + configure_pypi_proxy, +) + +from ...inventory import ( + create_controller_inventory, + create_windows_inventory, + create_network_inventory, + create_posix_inventory, +) + +from .filters import ( + get_target_filter, +) + +from .coverage import ( + CoverageManager, +) + +THostProfile = t.TypeVar('THostProfile', bound=HostProfile) + + +def generate_dependency_map(integration_targets: list[IntegrationTarget]) -> dict[str, set[IntegrationTarget]]: + """Analyze the given list of integration test targets and return a dictionary expressing target names and the targets on which they depend.""" + targets_dict = dict((target.name, target) for target in integration_targets) + target_dependencies = analyze_integration_target_dependencies(integration_targets) + dependency_map: dict[str, set[IntegrationTarget]] = {} + + invalid_targets = set() + + for dependency, dependents in target_dependencies.items(): + dependency_target = targets_dict.get(dependency) + + if not dependency_target: + invalid_targets.add(dependency) + continue + + for dependent in dependents: + if dependent not in dependency_map: + dependency_map[dependent] = set() + + dependency_map[dependent].add(dependency_target) + + if invalid_targets: + raise ApplicationError('Non-existent target dependencies: %s' % ', '.join(sorted(invalid_targets))) + + return dependency_map + + +def get_files_needed(target_dependencies: list[IntegrationTarget]) -> list[str]: + """Return a list of files needed by the given list of target dependencies.""" + files_needed: list[str] = [] + + for target_dependency in target_dependencies: + files_needed += target_dependency.needs_file + + files_needed = sorted(set(files_needed)) + + invalid_paths = [path for path in files_needed if not os.path.isfile(path)] + + if invalid_paths: + raise ApplicationError('Invalid "needs/file/*" aliases:\n%s' % '\n'.join(invalid_paths)) + + return files_needed + + +def check_inventory(args: IntegrationConfig, inventory_path: str) -> None: + """Check the given inventory for issues.""" + if not isinstance(args.controller, OriginConfig): + if os.path.exists(inventory_path): + inventory = read_text_file(inventory_path) + + if 'ansible_ssh_private_key_file' in inventory: + display.warning('Use of "ansible_ssh_private_key_file" in inventory with the --docker or --remote option is unsupported and will likely fail.') + + +def get_inventory_absolute_path(args: IntegrationConfig, target: InventoryConfig) -> str: + """Return the absolute inventory path used for the given integration configuration or target inventory config (if provided).""" + path = target.path or os.path.basename(get_inventory_relative_path(args)) + + if args.host_path: + path = os.path.join(data_context().content.root, path) # post-delegation, path is relative to the content root + else: + path = os.path.join(data_context().content.root, data_context().content.integration_path, path) + + return path + + +def get_inventory_relative_path(args: IntegrationConfig) -> str: + """Return the inventory path used for the given integration configuration relative to the content root.""" + inventory_names: dict[t.Type[IntegrationConfig], str] = { + PosixIntegrationConfig: 'inventory', + WindowsIntegrationConfig: 'inventory.winrm', + NetworkIntegrationConfig: 'inventory.networking', + } + + return os.path.join(data_context().content.integration_path, inventory_names[type(args)]) + + +def delegate_inventory(args: IntegrationConfig, inventory_path_src: str) -> None: + """Make the given inventory available during delegation.""" + if isinstance(args, PosixIntegrationConfig): + return + + def inventory_callback(files: list[tuple[str, str]]) -> None: + """ + Add the inventory file to the payload file list. + This will preserve the file during delegation even if it is ignored or is outside the content and install roots. + """ + inventory_path = get_inventory_relative_path(args) + inventory_tuple = inventory_path_src, inventory_path + + if os.path.isfile(inventory_path_src) and inventory_tuple not in files: + originals = [item for item in files if item[1] == inventory_path] + + if originals: + for original in originals: + files.remove(original) + + display.warning('Overriding inventory file "%s" with "%s".' % (inventory_path, inventory_path_src)) + else: + display.notice('Sourcing inventory file "%s" from "%s".' % (inventory_path, inventory_path_src)) + + files.append(inventory_tuple) + + data_context().register_payload_callback(inventory_callback) + + +@contextlib.contextmanager +def integration_test_environment( + args: IntegrationConfig, + target: IntegrationTarget, + inventory_path_src: str, +) -> c.Iterator[IntegrationEnvironment]: + """Context manager that prepares the integration test environment and cleans it up.""" + ansible_config_src = args.get_ansible_config() + ansible_config_relative = os.path.join(data_context().content.integration_path, '%s.cfg' % args.command) + + if args.no_temp_workdir or 'no/temp_workdir/' in target.aliases: + display.warning('Disabling the temp work dir is a temporary debugging feature that may be removed in the future without notice.') + + integration_dir = os.path.join(data_context().content.root, data_context().content.integration_path) + targets_dir = os.path.join(data_context().content.root, data_context().content.integration_targets_path) + inventory_path = inventory_path_src + ansible_config = ansible_config_src + vars_file = os.path.join(data_context().content.root, data_context().content.integration_vars_path) + + yield IntegrationEnvironment(data_context().content.root, integration_dir, targets_dir, inventory_path, ansible_config, vars_file) + return + + # When testing a collection, the temporary directory must reside within the collection. + # This is necessary to enable support for the default collection for non-collection content (playbooks and roles). + root_temp_dir = os.path.join(ResultType.TMP.path, 'integration') + + prefix = '%s-' % target.name + suffix = '-\u00c5\u00d1\u015a\u00cc\u03b2\u0141\u00c8' + + if args.no_temp_unicode or 'no/temp_unicode/' in target.aliases: + display.warning('Disabling unicode in the temp work dir is a temporary debugging feature that may be removed in the future without notice.') + suffix = '-ansible' + + if args.explain: + temp_dir = os.path.join(root_temp_dir, '%stemp%s' % (prefix, suffix)) + else: + make_dirs(root_temp_dir) + temp_dir = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_temp_dir) + + try: + display.info('Preparing temporary directory: %s' % temp_dir, verbosity=2) + + inventory_relative_path = get_inventory_relative_path(args) + inventory_path = os.path.join(temp_dir, inventory_relative_path) + + cache = IntegrationCache(args) + + target_dependencies = sorted([target] + list(cache.dependency_map.get(target.name, set()))) + + files_needed = get_files_needed(target_dependencies) + + integration_dir = os.path.join(temp_dir, data_context().content.integration_path) + targets_dir = os.path.join(temp_dir, data_context().content.integration_targets_path) + ansible_config = os.path.join(temp_dir, ansible_config_relative) + + vars_file_src = os.path.join(data_context().content.root, data_context().content.integration_vars_path) + vars_file = os.path.join(temp_dir, data_context().content.integration_vars_path) + + file_copies = [ + (ansible_config_src, ansible_config), + (inventory_path_src, inventory_path), + ] + + if os.path.exists(vars_file_src): + file_copies.append((vars_file_src, vars_file)) + + file_copies += [(path, os.path.join(temp_dir, path)) for path in files_needed] + + integration_targets_relative_path = data_context().content.integration_targets_path + + directory_copies = [ + ( + os.path.join(integration_targets_relative_path, target.relative_path), + os.path.join(temp_dir, integration_targets_relative_path, target.relative_path) + ) + for target in target_dependencies + ] + + directory_copies = sorted(set(directory_copies)) + file_copies = sorted(set(file_copies)) + + if not args.explain: + make_dirs(integration_dir) + + for dir_src, dir_dst in directory_copies: + display.info('Copying %s/ to %s/' % (dir_src, dir_dst), verbosity=2) + + if not args.explain: + shutil.copytree(to_bytes(dir_src), to_bytes(dir_dst), symlinks=True) # type: ignore[arg-type] # incorrect type stub omits bytes path support + + for file_src, file_dst in file_copies: + display.info('Copying %s to %s' % (file_src, file_dst), verbosity=2) + + if not args.explain: + make_dirs(os.path.dirname(file_dst)) + shutil.copy2(file_src, file_dst) + + yield IntegrationEnvironment(temp_dir, integration_dir, targets_dir, inventory_path, ansible_config, vars_file) + finally: + if not args.explain: + remove_tree(temp_dir) + + +@contextlib.contextmanager +def integration_test_config_file( + args: IntegrationConfig, + env_config: CloudEnvironmentConfig, + integration_dir: str, +) -> c.Iterator[t.Optional[str]]: + """Context manager that provides a config file for integration tests, if needed.""" + if not env_config: + yield None + return + + config_vars = (env_config.ansible_vars or {}).copy() + + config_vars.update(dict( + ansible_test=dict( + environment=env_config.env_vars, + module_defaults=env_config.module_defaults, + ) + )) + + config_file = json.dumps(config_vars, indent=4, sort_keys=True) + + with named_temporary_file(args, 'config-file-', '.json', integration_dir, config_file) as path: # type: str + filename = os.path.relpath(path, integration_dir) + + display.info('>>> Config File: %s\n%s' % (filename, config_file), verbosity=3) + + yield path + + +def create_inventory( + args: IntegrationConfig, + host_state: HostState, + inventory_path: str, + target: IntegrationTarget, +) -> None: + """Create inventory.""" + if isinstance(args, PosixIntegrationConfig): + if target.target_type == IntegrationTargetType.CONTROLLER: + display.info('Configuring controller inventory.', verbosity=1) + create_controller_inventory(args, inventory_path, host_state.controller_profile) + elif target.target_type == IntegrationTargetType.TARGET: + display.info('Configuring target inventory.', verbosity=1) + create_posix_inventory(args, inventory_path, host_state.target_profiles, 'needs/ssh/' in target.aliases) + else: + raise Exception(f'Unhandled test type for target "{target.name}": {target.target_type.name.lower()}') + elif isinstance(args, WindowsIntegrationConfig): + display.info('Configuring target inventory.', verbosity=1) + target_profiles = filter_profiles_for_target(args, host_state.target_profiles, target) + create_windows_inventory(args, inventory_path, target_profiles) + elif isinstance(args, NetworkIntegrationConfig): + display.info('Configuring target inventory.', verbosity=1) + target_profiles = filter_profiles_for_target(args, host_state.target_profiles, target) + create_network_inventory(args, inventory_path, target_profiles) + + +def command_integration_filtered( + args: IntegrationConfig, + host_state: HostState, + targets: tuple[IntegrationTarget, ...], + all_targets: tuple[IntegrationTarget, ...], + inventory_path: str, + pre_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None, + post_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None, +): + """Run integration tests for the specified targets.""" + found = False + passed = [] + failed = [] + + targets_iter = iter(targets) + all_targets_dict = dict((target.name, target) for target in all_targets) + + setup_errors = [] + setup_targets_executed: set[str] = set() + + for target in all_targets: + for setup_target in target.setup_once + target.setup_always: + if setup_target not in all_targets_dict: + setup_errors.append('Target "%s" contains invalid setup target: %s' % (target.name, setup_target)) + + if setup_errors: + raise ApplicationError('Found %d invalid setup aliases:\n%s' % (len(setup_errors), '\n'.join(setup_errors))) + + check_pyyaml(host_state.controller_profile.python) + + test_dir = os.path.join(ResultType.TMP.path, 'output_dir') + + if not args.explain and any('needs/ssh/' in target.aliases for target in targets): + max_tries = 20 + display.info('SSH connection to controller required by tests. Checking the connection.') + for i in range(1, max_tries + 1): + try: + run_command(args, ['ssh', '-o', 'BatchMode=yes', 'localhost', 'id'], capture=True) + display.info('SSH service responded.') + break + except SubprocessError: + if i == max_tries: + raise + seconds = 3 + display.warning('SSH service not responding. Waiting %d second(s) before checking again.' % seconds) + time.sleep(seconds) + + start_at_task = args.start_at_task + + results = {} + + target_profile = host_state.target_profiles[0] + + if isinstance(target_profile, PosixProfile): + target_python = target_profile.python + + if isinstance(target_profile, ControllerProfile): + if host_state.controller_profile.python.path != target_profile.python.path: + install_requirements(args, target_python, command=True, controller=False) # integration + elif isinstance(target_profile, SshTargetHostProfile): + connection = target_profile.get_controller_target_connections()[0] + install_requirements(args, target_python, command=True, controller=False, connection=connection) # integration + + coverage_manager = CoverageManager(args, host_state, inventory_path) + coverage_manager.setup() + + try: + for target in targets_iter: + if args.start_at and not found: + found = target.name == args.start_at + + if not found: + continue + + create_inventory(args, host_state, inventory_path, target) + + tries = 2 if args.retry_on_error else 1 + verbosity = args.verbosity + + cloud_environment = get_cloud_environment(args, target) + + try: + while tries: + tries -= 1 + + try: + if cloud_environment: + cloud_environment.setup_once() + + run_setup_targets(args, host_state, test_dir, target.setup_once, all_targets_dict, setup_targets_executed, inventory_path, + coverage_manager, False) + + start_time = time.time() + + if pre_target: + pre_target(target) + + run_setup_targets(args, host_state, test_dir, target.setup_always, all_targets_dict, setup_targets_executed, inventory_path, + coverage_manager, True) + + if not args.explain: + # create a fresh test directory for each test target + remove_tree(test_dir) + make_dirs(test_dir) + + try: + if target.script_path: + command_integration_script(args, host_state, target, test_dir, inventory_path, coverage_manager) + else: + command_integration_role(args, host_state, target, start_at_task, test_dir, inventory_path, coverage_manager) + start_at_task = None + finally: + if post_target: + post_target(target) + + end_time = time.time() + + results[target.name] = dict( + name=target.name, + type=target.type, + aliases=target.aliases, + modules=target.modules, + run_time_seconds=int(end_time - start_time), + setup_once=target.setup_once, + setup_always=target.setup_always, + ) + + break + except SubprocessError: + if cloud_environment: + cloud_environment.on_failure(target, tries) + + if not tries: + raise + + if target.retry_never: + display.warning(f'Skipping retry of test target "{target.name}" since it has been excluded from retries.') + raise + + display.warning('Retrying test target "%s" with maximum verbosity.' % target.name) + display.verbosity = args.verbosity = 6 + + passed.append(target) + except Exception as ex: + failed.append(target) + + if args.continue_on_error: + display.error(str(ex)) + continue + + display.notice('To resume at this test target, use the option: --start-at %s' % target.name) + + next_target = next(targets_iter, None) + + if next_target: + display.notice('To resume after this test target, use the option: --start-at %s' % next_target.name) + + raise + finally: + display.verbosity = args.verbosity = verbosity + + finally: + if not args.explain: + coverage_manager.teardown() + + result_name = '%s-%s.json' % ( + args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0)))) + + data = dict( + targets=results, + ) + + write_json_test_results(ResultType.DATA, result_name, data) + + if failed: + raise ApplicationError('The %d integration test(s) listed below (out of %d) failed. See error output above for details:\n%s' % ( + len(failed), len(passed) + len(failed), '\n'.join(target.name for target in failed))) + + +def command_integration_script( + args: IntegrationConfig, + host_state: HostState, + target: IntegrationTarget, + test_dir: str, + inventory_path: str, + coverage_manager: CoverageManager, +): + """Run an integration test script.""" + display.info('Running %s integration test script' % target.name) + + env_config = None + + if isinstance(args, PosixIntegrationConfig): + cloud_environment = get_cloud_environment(args, target) + + if cloud_environment: + env_config = cloud_environment.get_environment_config() + + if env_config: + display.info('>>> Environment Config\n%s' % json.dumps(dict( + env_vars=env_config.env_vars, + ansible_vars=env_config.ansible_vars, + callback_plugins=env_config.callback_plugins, + module_defaults=env_config.module_defaults, + ), indent=4, sort_keys=True), verbosity=3) + + with integration_test_environment(args, target, inventory_path) as test_env: # type: IntegrationEnvironment + cmd = ['./%s' % os.path.basename(target.script_path)] + + if args.verbosity: + cmd.append('-' + ('v' * args.verbosity)) + + env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config, test_env) + cwd = os.path.join(test_env.targets_dir, target.relative_path) + + env.update(dict( + # support use of adhoc ansible commands in collections without specifying the fully qualified collection name + ANSIBLE_PLAYBOOK_DIR=cwd, + )) + + if env_config and env_config.env_vars: + env.update(env_config.env_vars) + + with integration_test_config_file(args, env_config, test_env.integration_dir) as config_path: # type: t.Optional[str] + if config_path: + cmd += ['-e', '@%s' % config_path] + + env.update(coverage_manager.get_environment(target.name, target.aliases)) + cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd, capture=False) + + +def command_integration_role( + args: IntegrationConfig, + host_state: HostState, + target: IntegrationTarget, + start_at_task: t.Optional[str], + test_dir: str, + inventory_path: str, + coverage_manager: CoverageManager, +): + """Run an integration test role.""" + display.info('Running %s integration test role' % target.name) + + env_config = None + + vars_files = [] + variables = dict( + output_dir=test_dir, + ) + + if isinstance(args, WindowsIntegrationConfig): + hosts = 'windows' + gather_facts = False + variables.update(dict( + win_output_dir=r'C:\ansible_testing', + )) + elif isinstance(args, NetworkIntegrationConfig): + hosts = target.network_platform + gather_facts = False + else: + hosts = 'testhost' + gather_facts = True + + if 'gather_facts/yes/' in target.aliases: + gather_facts = True + elif 'gather_facts/no/' in target.aliases: + gather_facts = False + + if not isinstance(args, NetworkIntegrationConfig): + cloud_environment = get_cloud_environment(args, target) + + if cloud_environment: + env_config = cloud_environment.get_environment_config() + + if env_config: + display.info('>>> Environment Config\n%s' % json.dumps(dict( + env_vars=env_config.env_vars, + ansible_vars=env_config.ansible_vars, + callback_plugins=env_config.callback_plugins, + module_defaults=env_config.module_defaults, + ), indent=4, sort_keys=True), verbosity=3) + + with integration_test_environment(args, target, inventory_path) as test_env: # type: IntegrationEnvironment + if os.path.exists(test_env.vars_file): + vars_files.append(os.path.relpath(test_env.vars_file, test_env.integration_dir)) + + play = dict( + hosts=hosts, + gather_facts=gather_facts, + vars_files=vars_files, + vars=variables, + roles=[ + target.name, + ], + ) + + if env_config: + if env_config.ansible_vars: + variables.update(env_config.ansible_vars) + + play.update(dict( + environment=env_config.env_vars, + module_defaults=env_config.module_defaults, + )) + + playbook = json.dumps([play], indent=4, sort_keys=True) + + with named_temporary_file(args=args, directory=test_env.integration_dir, prefix='%s-' % target.name, suffix='.yml', content=playbook) as playbook_path: + filename = os.path.basename(playbook_path) + + display.info('>>> Playbook: %s\n%s' % (filename, playbook.strip()), verbosity=3) + + cmd = ['ansible-playbook', filename, '-i', os.path.relpath(test_env.inventory_path, test_env.integration_dir)] + + if start_at_task: + cmd += ['--start-at-task', start_at_task] + + if args.tags: + cmd += ['--tags', args.tags] + + if args.skip_tags: + cmd += ['--skip-tags', args.skip_tags] + + if args.diff: + cmd += ['--diff'] + + if isinstance(args, NetworkIntegrationConfig): + if args.testcase: + cmd += ['-e', 'testcase=%s' % args.testcase] + + if args.verbosity: + cmd.append('-' + ('v' * args.verbosity)) + + env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config, test_env) + cwd = test_env.integration_dir + + env.update(dict( + # support use of adhoc ansible commands in collections without specifying the fully qualified collection name + ANSIBLE_PLAYBOOK_DIR=cwd, + )) + + if env_config and env_config.env_vars: + env.update(env_config.env_vars) + + env['ANSIBLE_ROLES_PATH'] = test_env.targets_dir + + env.update(coverage_manager.get_environment(target.name, target.aliases)) + cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd, capture=False) + + +def run_setup_targets( + args: IntegrationConfig, + host_state: HostState, + test_dir: str, + target_names: c.Sequence[str], + targets_dict: dict[str, IntegrationTarget], + targets_executed: set[str], + inventory_path: str, + coverage_manager: CoverageManager, + always: bool, +): + """Run setup targets.""" + for target_name in target_names: + if not always and target_name in targets_executed: + continue + + target = targets_dict[target_name] + + if not args.explain: + # create a fresh test directory for each test target + remove_tree(test_dir) + make_dirs(test_dir) + + if target.script_path: + command_integration_script(args, host_state, target, test_dir, inventory_path, coverage_manager) + else: + command_integration_role(args, host_state, target, None, test_dir, inventory_path, coverage_manager) + + targets_executed.add(target_name) + + +def integration_environment( + args: IntegrationConfig, + target: IntegrationTarget, + test_dir: str, + inventory_path: str, + ansible_config: t.Optional[str], + env_config: t.Optional[CloudEnvironmentConfig], + test_env: IntegrationEnvironment, +) -> dict[str, str]: + """Return a dictionary of environment variables to use when running the given integration test target.""" + env = ansible_environment(args, ansible_config=ansible_config) + + callback_plugins = ['junit'] + (env_config.callback_plugins or [] if env_config else []) + + integration = dict( + JUNIT_OUTPUT_DIR=ResultType.JUNIT.path, + JUNIT_TASK_RELATIVE_PATH=test_env.test_dir, + JUNIT_REPLACE_OUT_OF_TREE_PATH='out-of-tree:', + ANSIBLE_CALLBACKS_ENABLED=','.join(sorted(set(callback_plugins))), + ANSIBLE_TEST_CI=args.metadata.ci_provider or get_ci_provider().code, + ANSIBLE_TEST_COVERAGE='check' if args.coverage_check else ('yes' if args.coverage else ''), + OUTPUT_DIR=test_dir, + INVENTORY_PATH=os.path.abspath(inventory_path), + ) + + if args.debug_strategy: + env.update(dict(ANSIBLE_STRATEGY='debug')) + + if 'non_local/' in target.aliases: + if args.coverage: + display.warning('Skipping coverage reporting on Ansible modules for non-local test: %s' % target.name) + + env.update(dict(ANSIBLE_TEST_REMOTE_INTERPRETER='')) + + env.update(integration) + + return env + + +class IntegrationEnvironment: + """Details about the integration environment.""" + def __init__(self, test_dir: str, integration_dir: str, targets_dir: str, inventory_path: str, ansible_config: str, vars_file: str) -> None: + self.test_dir = test_dir + self.integration_dir = integration_dir + self.targets_dir = targets_dir + self.inventory_path = inventory_path + self.ansible_config = ansible_config + self.vars_file = vars_file + + +class IntegrationCache(CommonCache): + """Integration cache.""" + @property + def integration_targets(self) -> list[IntegrationTarget]: + """The list of integration test targets.""" + return self.get('integration_targets', lambda: list(walk_integration_targets())) + + @property + def dependency_map(self) -> dict[str, set[IntegrationTarget]]: + """The dependency map of integration test targets.""" + return self.get('dependency_map', lambda: generate_dependency_map(self.integration_targets)) + + +def filter_profiles_for_target(args: IntegrationConfig, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]: + """Return a list of profiles after applying target filters.""" + if target.target_type == IntegrationTargetType.CONTROLLER: + profile_filter = get_target_filter(args, [args.controller], True) + elif target.target_type == IntegrationTargetType.TARGET: + profile_filter = get_target_filter(args, args.targets, False) + else: + raise Exception(f'Unhandled test type for target "{target.name}": {target.target_type.name.lower()}') + + profiles = profile_filter.filter_profiles(profiles, target) + + return profiles + + +def get_integration_filter(args: IntegrationConfig, targets: list[IntegrationTarget]) -> set[str]: + """Return a list of test targets to skip based on the host(s) that will be used to run the specified test targets.""" + invalid_targets = sorted(target.name for target in targets if target.target_type not in (IntegrationTargetType.CONTROLLER, IntegrationTargetType.TARGET)) + + if invalid_targets and not args.list_targets: + message = f'''Unable to determine context for the following test targets: {", ".join(invalid_targets)} + +Make sure the test targets are correctly named: + + - Modules - The target name should match the module name. + - Plugins - The target name should be "{{plugin_type}}_{{plugin_name}}". + +If necessary, context can be controlled by adding entries to the "aliases" file for a test target: + + - Add the name(s) of modules which are tested. + - Add "context/target" for module and module_utils tests (these will run on the target host). + - Add "context/controller" for other test types (these will run on the controller).''' + + raise ApplicationError(message) + + invalid_targets = sorted(target.name for target in targets if target.actual_type not in (IntegrationTargetType.CONTROLLER, IntegrationTargetType.TARGET)) + + if invalid_targets: + if data_context().content.is_ansible: + display.warning(f'Unable to determine context for the following test targets: {", ".join(invalid_targets)}') + else: + display.warning(f'Unable to determine context for the following test targets, they will be run on the target host: {", ".join(invalid_targets)}') + + exclude: set[str] = set() + + controller_targets = [target for target in targets if target.target_type == IntegrationTargetType.CONTROLLER] + target_targets = [target for target in targets if target.target_type == IntegrationTargetType.TARGET] + + controller_filter = get_target_filter(args, [args.controller], True) + target_filter = get_target_filter(args, args.targets, False) + + controller_filter.filter_targets(controller_targets, exclude) + target_filter.filter_targets(target_targets, exclude) + + return exclude + + +def command_integration_filter(args: TIntegrationConfig, + targets: c.Iterable[TIntegrationTarget], + ) -> tuple[HostState, tuple[TIntegrationTarget, ...]]: + """Filter the given integration test targets.""" + targets = tuple(target for target in targets if 'hidden/' not in target.aliases) + changes = get_changes_filter(args) + + # special behavior when the --changed-all-target target is selected based on changes + if args.changed_all_target in changes: + # act as though the --changed-all-target target was in the include list + if args.changed_all_mode == 'include' and args.changed_all_target not in args.include: + args.include.append(args.changed_all_target) + args.delegate_args += ['--include', args.changed_all_target] + # act as though the --changed-all-target target was in the exclude list + elif args.changed_all_mode == 'exclude' and args.changed_all_target not in args.exclude: + args.exclude.append(args.changed_all_target) + + require = args.require + changes + exclude = args.exclude + + internal_targets = walk_internal_targets(targets, args.include, exclude, require) + environment_exclude = get_integration_filter(args, list(internal_targets)) + + environment_exclude |= set(cloud_filter(args, internal_targets)) + + if environment_exclude: + exclude = sorted(set(exclude) | environment_exclude) + internal_targets = walk_internal_targets(targets, args.include, exclude, require) + + if not internal_targets: + raise AllTargetsSkipped() + + if args.start_at and not any(target.name == args.start_at for target in internal_targets): + raise ApplicationError('Start at target matches nothing: %s' % args.start_at) + + cloud_init(args, internal_targets) + + vars_file_src = os.path.join(data_context().content.root, data_context().content.integration_vars_path) + + if os.path.exists(vars_file_src): + def integration_config_callback(files: list[tuple[str, str]]) -> None: + """ + Add the integration config vars file to the payload file list. + This will preserve the file during delegation even if the file is ignored by source control. + """ + files.append((vars_file_src, data_context().content.integration_vars_path)) + + data_context().register_payload_callback(integration_config_callback) + + if args.list_targets: + raise ListTargets([target.name for target in internal_targets]) + + # requirements are installed using a callback since the windows-integration and network-integration host status checks depend on them + host_state = prepare_profiles(args, targets_use_pypi=True, requirements=requirements) # integration, windows-integration, network-integration + + if args.delegate: + raise Delegate(host_state=host_state, require=require, exclude=exclude) + + return host_state, internal_targets + + +def requirements(host_profile: HostProfile) -> None: + """Install requirements after bootstrapping and delegation.""" + if isinstance(host_profile, ControllerHostProfile) and host_profile.controller: + configure_pypi_proxy(host_profile.args, host_profile) # integration, windows-integration, network-integration + install_requirements(host_profile.args, host_profile.python, ansible=True, command=True) # integration, windows-integration, network-integration + elif isinstance(host_profile, PosixProfile) and not isinstance(host_profile, ControllerProfile): + configure_pypi_proxy(host_profile.args, host_profile) # integration diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py b/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py new file mode 100644 index 0000000..0c078b9 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/__init__.py @@ -0,0 +1,389 @@ +"""Plugin system for cloud providers and environments for use in integration tests.""" +from __future__ import annotations + +import abc +import atexit +import datetime +import os +import re +import tempfile +import time +import typing as t + +from ....encoding import ( + to_bytes, +) + +from ....io import ( + read_text_file, +) + +from ....util import ( + ANSIBLE_TEST_CONFIG_ROOT, + ApplicationError, + display, + import_plugins, + load_plugins, + cache, +) + +from ....util_common import ( + ResultType, + write_json_test_results, +) + +from ....target import ( + IntegrationTarget, +) + +from ....config import ( + IntegrationConfig, + TestConfig, +) + +from ....ci import ( + get_ci_provider, +) + +from ....data import ( + data_context, +) + +from ....docker_util import ( + docker_available, +) + + +@cache +def get_cloud_plugins() -> tuple[dict[str, t.Type[CloudProvider]], dict[str, t.Type[CloudEnvironment]]]: + """Import cloud plugins and load them into the plugin dictionaries.""" + import_plugins('commands/integration/cloud') + + providers: dict[str, t.Type[CloudProvider]] = {} + environments: dict[str, t.Type[CloudEnvironment]] = {} + + load_plugins(CloudProvider, providers) + load_plugins(CloudEnvironment, environments) + + return providers, environments + + +@cache +def get_provider_plugins() -> dict[str, t.Type[CloudProvider]]: + """Return a dictionary of the available cloud provider plugins.""" + return get_cloud_plugins()[0] + + +@cache +def get_environment_plugins() -> dict[str, t.Type[CloudEnvironment]]: + """Return a dictionary of the available cloud environment plugins.""" + return get_cloud_plugins()[1] + + +def get_cloud_platforms(args: TestConfig, targets: t.Optional[tuple[IntegrationTarget, ...]] = None) -> list[str]: + """Return cloud platform names for the specified targets.""" + if isinstance(args, IntegrationConfig): + if args.list_targets: + return [] + + if targets is None: + cloud_platforms = set(args.metadata.cloud_config or []) + else: + cloud_platforms = set(get_cloud_platform(target) for target in targets) + + cloud_platforms.discard(None) + + return sorted(cloud_platforms) + + +def get_cloud_platform(target: IntegrationTarget) -> t.Optional[str]: + """Return the name of the cloud platform used for the given target, or None if no cloud platform is used.""" + cloud_platforms = set(a.split('/')[1] for a in target.aliases if a.startswith('cloud/') and a.endswith('/') and a != 'cloud/') + + if not cloud_platforms: + return None + + if len(cloud_platforms) == 1: + cloud_platform = cloud_platforms.pop() + + if cloud_platform not in get_provider_plugins(): + raise ApplicationError('Target %s aliases contains unknown cloud platform: %s' % (target.name, cloud_platform)) + + return cloud_platform + + raise ApplicationError('Target %s aliases contains multiple cloud platforms: %s' % (target.name, ', '.join(sorted(cloud_platforms)))) + + +def get_cloud_providers(args: IntegrationConfig, targets: t.Optional[tuple[IntegrationTarget, ...]] = None) -> list[CloudProvider]: + """Return a list of cloud providers for the given targets.""" + return [get_provider_plugins()[p](args) for p in get_cloud_platforms(args, targets)] + + +def get_cloud_environment(args: IntegrationConfig, target: IntegrationTarget) -> t.Optional[CloudEnvironment]: + """Return the cloud environment for the given target, or None if no cloud environment is used for the target.""" + cloud_platform = get_cloud_platform(target) + + if not cloud_platform: + return None + + return get_environment_plugins()[cloud_platform](args) + + +def cloud_filter(args: IntegrationConfig, targets: tuple[IntegrationTarget, ...]) -> list[str]: + """Return a list of target names to exclude based on the given targets.""" + if args.metadata.cloud_config is not None: + return [] # cloud filter already performed prior to delegation + + exclude: list[str] = [] + + for provider in get_cloud_providers(args, targets): + provider.filter(targets, exclude) + + return exclude + + +def cloud_init(args: IntegrationConfig, targets: tuple[IntegrationTarget, ...]) -> None: + """Initialize cloud plugins for the given targets.""" + if args.metadata.cloud_config is not None: + return # cloud configuration already established prior to delegation + + args.metadata.cloud_config = {} + + results = {} + + for provider in get_cloud_providers(args, targets): + if args.prime_containers and not provider.uses_docker: + continue + + args.metadata.cloud_config[provider.platform] = {} + + start_time = time.time() + provider.setup() + end_time = time.time() + + results[provider.platform] = dict( + platform=provider.platform, + setup_seconds=int(end_time - start_time), + targets=[target.name for target in targets], + ) + + if not args.explain and results: + result_name = '%s-%s.json' % ( + args.command, re.sub(r'[^0-9]', '-', str(datetime.datetime.utcnow().replace(microsecond=0)))) + + data = dict( + clouds=results, + ) + + write_json_test_results(ResultType.DATA, result_name, data) + + +class CloudBase(metaclass=abc.ABCMeta): + """Base class for cloud plugins.""" + _CONFIG_PATH = 'config_path' + _RESOURCE_PREFIX = 'resource_prefix' + _MANAGED = 'managed' + _SETUP_EXECUTED = 'setup_executed' + + def __init__(self, args: IntegrationConfig) -> None: + self.args = args + self.platform = self.__module__.rsplit('.', 1)[-1] + + def config_callback(files: list[tuple[str, str]]) -> None: + """Add the config file to the payload file list.""" + if self.platform not in self.args.metadata.cloud_config: + return # platform was initialized, but not used -- such as being skipped due to all tests being disabled + + if self._get_cloud_config(self._CONFIG_PATH, ''): + pair = (self.config_path, os.path.relpath(self.config_path, data_context().content.root)) + + if pair not in files: + display.info('Including %s config: %s -> %s' % (self.platform, pair[0], pair[1]), verbosity=3) + files.append(pair) + + data_context().register_payload_callback(config_callback) + + @property + def setup_executed(self) -> bool: + """True if setup has been executed, otherwise False.""" + return t.cast(bool, self._get_cloud_config(self._SETUP_EXECUTED, False)) + + @setup_executed.setter + def setup_executed(self, value: bool) -> None: + """True if setup has been executed, otherwise False.""" + self._set_cloud_config(self._SETUP_EXECUTED, value) + + @property + def config_path(self) -> str: + """Path to the configuration file.""" + return os.path.join(data_context().content.root, str(self._get_cloud_config(self._CONFIG_PATH))) + + @config_path.setter + def config_path(self, value: str) -> None: + """Path to the configuration file.""" + self._set_cloud_config(self._CONFIG_PATH, value) + + @property + def resource_prefix(self) -> str: + """Resource prefix.""" + return str(self._get_cloud_config(self._RESOURCE_PREFIX)) + + @resource_prefix.setter + def resource_prefix(self, value: str) -> None: + """Resource prefix.""" + self._set_cloud_config(self._RESOURCE_PREFIX, value) + + @property + def managed(self) -> bool: + """True if resources are managed by ansible-test, otherwise False.""" + return t.cast(bool, self._get_cloud_config(self._MANAGED)) + + @managed.setter + def managed(self, value: bool) -> None: + """True if resources are managed by ansible-test, otherwise False.""" + self._set_cloud_config(self._MANAGED, value) + + def _get_cloud_config(self, key: str, default: t.Optional[t.Union[str, int, bool]] = None) -> t.Union[str, int, bool]: + """Return the specified value from the internal configuration.""" + if default is not None: + return self.args.metadata.cloud_config[self.platform].get(key, default) + + return self.args.metadata.cloud_config[self.platform][key] + + def _set_cloud_config(self, key: str, value: t.Union[str, int, bool]) -> None: + """Set the specified key and value in the internal configuration.""" + self.args.metadata.cloud_config[self.platform][key] = value + + +class CloudProvider(CloudBase): + """Base class for cloud provider plugins. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig, config_extension: str = '.ini') -> None: + super().__init__(args) + + self.ci_provider = get_ci_provider() + self.remove_config = False + self.config_static_name = 'cloud-config-%s%s' % (self.platform, config_extension) + self.config_static_path = os.path.join(data_context().content.integration_path, self.config_static_name) + self.config_template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, '%s.template' % self.config_static_name) + self.config_extension = config_extension + + self.uses_config = False + self.uses_docker = False + + def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None: + """Filter out the cloud tests when the necessary config and resources are not available.""" + if not self.uses_docker and not self.uses_config: + return + + if self.uses_docker and docker_available(): + return + + if self.uses_config and os.path.exists(self.config_static_path): + return + + skip = 'cloud/%s/' % self.platform + skipped = [target.name for target in targets if skip in target.aliases] + + if skipped: + exclude.append(skip) + + if not self.uses_docker and self.uses_config: + display.warning('Excluding tests marked "%s" which require a "%s" config file (see "%s"): %s' + % (skip.rstrip('/'), self.config_static_path, self.config_template_path, ', '.join(skipped))) + elif self.uses_docker and not self.uses_config: + display.warning('Excluding tests marked "%s" which requires container support: %s' + % (skip.rstrip('/'), ', '.join(skipped))) + elif self.uses_docker and self.uses_config: + display.warning('Excluding tests marked "%s" which requires container support or a "%s" config file (see "%s"): %s' + % (skip.rstrip('/'), self.config_static_path, self.config_template_path, ', '.join(skipped))) + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + self.resource_prefix = self.ci_provider.generate_resource_prefix() + self.resource_prefix = re.sub(r'[^a-zA-Z0-9]+', '-', self.resource_prefix)[:63].lower().rstrip('-') + + atexit.register(self.cleanup) + + def cleanup(self) -> None: + """Clean up the cloud resource and any temporary configuration files after tests complete.""" + if self.remove_config: + os.remove(self.config_path) + + def _use_static_config(self) -> bool: + """Use a static config file if available. Returns True if static config is used, otherwise returns False.""" + if os.path.isfile(self.config_static_path): + display.info('Using existing %s cloud config: %s' % (self.platform, self.config_static_path), verbosity=1) + self.config_path = self.config_static_path + static = True + else: + static = False + + self.managed = not static + + return static + + def _write_config(self, content: str) -> None: + """Write the given content to the config file.""" + prefix = '%s-' % os.path.splitext(os.path.basename(self.config_static_path))[0] + + with tempfile.NamedTemporaryFile(dir=data_context().content.integration_path, prefix=prefix, suffix=self.config_extension, delete=False) as config_fd: + filename = os.path.join(data_context().content.integration_path, os.path.basename(config_fd.name)) + + self.config_path = filename + self.remove_config = True + + display.info('>>> Config: %s\n%s' % (filename, content.strip()), verbosity=3) + + config_fd.write(to_bytes(content)) + config_fd.flush() + + def _read_config_template(self) -> str: + """Read and return the configuration template.""" + lines = read_text_file(self.config_template_path).splitlines() + lines = [line for line in lines if not line.startswith('#')] + config = '\n'.join(lines).strip() + '\n' + return config + + @staticmethod + def _populate_config_template(template: str, values: dict[str, str]) -> str: + """Populate and return the given template with the provided values.""" + for key in sorted(values): + value = values[key] + template = template.replace('@%s' % key, value) + + return template + + +class CloudEnvironment(CloudBase): + """Base class for cloud environment plugins. Updates integration test environment after delegation.""" + def setup_once(self) -> None: + """Run setup if it has not already been run.""" + if self.setup_executed: + return + + self.setup() + self.setup_executed = True + + def setup(self) -> None: + """Setup which should be done once per environment instead of once per test target.""" + + @abc.abstractmethod + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + + def on_failure(self, target: IntegrationTarget, tries: int) -> None: + """Callback to run when an integration target fails.""" + + +class CloudEnvironmentConfig: + """Configuration for the environment.""" + def __init__(self, + env_vars: t.Optional[dict[str, str]] = None, + ansible_vars: t.Optional[dict[str, t.Any]] = None, + module_defaults: t.Optional[dict[str, dict[str, t.Any]]] = None, + callback_plugins: t.Optional[list[str]] = None, + ): + self.env_vars = env_vars + self.ansible_vars = ansible_vars + self.module_defaults = module_defaults + self.callback_plugins = callback_plugins diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py b/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py new file mode 100644 index 0000000..007d383 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/acme.py @@ -0,0 +1,79 @@ +"""ACME plugin for integration tests.""" +from __future__ import annotations + +import os + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class ACMEProvider(CloudProvider): + """ACME plugin. Sets up cloud resources for tests.""" + DOCKER_SIMULATOR_NAME = 'acme-simulator' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + # The simulator must be pinned to a specific version to guarantee CI passes with the version used. + if os.environ.get('ANSIBLE_ACME_CONTAINER'): + self.image = os.environ.get('ANSIBLE_ACME_CONTAINER') + else: + self.image = 'quay.io/ansible/acme-test-container:2.1.0' + + self.uses_docker = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if self._use_static_config(): + self._setup_static() + else: + self._setup_dynamic() + + def _setup_dynamic(self) -> None: + """Create a ACME test container using docker.""" + ports = [ + 5000, # control port for flask app in container + 14000, # Pebble ACME CA + ] + + run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_SIMULATOR_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + ) + + self._set_cloud_config('acme_host', self.DOCKER_SIMULATOR_NAME) + + def _setup_static(self) -> None: + raise NotImplementedError() + + +class ACMEEnvironment(CloudEnvironment): + """ACME environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + ansible_vars = dict( + acme_host=self._get_cloud_config('acme_host'), + ) + + return CloudEnvironmentConfig( + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py b/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py new file mode 100644 index 0000000..234f311 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/aws.py @@ -0,0 +1,131 @@ +"""AWS plugin for integration tests.""" +from __future__ import annotations + +import os +import uuid +import configparser +import typing as t + +from ....util import ( + ApplicationError, + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....target import ( + IntegrationTarget, +) + +from ....core_ci import ( + AnsibleCoreCI, + CloudResource, +) + +from ....host_configs import ( + OriginConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class AwsCloudProvider(CloudProvider): + """AWS cloud provider plugin. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None: + """Filter out the cloud tests when the necessary config and resources are not available.""" + aci = self._create_ansible_core_ci() + + if aci.available: + return + + super().filter(targets, exclude) + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + aws_config_path = os.path.expanduser('~/.aws') + + if os.path.exists(aws_config_path) and isinstance(self.args.controller, OriginConfig): + raise ApplicationError('Rename "%s" or use the --docker or --remote option to isolate tests.' % aws_config_path) + + if not self._use_static_config(): + self._setup_dynamic() + + def _setup_dynamic(self) -> None: + """Request AWS credentials through the Ansible Core CI service.""" + display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1) + + config = self._read_config_template() + + aci = self._create_ansible_core_ci() + + response = aci.start() + + if not self.args.explain: + credentials = response['aws']['credentials'] + + values = dict( + ACCESS_KEY=credentials['access_key'], + SECRET_KEY=credentials['secret_key'], + SECURITY_TOKEN=credentials['session_token'], + REGION='us-east-1', + ) + + display.sensitive.add(values['SECRET_KEY']) + display.sensitive.add(values['SECURITY_TOKEN']) + + config = self._populate_config_template(config, values) + + self._write_config(config) + + def _create_ansible_core_ci(self) -> AnsibleCoreCI: + """Return an AWS instance of AnsibleCoreCI.""" + return AnsibleCoreCI(self.args, CloudResource(platform='aws')) + + +class AwsCloudEnvironment(CloudEnvironment): + """AWS cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + ansible_vars: dict[str, t.Any] = dict( + resource_prefix=self.resource_prefix, + tiny_prefix=uuid.uuid4().hex[0:12] + ) + + ansible_vars.update(dict(parser.items('default'))) + + display.sensitive.add(ansible_vars.get('aws_secret_key')) + display.sensitive.add(ansible_vars.get('security_token')) + + if 'aws_cleanup' not in ansible_vars: + ansible_vars['aws_cleanup'] = not self.managed + + env_vars = {'ANSIBLE_DEBUG_BOTOCORE_LOGS': 'True'} + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + callback_plugins=['aws_resource_actions'], + ) + + def on_failure(self, target: IntegrationTarget, tries: int) -> None: + """Callback to run when an integration target fails.""" + if not tries and self.managed: + display.notice('If %s failed due to permissions, the IAM test policy may need to be updated. ' + 'https://docs.ansible.com/ansible/devel/collections/amazon/aws/docsite/dev_guidelines.html#aws-permissions-for-integration-tests' + % target.name) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py b/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py new file mode 100644 index 0000000..dc5136a --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/azure.py @@ -0,0 +1,166 @@ +"""Azure plugin for integration tests.""" +from __future__ import annotations + +import configparser +import typing as t + +from ....util import ( + ApplicationError, + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....target import ( + IntegrationTarget, +) + +from ....core_ci import ( + AnsibleCoreCI, + CloudResource, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class AzureCloudProvider(CloudProvider): + """Azure cloud provider plugin. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.aci: t.Optional[AnsibleCoreCI] = None + + self.uses_config = True + + def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None: + """Filter out the cloud tests when the necessary config and resources are not available.""" + aci = self._create_ansible_core_ci() + + if aci.available: + return + + super().filter(targets, exclude) + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if not self._use_static_config(): + self._setup_dynamic() + + get_config(self.config_path) # check required variables + + def cleanup(self) -> None: + """Clean up the cloud resource and any temporary configuration files after tests complete.""" + if self.aci: + self.aci.stop() + + super().cleanup() + + def _setup_dynamic(self) -> None: + """Request Azure credentials through ansible-core-ci.""" + display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1) + + config = self._read_config_template() + response = {} + + aci = self._create_ansible_core_ci() + + aci_result = aci.start() + + if not self.args.explain: + response = aci_result['azure'] + self.aci = aci + + if not self.args.explain: + values = dict( + AZURE_CLIENT_ID=response['clientId'], + AZURE_SECRET=response['clientSecret'], + AZURE_SUBSCRIPTION_ID=response['subscriptionId'], + AZURE_TENANT=response['tenantId'], + RESOURCE_GROUP=response['resourceGroupNames'][0], + RESOURCE_GROUP_SECONDARY=response['resourceGroupNames'][1], + ) + + display.sensitive.add(values['AZURE_SECRET']) + + config = '\n'.join('%s: %s' % (key, values[key]) for key in sorted(values)) + + config = '[default]\n' + config + + self._write_config(config) + + def _create_ansible_core_ci(self) -> AnsibleCoreCI: + """Return an Azure instance of AnsibleCoreCI.""" + return AnsibleCoreCI(self.args, CloudResource(platform='azure')) + + +class AzureCloudEnvironment(CloudEnvironment): + """Azure cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + env_vars = get_config(self.config_path) + + display.sensitive.add(env_vars.get('AZURE_SECRET')) + display.sensitive.add(env_vars.get('AZURE_PASSWORD')) + + ansible_vars = dict( + resource_prefix=self.resource_prefix, + ) + + ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items())) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) + + def on_failure(self, target: IntegrationTarget, tries: int) -> None: + """Callback to run when an integration target fails.""" + if not tries and self.managed: + display.notice('If %s failed due to permissions, the test policy may need to be updated.' % target.name) + + +def get_config(config_path: str) -> dict[str, str]: + """Return a configuration dictionary parsed from the given configuration path.""" + parser = configparser.ConfigParser() + parser.read(config_path) + + config = dict((key.upper(), value) for key, value in parser.items('default')) + + rg_vars = ( + 'RESOURCE_GROUP', + 'RESOURCE_GROUP_SECONDARY', + ) + + sp_vars = ( + 'AZURE_CLIENT_ID', + 'AZURE_SECRET', + 'AZURE_SUBSCRIPTION_ID', + 'AZURE_TENANT', + ) + + ad_vars = ( + 'AZURE_AD_USER', + 'AZURE_PASSWORD', + 'AZURE_SUBSCRIPTION_ID', + ) + + rg_ok = all(var in config for var in rg_vars) + sp_ok = all(var in config for var in sp_vars) + ad_ok = all(var in config for var in ad_vars) + + if not rg_ok: + raise ApplicationError('Resource groups must be defined with: %s' % ', '.join(sorted(rg_vars))) + + if not sp_ok and not ad_ok: + raise ApplicationError('Credentials must be defined using either:\nService Principal: %s\nActive Directory: %s' % ( + ', '.join(sorted(sp_vars)), ', '.join(sorted(ad_vars)))) + + return config diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py b/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py new file mode 100644 index 0000000..f453ef3 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/cloudscale.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# +# (c) 2018, Gaudenz Steinlin <gaudenz.steinlin@cloudscale.ch> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +"""Cloudscale plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class CloudscaleCloudProvider(CloudProvider): + """Cloudscale cloud provider plugin. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + self._use_static_config() + + +class CloudscaleCloudEnvironment(CloudEnvironment): + """Cloudscale cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + env_vars = dict( + CLOUDSCALE_API_TOKEN=parser.get('default', 'cloudscale_api_token'), + ) + + display.sensitive.add(env_vars['CLOUDSCALE_API_TOKEN']) + + ansible_vars = dict( + cloudscale_resource_prefix=self.resource_prefix, + ) + + ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items())) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py b/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py new file mode 100644 index 0000000..0037b42 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/cs.py @@ -0,0 +1,174 @@ +"""CloudStack plugin for integration tests.""" +from __future__ import annotations + +import json +import configparser +import os +import urllib.parse +import typing as t + +from ....util import ( + ApplicationError, + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....docker_util import ( + docker_exec, +) + +from ....containers import ( + CleanupMode, + run_support_container, + wait_for_file, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class CsCloudProvider(CloudProvider): + """CloudStack cloud provider plugin. Sets up cloud resources before delegation.""" + DOCKER_SIMULATOR_NAME = 'cloudstack-sim' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.image = os.environ.get('ANSIBLE_CLOUDSTACK_CONTAINER', 'quay.io/ansible/cloudstack-test-container:1.4.0') + self.host = '' + self.port = 0 + + self.uses_docker = True + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if self._use_static_config(): + self._setup_static() + else: + self._setup_dynamic() + + def _setup_static(self) -> None: + """Configure CloudStack tests for use with static configuration.""" + parser = configparser.ConfigParser() + parser.read(self.config_static_path) + + endpoint = parser.get('cloudstack', 'endpoint') + + parts = urllib.parse.urlparse(endpoint) + + self.host = parts.hostname + + if not self.host: + raise ApplicationError('Could not determine host from endpoint: %s' % endpoint) + + if parts.port: + self.port = parts.port + elif parts.scheme == 'http': + self.port = 80 + elif parts.scheme == 'https': + self.port = 443 + else: + raise ApplicationError('Could not determine port from endpoint: %s' % endpoint) + + display.info('Read cs host "%s" and port %d from config: %s' % (self.host, self.port, self.config_static_path), verbosity=1) + + def _setup_dynamic(self) -> None: + """Create a CloudStack simulator using docker.""" + config = self._read_config_template() + + self.port = 8888 + + ports = [ + self.port, + ] + + descriptor = run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_SIMULATOR_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + ) + + if not descriptor: + return + + # apply work-around for OverlayFS issue + # https://github.com/docker/for-linux/issues/72#issuecomment-319904698 + docker_exec(self.args, self.DOCKER_SIMULATOR_NAME, ['find', '/var/lib/mysql', '-type', 'f', '-exec', 'touch', '{}', ';'], capture=True) + + if self.args.explain: + values = dict( + HOST=self.host, + PORT=str(self.port), + ) + else: + credentials = self._get_credentials(self.DOCKER_SIMULATOR_NAME) + + values = dict( + HOST=self.DOCKER_SIMULATOR_NAME, + PORT=str(self.port), + KEY=credentials['apikey'], + SECRET=credentials['secretkey'], + ) + + display.sensitive.add(values['SECRET']) + + config = self._populate_config_template(config, values) + + self._write_config(config) + + def _get_credentials(self, container_name: str) -> dict[str, t.Any]: + """Wait for the CloudStack simulator to return credentials.""" + def check(value) -> bool: + """Return True if the given configuration is valid JSON, otherwise return False.""" + # noinspection PyBroadException + try: + json.loads(value) + except Exception: # pylint: disable=broad-except + return False # sometimes the file exists but is not yet valid JSON + + return True + + stdout = wait_for_file(self.args, container_name, '/var/www/html/admin.json', sleep=10, tries=30, check=check) + + return json.loads(stdout) + + +class CsCloudEnvironment(CloudEnvironment): + """CloudStack cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + config = dict(parser.items('default')) + + env_vars = dict( + CLOUDSTACK_ENDPOINT=config['endpoint'], + CLOUDSTACK_KEY=config['key'], + CLOUDSTACK_SECRET=config['secret'], + CLOUDSTACK_TIMEOUT=config['timeout'], + ) + + display.sensitive.add(env_vars['CLOUDSTACK_SECRET']) + + ansible_vars = dict( + cs_resource_prefix=self.resource_prefix, + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py b/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py new file mode 100644 index 0000000..a46bf70 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/digitalocean.py @@ -0,0 +1,55 @@ +"""DigitalOcean plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class DigitalOceanCloudProvider(CloudProvider): + """Checks if a configuration file has been passed or fixtures are going to be used for testing""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + self._use_static_config() + + +class DigitalOceanCloudEnvironment(CloudEnvironment): + """Updates integration test environment after delegation. Will setup the config file as parameter.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + env_vars = dict( + DO_API_KEY=parser.get('default', 'key'), + ) + + display.sensitive.add(env_vars['DO_API_KEY']) + + ansible_vars = dict( + resource_prefix=self.resource_prefix, + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py b/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py new file mode 100644 index 0000000..c2413ee --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/foreman.py @@ -0,0 +1,94 @@ +"""Foreman plugin for integration tests.""" +from __future__ import annotations + +import os + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class ForemanProvider(CloudProvider): + """Foreman plugin. Sets up Foreman stub server for tests.""" + DOCKER_SIMULATOR_NAME = 'foreman-stub' + + # Default image to run Foreman stub from. + # + # The simulator must be pinned to a specific version + # to guarantee CI passes with the version used. + # + # It's source source itself resides at: + # https://github.com/ansible/foreman-test-container + DOCKER_IMAGE = 'quay.io/ansible/foreman-test-container:1.4.0' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.__container_from_env = os.environ.get('ANSIBLE_FRMNSIM_CONTAINER') + """ + Overrides target container, might be used for development. + + Use ANSIBLE_FRMNSIM_CONTAINER=whatever_you_want if you want + to use other image. Omit/empty otherwise. + """ + self.image = self.__container_from_env or self.DOCKER_IMAGE + + self.uses_docker = True + + def setup(self) -> None: + """Setup cloud resource before delegation and reg cleanup callback.""" + super().setup() + + if self._use_static_config(): + self._setup_static() + else: + self._setup_dynamic() + + def _setup_dynamic(self) -> None: + """Spawn a Foreman stub within docker container.""" + foreman_port = 8080 + + ports = [ + foreman_port, + ] + + run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_SIMULATOR_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + ) + + self._set_cloud_config('FOREMAN_HOST', self.DOCKER_SIMULATOR_NAME) + self._set_cloud_config('FOREMAN_PORT', str(foreman_port)) + + def _setup_static(self) -> None: + raise NotImplementedError() + + +class ForemanEnvironment(CloudEnvironment): + """Foreman environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + env_vars = dict( + FOREMAN_HOST=str(self._get_cloud_config('FOREMAN_HOST')), + FOREMAN_PORT=str(self._get_cloud_config('FOREMAN_PORT')), + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py b/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py new file mode 100644 index 0000000..e180a02 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/galaxy.py @@ -0,0 +1,168 @@ +"""Galaxy (ansible-galaxy) plugin for integration tests.""" +from __future__ import annotations + +import os +import tempfile + +from ....config import ( + IntegrationConfig, +) + +from ....docker_util import ( + docker_cp_to, +) + +from ....containers import ( + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +# We add BasicAuthentication, to make the tasks that deal with +# direct API access easier to deal with across galaxy_ng and pulp +SETTINGS = b''' +CONTENT_ORIGIN = 'http://ansible-ci-pulp:80' +ANSIBLE_API_HOSTNAME = 'http://ansible-ci-pulp:80' +ANSIBLE_CONTENT_HOSTNAME = 'http://ansible-ci-pulp:80/pulp/content' +TOKEN_AUTH_DISABLED = True +GALAXY_REQUIRE_CONTENT_APPROVAL = False +GALAXY_AUTHENTICATION_CLASSES = [ + "rest_framework.authentication.SessionAuthentication", + "rest_framework.authentication.TokenAuthentication", + "rest_framework.authentication.BasicAuthentication", +] +''' + +SET_ADMIN_PASSWORD = b'''#!/usr/bin/execlineb -S0 +foreground { + redirfd -w 1 /dev/null + redirfd -w 2 /dev/null + export DJANGO_SETTINGS_MODULE pulpcore.app.settings + export PULP_CONTENT_ORIGIN localhost + s6-setuidgid postgres + if { /usr/local/bin/django-admin reset-admin-password --password password } + if { /usr/local/bin/pulpcore-manager create-group system:partner-engineers --users admin } +} +''' + +# There are 2 overrides here: +# 1. Change the gunicorn bind address from 127.0.0.1 to 0.0.0.0 now that Galaxy NG does not allow us to access the +# Pulp API through it. +# 2. Grant access allowing us to DELETE a namespace in Galaxy NG. This is as CI deletes and recreates repos and +# distributions in Pulp which now breaks the namespace in Galaxy NG. Recreating it is the "simple" fix to get it +# working again. +# These may not be needed in the future, especially if 1 becomes configurable by an env var but for now they must be +# done. +OVERRIDES = b'''#!/usr/bin/execlineb -S0 +foreground { + sed -i "0,/\\"127.0.0.1:24817\\"/s//\\"0.0.0.0:24817\\"/" /etc/services.d/pulpcore-api/run +} + +# This sed calls changes the first occurrence to "allow" which is conveniently the delete operation for a namespace. +# https://github.com/ansible/galaxy_ng/blob/master/galaxy_ng/app/access_control/statements/standalone.py#L9-L11. +backtick NG_PREFIX { python -c "import galaxy_ng; print(galaxy_ng.__path__[0], end='')" } +importas ng_prefix NG_PREFIX +foreground { + sed -i "0,/\\"effect\\": \\"deny\\"/s//\\"effect\\": \\"allow\\"/" ${ng_prefix}/app/access_control/statements/standalone.py +}''' + + +class GalaxyProvider(CloudProvider): + """ + Galaxy plugin. Sets up pulp (ansible-galaxy) servers for tests. + The pulp source itself resides at: https://github.com/pulp/pulp-oci-images + """ + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + # Cannot use the latest container image as either galaxy_ng 4.2.0rc2 or pulp 0.5.0 has sporatic issues with + # dropping published collections in CI. Try running the tests multiple times when updating. Will also need to + # comment out the cache tests in 'test/integration/targets/ansible-galaxy-collection/tasks/install.yml' when + # the newer update is available. + self.pulp = os.environ.get( + 'ANSIBLE_PULP_CONTAINER', + 'quay.io/ansible/pulp-galaxy-ng:b79a7be64eff' + ) + + self.uses_docker = True + + def setup(self) -> None: + """Setup cloud resource before delegation and reg cleanup callback.""" + super().setup() + + galaxy_port = 80 + pulp_host = 'ansible-ci-pulp' + pulp_port = 24817 + + ports = [ + galaxy_port, + pulp_port, + ] + + # Create the container, don't run it, we need to inject configs before it starts + descriptor = run_support_container( + self.args, + self.platform, + self.pulp, + pulp_host, + ports, + start=False, + allow_existing=True, + ) + + if not descriptor: + return + + if not descriptor.running: + pulp_id = descriptor.container_id + + injected_files = { + '/etc/pulp/settings.py': SETTINGS, + '/etc/cont-init.d/111-postgres': SET_ADMIN_PASSWORD, + '/etc/cont-init.d/000-ansible-test-overrides': OVERRIDES, + } + for path, content in injected_files.items(): + with tempfile.NamedTemporaryFile() as temp_fd: + temp_fd.write(content) + temp_fd.flush() + docker_cp_to(self.args, pulp_id, temp_fd.name, path) + + descriptor.start(self.args) + + self._set_cloud_config('PULP_HOST', pulp_host) + self._set_cloud_config('PULP_PORT', str(pulp_port)) + self._set_cloud_config('GALAXY_PORT', str(galaxy_port)) + self._set_cloud_config('PULP_USER', 'admin') + self._set_cloud_config('PULP_PASSWORD', 'password') + + +class GalaxyEnvironment(CloudEnvironment): + """Galaxy environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + pulp_user = str(self._get_cloud_config('PULP_USER')) + pulp_password = str(self._get_cloud_config('PULP_PASSWORD')) + pulp_host = self._get_cloud_config('PULP_HOST') + galaxy_port = self._get_cloud_config('GALAXY_PORT') + pulp_port = self._get_cloud_config('PULP_PORT') + + return CloudEnvironmentConfig( + ansible_vars=dict( + pulp_user=pulp_user, + pulp_password=pulp_password, + pulp_api='http://%s:%s' % (pulp_host, pulp_port), + pulp_server='http://%s:%s/pulp_ansible/galaxy/' % (pulp_host, pulp_port), + galaxy_ng_server='http://%s:%s/api/galaxy/' % (pulp_host, galaxy_port), + ), + env_vars=dict( + PULP_USER=pulp_user, + PULP_PASSWORD=pulp_password, + PULP_SERVER='http://%s:%s/pulp_ansible/galaxy/api/' % (pulp_host, pulp_port), + GALAXY_NG_SERVER='http://%s:%s/api/galaxy/' % (pulp_host, galaxy_port), + ), + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py b/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py new file mode 100644 index 0000000..28ffb7b --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/gcp.py @@ -0,0 +1,55 @@ +# Copyright: (c) 2018, Google Inc. +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +"""GCP plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class GcpCloudProvider(CloudProvider): + """GCP cloud provider plugin. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if not self._use_static_config(): + display.notice( + 'static configuration could not be used. are you missing a template file?' + ) + + +class GcpCloudEnvironment(CloudEnvironment): + """GCP cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + ansible_vars = dict( + resource_prefix=self.resource_prefix, + ) + + ansible_vars.update(dict(parser.items('default'))) + + return CloudEnvironmentConfig( + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py b/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py new file mode 100644 index 0000000..4d75f22 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/hcloud.py @@ -0,0 +1,106 @@ +"""Hetzner Cloud plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....target import ( + IntegrationTarget, +) + +from ....core_ci import ( + AnsibleCoreCI, + CloudResource, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class HcloudCloudProvider(CloudProvider): + """Hetzner Cloud provider plugin. Sets up cloud resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def filter(self, targets: tuple[IntegrationTarget, ...], exclude: list[str]) -> None: + """Filter out the cloud tests when the necessary config and resources are not available.""" + aci = self._create_ansible_core_ci() + + if aci.available: + return + + super().filter(targets, exclude) + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if not self._use_static_config(): + self._setup_dynamic() + + def _setup_dynamic(self) -> None: + """Request Hetzner credentials through the Ansible Core CI service.""" + display.info('Provisioning %s cloud environment.' % self.platform, verbosity=1) + + config = self._read_config_template() + + aci = self._create_ansible_core_ci() + + response = aci.start() + + if not self.args.explain: + token = response['hetzner']['token'] + + display.sensitive.add(token) + display.info('Hetzner Cloud Token: %s' % token, verbosity=1) + + values = dict( + TOKEN=token, + ) + + display.sensitive.add(values['TOKEN']) + + config = self._populate_config_template(config, values) + + self._write_config(config) + + def _create_ansible_core_ci(self) -> AnsibleCoreCI: + """Return a Heztner instance of AnsibleCoreCI.""" + return AnsibleCoreCI(self.args, CloudResource(platform='hetzner')) + + +class HcloudCloudEnvironment(CloudEnvironment): + """Hetzner Cloud cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + env_vars = dict( + HCLOUD_TOKEN=parser.get('default', 'hcloud_api_token'), + ) + + display.sensitive.add(env_vars['HCLOUD_TOKEN']) + + ansible_vars = dict( + hcloud_prefix=self.resource_prefix, + ) + + ansible_vars.update(dict((key.lower(), value) for key, value in env_vars.items())) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py b/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py new file mode 100644 index 0000000..e250eed --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/httptester.py @@ -0,0 +1,92 @@ +"""HTTP Tester plugin for integration tests.""" +from __future__ import annotations + +import os + +from ....util import ( + display, + generate_password, +) + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + +KRB5_PASSWORD_ENV = 'KRB5_PASSWORD' + + +class HttptesterProvider(CloudProvider): + """HTTP Tester provider plugin. Sets up resources before delegation.""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.image = os.environ.get('ANSIBLE_HTTP_TEST_CONTAINER', 'quay.io/ansible/http-test-container:2.1.0') + + self.uses_docker = True + + def setup(self) -> None: + """Setup resources before delegation.""" + super().setup() + + ports = [ + 80, + 88, + 443, + 444, + 749, + ] + + aliases = [ + 'ansible.http.tests', + 'sni1.ansible.http.tests', + 'fail.ansible.http.tests', + 'self-signed.ansible.http.tests', + ] + + descriptor = run_support_container( + self.args, + self.platform, + self.image, + 'http-test-container', + ports, + aliases=aliases, + allow_existing=True, + cleanup=CleanupMode.YES, + env={ + KRB5_PASSWORD_ENV: generate_password(), + }, + ) + + if not descriptor: + return + + # Read the password from the container environment. + # This allows the tests to work when re-using an existing container. + # The password is marked as sensitive, since it may differ from the one we generated. + krb5_password = descriptor.details.container.env_dict()[KRB5_PASSWORD_ENV] + display.sensitive.add(krb5_password) + + self._set_cloud_config(KRB5_PASSWORD_ENV, krb5_password) + + +class HttptesterEnvironment(CloudEnvironment): + """HTTP Tester environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + return CloudEnvironmentConfig( + env_vars=dict( + HTTPTESTER='1', # backwards compatibility for tests intended to work with or without HTTP Tester + KRB5_PASSWORD=str(self._get_cloud_config(KRB5_PASSWORD_ENV)), + ) + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py b/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py new file mode 100644 index 0000000..df0ebb0 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/nios.py @@ -0,0 +1,97 @@ +"""NIOS plugin for integration tests.""" +from __future__ import annotations + +import os + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class NiosProvider(CloudProvider): + """Nios plugin. Sets up NIOS mock server for tests.""" + DOCKER_SIMULATOR_NAME = 'nios-simulator' + + # Default image to run the nios simulator. + # + # The simulator must be pinned to a specific version + # to guarantee CI passes with the version used. + # + # It's source source itself resides at: + # https://github.com/ansible/nios-test-container + DOCKER_IMAGE = 'quay.io/ansible/nios-test-container:1.4.0' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.__container_from_env = os.environ.get('ANSIBLE_NIOSSIM_CONTAINER') + """ + Overrides target container, might be used for development. + + Use ANSIBLE_NIOSSIM_CONTAINER=whatever_you_want if you want + to use other image. Omit/empty otherwise. + """ + + self.image = self.__container_from_env or self.DOCKER_IMAGE + + self.uses_docker = True + + def setup(self) -> None: + """Setup cloud resource before delegation and reg cleanup callback.""" + super().setup() + + if self._use_static_config(): + self._setup_static() + else: + self._setup_dynamic() + + def _setup_dynamic(self) -> None: + """Spawn a NIOS simulator within docker container.""" + nios_port = 443 + + ports = [ + nios_port, + ] + + run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_SIMULATOR_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + ) + + self._set_cloud_config('NIOS_HOST', self.DOCKER_SIMULATOR_NAME) + + def _setup_static(self) -> None: + raise NotImplementedError() + + +class NiosEnvironment(CloudEnvironment): + """NIOS environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + ansible_vars = dict( + nios_provider=dict( + host=self._get_cloud_config('NIOS_HOST'), + username='admin', + password='infoblox', + ), + ) + + return CloudEnvironmentConfig( + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py b/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py new file mode 100644 index 0000000..d005a3c --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/opennebula.py @@ -0,0 +1,60 @@ +"""OpenNebula plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class OpenNebulaCloudProvider(CloudProvider): + """Checks if a configuration file has been passed or fixtures are going to be used for testing""" + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if not self._use_static_config(): + self._setup_dynamic() + + self.uses_config = True + + def _setup_dynamic(self) -> None: + display.info('No config file provided, will run test from fixtures') + + config = self._read_config_template() + values = dict( + URL="http://localhost/RPC2", + USERNAME='oneadmin', + PASSWORD='onepass', + FIXTURES='true', + REPLAY='true', + ) + config = self._populate_config_template(config, values) + self._write_config(config) + + +class OpenNebulaCloudEnvironment(CloudEnvironment): + """Updates integration test environment after delegation. Will setup the config file as parameter.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + ansible_vars = dict( + resource_prefix=self.resource_prefix, + ) + + ansible_vars.update(dict(parser.items('default'))) + + display.sensitive.add(ansible_vars.get('opennebula_password')) + + return CloudEnvironmentConfig( + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py b/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py new file mode 100644 index 0000000..da930c0 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/openshift.py @@ -0,0 +1,114 @@ +"""OpenShift plugin for integration tests.""" +from __future__ import annotations + +import re + +from ....io import ( + read_text_file, +) + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, + wait_for_file, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class OpenShiftCloudProvider(CloudProvider): + """OpenShift cloud provider plugin. Sets up cloud resources before delegation.""" + DOCKER_CONTAINER_NAME = 'openshift-origin' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args, config_extension='.kubeconfig') + + # The image must be pinned to a specific version to guarantee CI passes with the version used. + self.image = 'quay.io/ansible/openshift-origin:v3.9.0' + + self.uses_docker = True + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + if self._use_static_config(): + self._setup_static() + else: + self._setup_dynamic() + + def _setup_static(self) -> None: + """Configure OpenShift tests for use with static configuration.""" + config = read_text_file(self.config_static_path) + + match = re.search(r'^ *server: (?P<server>.*)$', config, flags=re.MULTILINE) + + if not match: + display.warning('Could not find OpenShift endpoint in kubeconfig.') + + def _setup_dynamic(self) -> None: + """Create a OpenShift container using docker.""" + port = 8443 + + ports = [ + port, + ] + + cmd = ['start', 'master', '--listen', 'https://0.0.0.0:%d' % port] + + descriptor = run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_CONTAINER_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + cmd=cmd, + ) + + if not descriptor: + return + + if self.args.explain: + config = '# Unknown' + else: + config = self._get_config(self.DOCKER_CONTAINER_NAME, 'https://%s:%s/' % (self.DOCKER_CONTAINER_NAME, port)) + + self._write_config(config) + + def _get_config(self, container_name: str, server: str) -> str: + """Get OpenShift config from container.""" + stdout = wait_for_file(self.args, container_name, '/var/lib/origin/openshift.local.config/master/admin.kubeconfig', sleep=10, tries=30) + + config = stdout + config = re.sub(r'^( *)certificate-authority-data: .*$', r'\1insecure-skip-tls-verify: true', config, flags=re.MULTILINE) + config = re.sub(r'^( *)server: .*$', r'\1server: %s' % server, config, flags=re.MULTILINE) + + return config + + +class OpenShiftCloudEnvironment(CloudEnvironment): + """OpenShift cloud environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + env_vars = dict( + K8S_AUTH_KUBECONFIG=self.config_path, + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py b/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py new file mode 100644 index 0000000..04c2d89 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/scaleway.py @@ -0,0 +1,56 @@ +"""Scaleway plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class ScalewayCloudProvider(CloudProvider): + """Checks if a configuration file has been passed or fixtures are going to be used for testing""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + self._use_static_config() + + +class ScalewayCloudEnvironment(CloudEnvironment): + """Updates integration test environment after delegation. Will setup the config file as parameter.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + env_vars = dict( + SCW_API_KEY=parser.get('default', 'key'), + SCW_ORG=parser.get('default', 'org') + ) + + display.sensitive.add(env_vars['SCW_API_KEY']) + + ansible_vars = dict( + scw_org=parser.get('default', 'org'), + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py b/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py new file mode 100644 index 0000000..df1651f --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/vcenter.py @@ -0,0 +1,138 @@ +"""VMware vCenter plugin for integration tests.""" +from __future__ import annotations + +import configparser +import os + +from ....util import ( + ApplicationError, + display, +) + +from ....config import ( + IntegrationConfig, +) + +from ....containers import ( + CleanupMode, + run_support_container, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class VcenterProvider(CloudProvider): + """VMware vcenter/esx plugin. Sets up cloud resources for tests.""" + DOCKER_SIMULATOR_NAME = 'vcenter-simulator' + + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + # The simulator must be pinned to a specific version to guarantee CI passes with the version used. + if os.environ.get('ANSIBLE_VCSIM_CONTAINER'): + self.image = os.environ.get('ANSIBLE_VCSIM_CONTAINER') + else: + self.image = 'quay.io/ansible/vcenter-test-container:1.7.0' + + # VMware tests can be run on govcsim or BYO with a static config file. + # The simulator is the default if no config is provided. + self.vmware_test_platform = os.environ.get('VMWARE_TEST_PLATFORM', 'govcsim') + + if self.vmware_test_platform == 'govcsim': + self.uses_docker = True + self.uses_config = False + elif self.vmware_test_platform == 'static': + self.uses_docker = False + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + self._set_cloud_config('vmware_test_platform', self.vmware_test_platform) + + if self.vmware_test_platform == 'govcsim': + self._setup_dynamic_simulator() + self.managed = True + elif self.vmware_test_platform == 'static': + self._use_static_config() + self._setup_static() + else: + raise ApplicationError('Unknown vmware_test_platform: %s' % self.vmware_test_platform) + + def _setup_dynamic_simulator(self) -> None: + """Create a vcenter simulator using docker.""" + ports = [ + 443, + 8080, + 8989, + 5000, # control port for flask app in simulator + ] + + run_support_container( + self.args, + self.platform, + self.image, + self.DOCKER_SIMULATOR_NAME, + ports, + allow_existing=True, + cleanup=CleanupMode.YES, + ) + + self._set_cloud_config('vcenter_hostname', self.DOCKER_SIMULATOR_NAME) + + def _setup_static(self) -> None: + if not os.path.exists(self.config_static_path): + raise ApplicationError('Configuration file does not exist: %s' % self.config_static_path) + + +class VcenterEnvironment(CloudEnvironment): + """VMware vcenter/esx environment plugin. Updates integration test environment after delegation.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + try: + # We may be in a container, so we cannot just reach VMWARE_TEST_PLATFORM, + # We do a try/except instead + parser = configparser.ConfigParser() + parser.read(self.config_path) # static + + env_vars = {} + ansible_vars = dict( + resource_prefix=self.resource_prefix, + ) + ansible_vars.update(dict(parser.items('DEFAULT', raw=True))) + except KeyError: # govcsim + env_vars = dict( + VCENTER_HOSTNAME=str(self._get_cloud_config('vcenter_hostname')), + VCENTER_USERNAME='user', + VCENTER_PASSWORD='pass', + ) + + ansible_vars = dict( + vcsim=str(self._get_cloud_config('vcenter_hostname')), + vcenter_hostname=str(self._get_cloud_config('vcenter_hostname')), + vcenter_username='user', + vcenter_password='pass', + ) + + for key, value in ansible_vars.items(): + if key.endswith('_password'): + display.sensitive.add(value) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + module_defaults={ + 'group/vmware': { + 'hostname': ansible_vars['vcenter_hostname'], + 'username': ansible_vars['vcenter_username'], + 'password': ansible_vars['vcenter_password'], + 'port': ansible_vars.get('vcenter_port', '443'), + 'validate_certs': ansible_vars.get('vmware_validate_certs', 'no'), + }, + }, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py b/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py new file mode 100644 index 0000000..1993cda --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/cloud/vultr.py @@ -0,0 +1,55 @@ +"""Vultr plugin for integration tests.""" +from __future__ import annotations + +import configparser + +from ....util import ( + display, +) + +from ....config import ( + IntegrationConfig, +) + +from . import ( + CloudEnvironment, + CloudEnvironmentConfig, + CloudProvider, +) + + +class VultrCloudProvider(CloudProvider): + """Checks if a configuration file has been passed or fixtures are going to be used for testing""" + def __init__(self, args: IntegrationConfig) -> None: + super().__init__(args) + + self.uses_config = True + + def setup(self) -> None: + """Setup the cloud resource before delegation and register a cleanup callback.""" + super().setup() + + self._use_static_config() + + +class VultrCloudEnvironment(CloudEnvironment): + """Updates integration test environment after delegation. Will setup the config file as parameter.""" + def get_environment_config(self) -> CloudEnvironmentConfig: + """Return environment configuration for use in the test environment after delegation.""" + parser = configparser.ConfigParser() + parser.read(self.config_path) + + env_vars = dict( + VULTR_API_KEY=parser.get('default', 'key'), + ) + + display.sensitive.add(env_vars['VULTR_API_KEY']) + + ansible_vars = dict( + vultr_resource_prefix=self.resource_prefix, + ) + + return CloudEnvironmentConfig( + env_vars=env_vars, + ansible_vars=ansible_vars, + ) diff --git a/test/lib/ansible_test/_internal/commands/integration/coverage.py b/test/lib/ansible_test/_internal/commands/integration/coverage.py new file mode 100644 index 0000000..5a486e9 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/coverage.py @@ -0,0 +1,417 @@ +"""Code coverage support for integration tests.""" +from __future__ import annotations + +import abc +import os +import shutil +import tempfile +import typing as t +import zipfile + +from ...io import ( + write_text_file, +) + +from ...ansible_util import ( + run_playbook, +) + +from ...config import ( + IntegrationConfig, +) + +from ...util import ( + COVERAGE_CONFIG_NAME, + MODE_DIRECTORY, + MODE_DIRECTORY_WRITE, + MODE_FILE, + SubprocessError, + cache, + display, + generate_name, + get_generic_type, + get_type_map, + remove_tree, + sanitize_host_name, + verified_chmod, +) + +from ...util_common import ( + ResultType, +) + +from ...coverage_util import ( + generate_coverage_config, + get_coverage_platform, +) + +from ...host_configs import ( + HostConfig, + PosixConfig, + WindowsConfig, + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from ...data import ( + data_context, +) + +from ...host_profiles import ( + ControllerProfile, + HostProfile, + PosixProfile, + SshTargetHostProfile, +) + +from ...provisioning import ( + HostState, +) + +from ...connections import ( + LocalConnection, +) + +from ...inventory import ( + create_windows_inventory, + create_posix_inventory, +) + +THostConfig = t.TypeVar('THostConfig', bound=HostConfig) + + +class CoverageHandler(t.Generic[THostConfig], metaclass=abc.ABCMeta): + """Base class for configuring hosts for integration test code coverage.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + self.args = args + self.host_state = host_state + self.inventory_path = inventory_path + self.profiles = self.get_profiles() + + def get_profiles(self) -> list[HostProfile]: + """Return a list of profiles relevant for this handler.""" + profile_type = get_generic_type(type(self), HostConfig) + profiles = [profile for profile in self.host_state.target_profiles if isinstance(profile.config, profile_type)] + + return profiles + + @property + @abc.abstractmethod + def is_active(self) -> bool: + """True if the handler should be used, otherwise False.""" + + @abc.abstractmethod + def setup(self) -> None: + """Perform setup for code coverage.""" + + @abc.abstractmethod + def teardown(self) -> None: + """Perform teardown for code coverage.""" + + @abc.abstractmethod + def create_inventory(self) -> None: + """Create inventory, if needed.""" + + @abc.abstractmethod + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + + def run_playbook(self, playbook: str, variables: dict[str, str]) -> None: + """Run the specified playbook using the current inventory.""" + self.create_inventory() + run_playbook(self.args, self.inventory_path, playbook, capture=False, variables=variables) + + +class PosixCoverageHandler(CoverageHandler[PosixConfig]): + """Configure integration test code coverage for POSIX hosts.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + super().__init__(args, host_state, inventory_path) + + # Common temporary directory used on all POSIX hosts that will be created world writeable. + self.common_temp_path = f'/tmp/ansible-test-{generate_name()}' + + def get_profiles(self) -> list[HostProfile]: + """Return a list of profiles relevant for this handler.""" + profiles = super().get_profiles() + profiles = [profile for profile in profiles if not isinstance(profile, ControllerProfile) or + profile.python.path != self.host_state.controller_profile.python.path] + + return profiles + + @property + def is_active(self) -> bool: + """True if the handler should be used, otherwise False.""" + return True + + @property + def target_profile(self) -> t.Optional[PosixProfile]: + """The POSIX target profile, if it uses a different Python interpreter than the controller, otherwise None.""" + return t.cast(PosixProfile, self.profiles[0]) if self.profiles else None + + def setup(self) -> None: + """Perform setup for code coverage.""" + self.setup_controller() + self.setup_target() + + def teardown(self) -> None: + """Perform teardown for code coverage.""" + self.teardown_controller() + self.teardown_target() + + def setup_controller(self) -> None: + """Perform setup for code coverage on the controller.""" + coverage_config_path = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME) + coverage_output_path = os.path.join(self.common_temp_path, ResultType.COVERAGE.name) + + coverage_config = generate_coverage_config(self.args) + + write_text_file(coverage_config_path, coverage_config, create_directories=True) + + verified_chmod(coverage_config_path, MODE_FILE) + os.mkdir(coverage_output_path) + verified_chmod(coverage_output_path, MODE_DIRECTORY_WRITE) + + def setup_target(self) -> None: + """Perform setup for code coverage on the target.""" + if not self.target_profile: + return + + if isinstance(self.target_profile, ControllerProfile): + return + + self.run_playbook('posix_coverage_setup.yml', self.get_playbook_variables()) + + def teardown_controller(self) -> None: + """Perform teardown for code coverage on the controller.""" + coverage_temp_path = os.path.join(self.common_temp_path, ResultType.COVERAGE.name) + platform = get_coverage_platform(self.args.controller) + + for filename in os.listdir(coverage_temp_path): + shutil.copyfile(os.path.join(coverage_temp_path, filename), os.path.join(ResultType.COVERAGE.path, update_coverage_filename(filename, platform))) + + remove_tree(self.common_temp_path) + + def teardown_target(self) -> None: + """Perform teardown for code coverage on the target.""" + if not self.target_profile: + return + + if isinstance(self.target_profile, ControllerProfile): + return + + profile = t.cast(SshTargetHostProfile, self.target_profile) + platform = get_coverage_platform(profile.config) + con = profile.get_controller_target_connections()[0] + + with tempfile.NamedTemporaryFile(prefix='ansible-test-coverage-', suffix='.tgz') as coverage_tgz: + try: + con.create_archive(chdir=self.common_temp_path, name=ResultType.COVERAGE.name, dst=coverage_tgz) + except SubprocessError as ex: + display.warning(f'Failed to download coverage results: {ex}') + else: + coverage_tgz.seek(0) + + with tempfile.TemporaryDirectory() as temp_dir: + local_con = LocalConnection(self.args) + local_con.extract_archive(chdir=temp_dir, src=coverage_tgz) + + base_dir = os.path.join(temp_dir, ResultType.COVERAGE.name) + + for filename in os.listdir(base_dir): + shutil.copyfile(os.path.join(base_dir, filename), os.path.join(ResultType.COVERAGE.path, update_coverage_filename(filename, platform))) + + self.run_playbook('posix_coverage_teardown.yml', self.get_playbook_variables()) + + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + + # Enable code coverage collection on Ansible modules (both local and remote). + # Used by the AnsiballZ wrapper generator in lib/ansible/executor/module_common.py to support code coverage. + config_file = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME) + + # Include the command, target and platform marker so the remote host can create a filename with that info. + # The generated AnsiballZ wrapper is responsible for adding '=python-{X.Y}=coverage.{hostname}.{pid}.{id}' + coverage_file = os.path.join(self.common_temp_path, ResultType.COVERAGE.name, '='.join((self.args.command, target_name, 'platform'))) + + if self.args.coverage_check: + # cause the 'coverage' module to be found, but not imported or enabled + coverage_file = '' + + variables = dict( + _ANSIBLE_COVERAGE_CONFIG=config_file, + _ANSIBLE_COVERAGE_OUTPUT=coverage_file, + ) + + return variables + + def create_inventory(self) -> None: + """Create inventory.""" + create_posix_inventory(self.args, self.inventory_path, self.host_state.target_profiles) + + def get_playbook_variables(self) -> dict[str, str]: + """Return a dictionary of variables for setup and teardown of POSIX coverage.""" + return dict( + common_temp_dir=self.common_temp_path, + coverage_config=generate_coverage_config(self.args), + coverage_config_path=os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME), + coverage_output_path=os.path.join(self.common_temp_path, ResultType.COVERAGE.name), + mode_directory=f'{MODE_DIRECTORY:04o}', + mode_directory_write=f'{MODE_DIRECTORY_WRITE:04o}', + mode_file=f'{MODE_FILE:04o}', + ) + + +class WindowsCoverageHandler(CoverageHandler[WindowsConfig]): + """Configure integration test code coverage for Windows hosts.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + super().__init__(args, host_state, inventory_path) + + # Common temporary directory used on all Windows hosts that will be created writable by everyone. + self.remote_temp_path = f'C:\\ansible_test_coverage_{generate_name()}' + + @property + def is_active(self) -> bool: + """True if the handler should be used, otherwise False.""" + return bool(self.profiles) and not self.args.coverage_check + + def setup(self) -> None: + """Perform setup for code coverage.""" + self.run_playbook('windows_coverage_setup.yml', self.get_playbook_variables()) + + def teardown(self) -> None: + """Perform teardown for code coverage.""" + with tempfile.TemporaryDirectory() as local_temp_path: + variables = self.get_playbook_variables() + variables.update( + local_temp_path=local_temp_path, + ) + + self.run_playbook('windows_coverage_teardown.yml', variables) + + for filename in os.listdir(local_temp_path): + if all(isinstance(profile.config, WindowsRemoteConfig) for profile in self.profiles): + prefix = 'remote' + elif all(isinstance(profile.config, WindowsInventoryConfig) for profile in self.profiles): + prefix = 'inventory' + else: + raise NotImplementedError() + + platform = f'{prefix}-{sanitize_host_name(os.path.splitext(filename)[0])}' + + with zipfile.ZipFile(os.path.join(local_temp_path, filename)) as coverage_zip: + for item in coverage_zip.infolist(): + if item.is_dir(): + raise Exception(f'Unexpected directory in zip file: {item.filename}') + + item.filename = update_coverage_filename(item.filename, platform) + + coverage_zip.extract(item, ResultType.COVERAGE.path) + + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + + # Include the command, target and platform marker so the remote host can create a filename with that info. + # The remote is responsible for adding '={language-version}=coverage.{hostname}.{pid}.{id}' + coverage_name = '='.join((self.args.command, target_name, 'platform')) + + variables = dict( + _ANSIBLE_COVERAGE_REMOTE_OUTPUT=os.path.join(self.remote_temp_path, coverage_name), + _ANSIBLE_COVERAGE_REMOTE_PATH_FILTER=os.path.join(data_context().content.root, '*'), + ) + + return variables + + def create_inventory(self) -> None: + """Create inventory.""" + create_windows_inventory(self.args, self.inventory_path, self.host_state.target_profiles) + + def get_playbook_variables(self) -> dict[str, str]: + """Return a dictionary of variables for setup and teardown of Windows coverage.""" + return dict( + remote_temp_path=self.remote_temp_path, + ) + + +class CoverageManager: + """Manager for code coverage configuration and state.""" + def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None: + self.args = args + self.host_state = host_state + self.inventory_path = inventory_path + + if self.args.coverage: + handler_types = set(get_handler_type(type(profile.config)) for profile in host_state.profiles) + handler_types.discard(None) + else: + handler_types = set() + + handlers = [handler_type(args=args, host_state=host_state, inventory_path=inventory_path) for handler_type in handler_types] + + self.handlers = [handler for handler in handlers if handler.is_active] + + def setup(self) -> None: + """Perform setup for code coverage.""" + if not self.args.coverage: + return + + for handler in self.handlers: + handler.setup() + + def teardown(self) -> None: + """Perform teardown for code coverage.""" + if not self.args.coverage: + return + + for handler in self.handlers: + handler.teardown() + + def get_environment(self, target_name: str, aliases: tuple[str, ...]) -> dict[str, str]: + """Return a dictionary of environment variables for running tests with code coverage.""" + if not self.args.coverage or 'non_local/' in aliases: + return {} + + env = {} + + for handler in self.handlers: + env.update(handler.get_environment(target_name, aliases)) + + return env + + +@cache +def get_config_handler_type_map() -> dict[t.Type[HostConfig], t.Type[CoverageHandler]]: + """Create and return a mapping of HostConfig types to CoverageHandler types.""" + return get_type_map(CoverageHandler, HostConfig) + + +def get_handler_type(config_type: t.Type[HostConfig]) -> t.Optional[t.Type[CoverageHandler]]: + """Return the coverage handler type associated with the given host config type if found, otherwise return None.""" + queue = [config_type] + type_map = get_config_handler_type_map() + + while queue: + config_type = queue.pop(0) + handler_type = type_map.get(config_type) + + if handler_type: + return handler_type + + queue.extend(config_type.__bases__) + + return None + + +def update_coverage_filename(original_filename: str, platform: str) -> str: + """Validate the given filename and insert the specified platform, then return the result.""" + parts = original_filename.split('=') + + if original_filename != os.path.basename(original_filename) or len(parts) != 5 or parts[2] != 'platform': + raise Exception(f'Unexpected coverage filename: {original_filename}') + + parts[2] = platform + + updated_filename = '='.join(parts) + + display.info(f'Coverage file for platform "{platform}": {original_filename} -> {updated_filename}', verbosity=3) + + return updated_filename diff --git a/test/lib/ansible_test/_internal/commands/integration/filters.py b/test/lib/ansible_test/_internal/commands/integration/filters.py new file mode 100644 index 0000000..be03d7f --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/filters.py @@ -0,0 +1,279 @@ +"""Logic for filtering out integration test targets which are unsupported for the currently provided arguments and available hosts.""" +from __future__ import annotations + +import abc +import typing as t + +from ...config import ( + IntegrationConfig, +) + +from ...util import ( + cache, + detect_architecture, + display, + get_type_map, +) + +from ...target import ( + IntegrationTarget, +) + +from ...host_configs import ( + ControllerConfig, + DockerConfig, + FallbackReason, + HostConfig, + NetworkInventoryConfig, + NetworkRemoteConfig, + OriginConfig, + PosixConfig, + PosixRemoteConfig, + PosixSshConfig, + RemoteConfig, + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from ...host_profiles import ( + HostProfile, +) + +THostConfig = t.TypeVar('THostConfig', bound=HostConfig) +TPosixConfig = t.TypeVar('TPosixConfig', bound=PosixConfig) +TRemoteConfig = t.TypeVar('TRemoteConfig', bound=RemoteConfig) +THostProfile = t.TypeVar('THostProfile', bound=HostProfile) + + +class TargetFilter(t.Generic[THostConfig], metaclass=abc.ABCMeta): + """Base class for target filters.""" + def __init__(self, args: IntegrationConfig, configs: list[THostConfig], controller: bool) -> None: + self.args = args + self.configs = configs + self.controller = controller + self.host_type = 'controller' if controller else 'target' + + # values which are not host specific + self.include_targets = args.include + self.allow_root = args.allow_root + self.allow_destructive = args.allow_destructive + + @property + def config(self) -> THostConfig: + """The configuration to filter. Only valid when there is a single config.""" + if len(self.configs) != 1: + raise Exception() + + return self.configs[0] + + def skip( + self, + skip: str, + reason: str, + targets: list[IntegrationTarget], + exclude: set[str], + override: t.Optional[list[str]] = None, + ) -> None: + """Apply the specified skip rule to the given targets by updating the provided exclude list.""" + if skip.startswith('skip/'): + skipped = [target.name for target in targets if skip in target.skips and (not override or target.name not in override)] + else: + skipped = [target.name for target in targets if f'{skip}/' in target.aliases and (not override or target.name not in override)] + + self.apply_skip(f'"{skip}"', reason, skipped, exclude) + + def apply_skip(self, marked: str, reason: str, skipped: list[str], exclude: set[str]) -> None: + """Apply the provided skips to the given exclude list.""" + if not skipped: + return + + exclude.update(skipped) + display.warning(f'Excluding {self.host_type} tests marked {marked} {reason}: {", ".join(skipped)}') + + def filter_profiles(self, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]: + """Filter the list of profiles, returning only those which are not skipped for the given target.""" + del target + return profiles + + def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None: + """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list.""" + if self.controller and self.args.host_settings.controller_fallback and targets: + affected_targets = [target.name for target in targets] + reason = self.args.host_settings.controller_fallback.reason + + if reason == FallbackReason.ENVIRONMENT: + exclude.update(affected_targets) + display.warning(f'Excluding {self.host_type} tests since a fallback controller is in use: {", ".join(affected_targets)}') + elif reason == FallbackReason.PYTHON: + display.warning(f'Some {self.host_type} tests may be redundant since a fallback python is in use: {", ".join(affected_targets)}') + + if not self.allow_destructive and not self.config.is_managed: + override_destructive = set(target for target in self.include_targets if target.startswith('destructive/')) + override = [target.name for target in targets if override_destructive & set(target.aliases)] + + self.skip('destructive', 'which require --allow-destructive or prefixing with "destructive/" to run on unmanaged hosts', targets, exclude, override) + + if not self.args.allow_disabled: + override_disabled = set(target for target in self.args.include if target.startswith('disabled/')) + override = [target.name for target in targets if override_disabled & set(target.aliases)] + + self.skip('disabled', 'which require --allow-disabled or prefixing with "disabled/"', targets, exclude, override) + + if not self.args.allow_unsupported: + override_unsupported = set(target for target in self.args.include if target.startswith('unsupported/')) + override = [target.name for target in targets if override_unsupported & set(target.aliases)] + + self.skip('unsupported', 'which require --allow-unsupported or prefixing with "unsupported/"', targets, exclude, override) + + if not self.args.allow_unstable: + override_unstable = set(target for target in self.args.include if target.startswith('unstable/')) + + if self.args.allow_unstable_changed: + override_unstable |= set(self.args.metadata.change_description.focused_targets or []) + + override = [target.name for target in targets if override_unstable & set(target.aliases)] + + self.skip('unstable', 'which require --allow-unstable or prefixing with "unstable/"', targets, exclude, override) + + +class PosixTargetFilter(TargetFilter[TPosixConfig]): + """Target filter for POSIX hosts.""" + def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None: + """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list.""" + super().filter_targets(targets, exclude) + + if not self.allow_root and not self.config.have_root: + self.skip('needs/root', 'which require --allow-root or running as root', targets, exclude) + + self.skip(f'skip/python{self.config.python.version}', f'which are not supported by Python {self.config.python.version}', targets, exclude) + self.skip(f'skip/python{self.config.python.major_version}', f'which are not supported by Python {self.config.python.major_version}', targets, exclude) + + +class DockerTargetFilter(PosixTargetFilter[DockerConfig]): + """Target filter for docker hosts.""" + def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None: + """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list.""" + super().filter_targets(targets, exclude) + + self.skip('skip/docker', 'which cannot run under docker', targets, exclude) + + if not self.config.privileged: + self.skip('needs/privileged', 'which require --docker-privileged to run under docker', targets, exclude) + + +class PosixSshTargetFilter(PosixTargetFilter[PosixSshConfig]): + """Target filter for POSIX SSH hosts.""" + + +class RemoteTargetFilter(TargetFilter[TRemoteConfig]): + """Target filter for remote Ansible Core CI managed hosts.""" + def filter_profiles(self, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]: + """Filter the list of profiles, returning only those which are not skipped for the given target.""" + profiles = super().filter_profiles(profiles, target) + + skipped_profiles = [profile for profile in profiles if any(skip in target.skips for skip in get_remote_skip_aliases(profile.config))] + + if skipped_profiles: + configs: list[TRemoteConfig] = [profile.config for profile in skipped_profiles] + display.warning(f'Excluding skipped hosts from inventory: {", ".join(config.name for config in configs)}') + + profiles = [profile for profile in profiles if profile not in skipped_profiles] + + return profiles + + def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None: + """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list.""" + super().filter_targets(targets, exclude) + + if len(self.configs) > 1: + host_skips = {host.name: get_remote_skip_aliases(host) for host in self.configs} + + # Skip only targets which skip all hosts. + # Targets that skip only some hosts will be handled during inventory generation. + skipped = [target.name for target in targets if all(any(skip in target.skips for skip in skips) for skips in host_skips.values())] + + if skipped: + exclude.update(skipped) + display.warning(f'Excluding tests which do not support {", ".join(host_skips.keys())}: {", ".join(skipped)}') + else: + skips = get_remote_skip_aliases(self.config) + + for skip, reason in skips.items(): + self.skip(skip, reason, targets, exclude) + + +class PosixRemoteTargetFilter(PosixTargetFilter[PosixRemoteConfig], RemoteTargetFilter[PosixRemoteConfig]): + """Target filter for POSIX remote hosts.""" + + +class WindowsRemoteTargetFilter(RemoteTargetFilter[WindowsRemoteConfig]): + """Target filter for remote Windows hosts.""" + + +class WindowsInventoryTargetFilter(TargetFilter[WindowsInventoryConfig]): + """Target filter for Windows inventory.""" + + +class NetworkRemoteTargetFilter(RemoteTargetFilter[NetworkRemoteConfig]): + """Target filter for remote network hosts.""" + + +class NetworkInventoryTargetFilter(TargetFilter[NetworkInventoryConfig]): + """Target filter for network inventory.""" + + +class OriginTargetFilter(PosixTargetFilter[OriginConfig]): + """Target filter for localhost.""" + def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None: + """Filter the list of targets, adding any which this host profile cannot support to the provided exclude list.""" + super().filter_targets(targets, exclude) + + arch = detect_architecture(self.config.python.path) + + if arch: + self.skip(f'skip/{arch}', f'which are not supported by {arch}', targets, exclude) + + +@cache +def get_host_target_type_map() -> dict[t.Type[HostConfig], t.Type[TargetFilter]]: + """Create and return a mapping of HostConfig types to TargetFilter types.""" + return get_type_map(TargetFilter, HostConfig) + + +def get_target_filter(args: IntegrationConfig, configs: list[HostConfig], controller: bool) -> TargetFilter: + """Return an integration test target filter instance for the provided host configurations.""" + target_type = type(configs[0]) + + if issubclass(target_type, ControllerConfig): + target_type = type(args.controller) + configs = [args.controller] + + filter_type = get_host_target_type_map()[target_type] + filter_instance = filter_type(args, configs, controller) + + return filter_instance + + +def get_remote_skip_aliases(config: RemoteConfig) -> dict[str, str]: + """Return a dictionary of skip aliases and the reason why they apply.""" + return get_platform_skip_aliases(config.platform, config.version, config.arch) + + +def get_platform_skip_aliases(platform: str, version: str, arch: t.Optional[str]) -> dict[str, str]: + """Return a dictionary of skip aliases and the reason why they apply.""" + skips = { + f'skip/{platform}': platform, + f'skip/{platform}/{version}': f'{platform} {version}', + f'skip/{platform}{version}': f'{platform} {version}', # legacy syntax, use above format + } + + if arch: + skips.update({ + f'skip/{arch}': arch, + f'skip/{arch}/{platform}': f'{platform} on {arch}', + f'skip/{arch}/{platform}/{version}': f'{platform} {version} on {arch}', + }) + + skips = {alias: f'which are not supported by {description}' for alias, description in skips.items()} + + return skips diff --git a/test/lib/ansible_test/_internal/commands/integration/network.py b/test/lib/ansible_test/_internal/commands/integration/network.py new file mode 100644 index 0000000..d28416c --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/network.py @@ -0,0 +1,77 @@ +"""Network integration testing.""" +from __future__ import annotations + +import os + +from ...util import ( + ApplicationError, + ANSIBLE_TEST_CONFIG_ROOT, +) + +from ...util_common import ( + handle_layout_messages, +) + +from ...target import ( + walk_network_integration_targets, +) + +from ...config import ( + NetworkIntegrationConfig, +) + +from . import ( + command_integration_filter, + command_integration_filtered, + get_inventory_absolute_path, + get_inventory_relative_path, + check_inventory, + delegate_inventory, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + NetworkInventoryConfig, + NetworkRemoteConfig, +) + + +def command_network_integration(args: NetworkIntegrationConfig) -> None: + """Entry point for the `network-integration` command.""" + handle_layout_messages(data_context().content.integration_messages) + + inventory_relative_path = get_inventory_relative_path(args) + template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, os.path.basename(inventory_relative_path)) + '.template' + + if issubclass(args.target_type, NetworkInventoryConfig): + target = args.only_target(NetworkInventoryConfig) + inventory_path = get_inventory_absolute_path(args, target) + + if args.delegate or not target.path: + target.path = inventory_relative_path + else: + inventory_path = os.path.join(data_context().content.root, inventory_relative_path) + + if args.no_temp_workdir: + # temporary solution to keep DCI tests working + inventory_exists = os.path.exists(inventory_path) + else: + inventory_exists = os.path.isfile(inventory_path) + + if not args.explain and not issubclass(args.target_type, NetworkRemoteConfig) and not inventory_exists: + raise ApplicationError( + 'Inventory not found: %s\n' + 'Use --inventory to specify the inventory path.\n' + 'Use --platform to provision resources and generate an inventory file.\n' + 'See also inventory template: %s' % (inventory_path, template_path) + ) + + check_inventory(args, inventory_path) + delegate_inventory(args, inventory_path) + + all_targets = tuple(walk_network_integration_targets(include_hidden=True)) + host_state, internal_targets = command_integration_filter(args, all_targets) + command_integration_filtered(args, host_state, internal_targets, all_targets, inventory_path) diff --git a/test/lib/ansible_test/_internal/commands/integration/posix.py b/test/lib/ansible_test/_internal/commands/integration/posix.py new file mode 100644 index 0000000..d4c50d3 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/posix.py @@ -0,0 +1,48 @@ +"""POSIX integration testing.""" +from __future__ import annotations + +import os + +from ...util_common import ( + handle_layout_messages, +) + +from ...containers import ( + create_container_hooks, + local_ssh, + root_ssh, +) + +from ...target import ( + walk_posix_integration_targets, +) + +from ...config import ( + PosixIntegrationConfig, +) + +from . import ( + command_integration_filter, + command_integration_filtered, + get_inventory_relative_path, +) + +from ...data import ( + data_context, +) + + +def command_posix_integration(args: PosixIntegrationConfig) -> None: + """Entry point for the `integration` command.""" + handle_layout_messages(data_context().content.integration_messages) + + inventory_relative_path = get_inventory_relative_path(args) + inventory_path = os.path.join(data_context().content.root, inventory_relative_path) + + all_targets = tuple(walk_posix_integration_targets(include_hidden=True)) + host_state, internal_targets = command_integration_filter(args, all_targets) + control_connections = [local_ssh(args, host_state.controller_profile.python)] + managed_connections = [root_ssh(ssh) for ssh in host_state.get_controller_target_connections()] + pre_target, post_target = create_container_hooks(args, control_connections, managed_connections) + + command_integration_filtered(args, host_state, internal_targets, all_targets, inventory_path, pre_target=pre_target, post_target=post_target) diff --git a/test/lib/ansible_test/_internal/commands/integration/windows.py b/test/lib/ansible_test/_internal/commands/integration/windows.py new file mode 100644 index 0000000..aa201c4 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/integration/windows.py @@ -0,0 +1,81 @@ +"""Windows integration testing.""" +from __future__ import annotations + +import os + +from ...util import ( + ApplicationError, + ANSIBLE_TEST_CONFIG_ROOT, +) + +from ...util_common import ( + handle_layout_messages, +) + +from ...containers import ( + create_container_hooks, + local_ssh, + root_ssh, +) + +from ...target import ( + walk_windows_integration_targets, +) + +from ...config import ( + WindowsIntegrationConfig, +) + +from ...host_configs import ( + WindowsInventoryConfig, + WindowsRemoteConfig, +) + +from . import ( + command_integration_filter, + command_integration_filtered, + get_inventory_absolute_path, + get_inventory_relative_path, + check_inventory, + delegate_inventory, +) + +from ...data import ( + data_context, +) + + +def command_windows_integration(args: WindowsIntegrationConfig) -> None: + """Entry point for the `windows-integration` command.""" + handle_layout_messages(data_context().content.integration_messages) + + inventory_relative_path = get_inventory_relative_path(args) + template_path = os.path.join(ANSIBLE_TEST_CONFIG_ROOT, os.path.basename(inventory_relative_path)) + '.template' + + if issubclass(args.target_type, WindowsInventoryConfig): + target = args.only_target(WindowsInventoryConfig) + inventory_path = get_inventory_absolute_path(args, target) + + if args.delegate or not target.path: + target.path = inventory_relative_path + else: + inventory_path = os.path.join(data_context().content.root, inventory_relative_path) + + if not args.explain and not issubclass(args.target_type, WindowsRemoteConfig) and not os.path.isfile(inventory_path): + raise ApplicationError( + 'Inventory not found: %s\n' + 'Use --inventory to specify the inventory path.\n' + 'Use --windows to provision resources and generate an inventory file.\n' + 'See also inventory template: %s' % (inventory_path, template_path) + ) + + check_inventory(args, inventory_path) + delegate_inventory(args, inventory_path) + + all_targets = tuple(walk_windows_integration_targets(include_hidden=True)) + host_state, internal_targets = command_integration_filter(args, all_targets) + control_connections = [local_ssh(args, host_state.controller_profile.python)] + managed_connections = [root_ssh(ssh) for ssh in host_state.get_controller_target_connections()] + pre_target, post_target = create_container_hooks(args, control_connections, managed_connections) + + command_integration_filtered(args, host_state, internal_targets, all_targets, inventory_path, pre_target=pre_target, post_target=post_target) diff --git a/test/lib/ansible_test/_internal/commands/sanity/__init__.py b/test/lib/ansible_test/_internal/commands/sanity/__init__.py new file mode 100644 index 0000000..00b3031 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/__init__.py @@ -0,0 +1,1173 @@ +"""Execute Ansible sanity tests.""" +from __future__ import annotations + +import abc +import glob +import hashlib +import json +import os +import pathlib +import re +import collections +import collections.abc as c +import typing as t + +from ...constants import ( + CONTROLLER_PYTHON_VERSIONS, + REMOTE_ONLY_PYTHON_VERSIONS, + SUPPORTED_PYTHON_VERSIONS, +) + +from ...encoding import ( + to_bytes, +) + +from ...io import ( + read_json_file, + write_json_file, + write_text_file, +) + +from ...util import ( + ApplicationError, + SubprocessError, + display, + import_plugins, + load_plugins, + parse_to_list_of_dict, + ANSIBLE_TEST_CONTROLLER_ROOT, + ANSIBLE_TEST_TARGET_ROOT, + is_binary_file, + read_lines_without_comments, + is_subdir, + paths_to_dirs, + get_ansible_version, + str_to_version, + cache, + remove_tree, +) + +from ...util_common import ( + intercept_python, + handle_layout_messages, + yamlcheck, + create_result_directories, +) + +from ...ansible_util import ( + ansible_environment, +) + +from ...target import ( + walk_internal_targets, + walk_sanity_targets, + TestTarget, +) + +from ...executor import ( + get_changes_filter, + AllTargetsSkipped, + Delegate, +) + +from ...python_requirements import ( + PipInstall, + collect_requirements, + run_pip, +) + +from ...config import ( + SanityConfig, +) + +from ...test import ( + TestSuccess, + TestFailure, + TestSkipped, + TestMessage, + TestResult, + calculate_best_confidence, +) + +from ...data import ( + data_context, +) + +from ...content_config import ( + get_content_config, +) + +from ...host_configs import ( + DockerConfig, + PosixConfig, + PythonConfig, + VirtualPythonConfig, +) + +from ...host_profiles import ( + PosixProfile, +) + +from ...provisioning import ( + prepare_profiles, +) + +from ...pypi_proxy import ( + configure_pypi_proxy, +) + +from ...venv import ( + create_virtual_environment, +) + +COMMAND = 'sanity' +SANITY_ROOT = os.path.join(ANSIBLE_TEST_CONTROLLER_ROOT, 'sanity') +TARGET_SANITY_ROOT = os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'sanity') + +# NOTE: must match ansible.constants.DOCUMENTABLE_PLUGINS, but with 'module' replaced by 'modules'! +DOCUMENTABLE_PLUGINS = ( + 'become', 'cache', 'callback', 'cliconf', 'connection', 'httpapi', 'inventory', 'lookup', 'netconf', 'modules', 'shell', 'strategy', 'vars' +) + +created_venvs: list[str] = [] + + +def command_sanity(args: SanityConfig) -> None: + """Run sanity tests.""" + create_result_directories(args) + + target_configs = t.cast(list[PosixConfig], args.targets) + target_versions: dict[str, PosixConfig] = {target.python.version: target for target in target_configs} + + handle_layout_messages(data_context().content.sanity_messages) + + changes = get_changes_filter(args) + require = args.require + changes + targets = SanityTargets.create(args.include, args.exclude, require) + + if not targets.include: + raise AllTargetsSkipped() + + tests = list(sanity_get_tests()) + + if args.test: + disabled = [] + tests = [target for target in tests if target.name in args.test] + else: + disabled = [target.name for target in tests if not target.enabled and not args.allow_disabled] + tests = [target for target in tests if target.enabled or args.allow_disabled] + + if args.skip_test: + tests = [target for target in tests if target.name not in args.skip_test] + + targets_use_pypi = any(isinstance(test, SanityMultipleVersion) and test.needs_pypi for test in tests) and not args.list_tests + host_state = prepare_profiles(args, targets_use_pypi=targets_use_pypi) # sanity + + get_content_config(args) # make sure content config has been parsed prior to delegation + + if args.delegate: + raise Delegate(host_state=host_state, require=changes, exclude=args.exclude) + + configure_pypi_proxy(args, host_state.controller_profile) # sanity + + if disabled: + display.warning('Skipping tests disabled by default without --allow-disabled: %s' % ', '.join(sorted(disabled))) + + target_profiles: dict[str, PosixProfile] = {profile.config.python.version: profile for profile in host_state.targets(PosixProfile)} + + total = 0 + failed = [] + + result: t.Optional[TestResult] + + for test in tests: + if args.list_tests: + print(test.name) # display goes to stderr, this should be on stdout + continue + + for version in SUPPORTED_PYTHON_VERSIONS: + options = '' + + if isinstance(test, SanityMultipleVersion): + if version not in target_versions and version not in args.host_settings.skipped_python_versions: + continue # version was not requested, skip it silently + else: + if version != args.controller_python.version: + continue # only multi-version sanity tests use target versions, the rest use the controller version + + if test.supported_python_versions and version not in test.supported_python_versions: + result = SanitySkipped(test.name, version) + result.reason = f'Skipping sanity test "{test.name}" on Python {version} because it is unsupported.' \ + f' Supported Python versions: {", ".join(test.supported_python_versions)}' + else: + if isinstance(test, SanityCodeSmellTest): + settings = test.load_processor(args) + elif isinstance(test, SanityMultipleVersion): + settings = test.load_processor(args, version) + elif isinstance(test, SanitySingleVersion): + settings = test.load_processor(args) + elif isinstance(test, SanityVersionNeutral): + settings = test.load_processor(args) + else: + raise Exception('Unsupported test type: %s' % type(test)) + + all_targets = list(targets.targets) + + if test.all_targets: + usable_targets = list(targets.targets) + elif test.no_targets: + usable_targets = [] + else: + usable_targets = list(targets.include) + + all_targets = SanityTargets.filter_and_inject_targets(test, all_targets) + usable_targets = SanityTargets.filter_and_inject_targets(test, usable_targets) + + usable_targets = sorted(test.filter_targets_by_version(args, list(usable_targets), version)) + usable_targets = settings.filter_skipped_targets(usable_targets) + sanity_targets = SanityTargets(tuple(all_targets), tuple(usable_targets)) + + test_needed = bool(usable_targets or test.no_targets or args.prime_venvs) + result = None + + if test_needed and version in args.host_settings.skipped_python_versions: + # Deferred checking of Python availability. Done here since it is now known to be required for running the test. + # Earlier checking could cause a spurious warning to be generated for a collection which does not support the Python version. + # If the user specified a Python version, an error will be generated before reaching this point when the Python interpreter is not found. + result = SanitySkipped(test.name, version) + result.reason = f'Skipping sanity test "{test.name}" on Python {version} because it could not be found.' + + if not result: + if isinstance(test, SanityMultipleVersion): + display.info(f'Running sanity test "{test.name}" on Python {version}') + else: + display.info(f'Running sanity test "{test.name}"') + + if test_needed and not result: + if isinstance(test, SanityMultipleVersion): + # multi-version sanity tests handle their own requirements (if any) and use the target python + test_profile = target_profiles[version] + result = test.test(args, sanity_targets, test_profile.python) + options = ' --python %s' % version + elif isinstance(test, SanitySingleVersion): + # single version sanity tests use the controller python + test_profile = host_state.controller_profile + virtualenv_python = create_sanity_virtualenv(args, test_profile.python, test.name) + + if virtualenv_python: + virtualenv_yaml = check_sanity_virtualenv_yaml(virtualenv_python) + + if test.require_libyaml and not virtualenv_yaml: + result = SanitySkipped(test.name) + result.reason = f'Skipping sanity test "{test.name}" on Python {version} due to missing libyaml support in PyYAML.' + else: + if virtualenv_yaml is False: + display.warning(f'Sanity test "{test.name}" on Python {version} may be slow due to missing libyaml support in PyYAML.') + + if args.prime_venvs: + result = SanitySkipped(test.name) + else: + result = test.test(args, sanity_targets, virtualenv_python) + else: + result = SanitySkipped(test.name, version) + result.reason = f'Skipping sanity test "{test.name}" on Python {version} due to missing virtual environment support.' + elif isinstance(test, SanityVersionNeutral): + if args.prime_venvs: + result = SanitySkipped(test.name) + else: + # version neutral sanity tests handle their own requirements (if any) + result = test.test(args, sanity_targets) + else: + raise Exception('Unsupported test type: %s' % type(test)) + elif result: + pass + else: + result = SanitySkipped(test.name, version) + + result.write(args) + + total += 1 + + if isinstance(result, SanityFailure): + failed.append(result.test + options) + + controller = args.controller + + if created_venvs and isinstance(controller, DockerConfig) and controller.name == 'default' and not args.prime_venvs: + names = ', '.join(created_venvs) + display.warning(f'There following sanity test virtual environments are out-of-date in the "default" container: {names}') + + if failed: + message = 'The %d sanity test(s) listed below (out of %d) failed. See error output above for details.\n%s' % ( + len(failed), total, '\n'.join(failed)) + + if args.failure_ok: + display.error(message) + else: + raise ApplicationError(message) + + +@cache +def collect_code_smell_tests() -> tuple[SanityTest, ...]: + """Return a tuple of available code smell sanity tests.""" + paths = glob.glob(os.path.join(SANITY_ROOT, 'code-smell', '*.py')) + + if data_context().content.is_ansible: + # include Ansible specific code-smell tests which are not configured to be skipped + ansible_code_smell_root = os.path.join(data_context().content.root, 'test', 'sanity', 'code-smell') + skip_tests = read_lines_without_comments(os.path.join(ansible_code_smell_root, 'skip.txt'), remove_blank_lines=True, optional=True) + paths.extend(path for path in glob.glob(os.path.join(ansible_code_smell_root, '*.py')) if os.path.basename(path) not in skip_tests) + + tests = tuple(SanityCodeSmellTest(p) for p in paths) + + return tests + + +class SanityIgnoreParser: + """Parser for the consolidated sanity test ignore file.""" + NO_CODE = '_' + + def __init__(self, args: SanityConfig) -> None: + if data_context().content.collection: + ansible_version = '%s.%s' % tuple(get_ansible_version().split('.')[:2]) + + ansible_label = 'Ansible %s' % ansible_version + file_name = 'ignore-%s.txt' % ansible_version + else: + ansible_label = 'Ansible' + file_name = 'ignore.txt' + + self.args = args + self.relative_path = os.path.join(data_context().content.sanity_path, file_name) + self.path = os.path.join(data_context().content.root, self.relative_path) + self.ignores: dict[str, dict[str, dict[str, int]]] = collections.defaultdict(lambda: collections.defaultdict(dict)) + self.skips: dict[str, dict[str, int]] = collections.defaultdict(lambda: collections.defaultdict(int)) + self.parse_errors: list[tuple[int, int, str]] = [] + self.file_not_found_errors: list[tuple[int, str]] = [] + + lines = read_lines_without_comments(self.path, optional=True) + targets = SanityTargets.get_targets() + paths = set(target.path for target in targets) + tests_by_name: dict[str, SanityTest] = {} + versioned_test_names: set[str] = set() + unversioned_test_names: dict[str, str] = {} + directories = paths_to_dirs(list(paths)) + paths_by_test: dict[str, set[str]] = {} + + display.info('Read %d sanity test ignore line(s) for %s from: %s' % (len(lines), ansible_label, self.relative_path), verbosity=1) + + for test in sanity_get_tests(): + test_targets = SanityTargets.filter_and_inject_targets(test, targets) + + if isinstance(test, SanityMultipleVersion): + versioned_test_names.add(test.name) + + for python_version in test.supported_python_versions: + test_name = '%s-%s' % (test.name, python_version) + + paths_by_test[test_name] = set(target.path for target in test.filter_targets_by_version(args, test_targets, python_version)) + tests_by_name[test_name] = test + else: + unversioned_test_names.update(dict(('%s-%s' % (test.name, python_version), test.name) for python_version in SUPPORTED_PYTHON_VERSIONS)) + + paths_by_test[test.name] = set(target.path for target in test.filter_targets_by_version(args, test_targets, '')) + tests_by_name[test.name] = test + + for line_no, line in enumerate(lines, start=1): + if not line: + self.parse_errors.append((line_no, 1, "Line cannot be empty or contain only a comment")) + continue + + parts = line.split(' ') + path = parts[0] + codes = parts[1:] + + if not path: + self.parse_errors.append((line_no, 1, "Line cannot start with a space")) + continue + + if path.endswith(os.path.sep): + if path not in directories: + self.file_not_found_errors.append((line_no, path)) + continue + else: + if path not in paths: + self.file_not_found_errors.append((line_no, path)) + continue + + if not codes: + self.parse_errors.append((line_no, len(path), "Error code required after path")) + continue + + code = codes[0] + + if not code: + self.parse_errors.append((line_no, len(path) + 1, "Error code after path cannot be empty")) + continue + + if len(codes) > 1: + self.parse_errors.append((line_no, len(path) + len(code) + 2, "Error code cannot contain spaces")) + continue + + parts = code.split('!') + code = parts[0] + commands = parts[1:] + + parts = code.split(':') + test_name = parts[0] + error_codes = parts[1:] + + test = tests_by_name.get(test_name) + + if not test: + unversioned_name = unversioned_test_names.get(test_name) + + if unversioned_name: + self.parse_errors.append((line_no, len(path) + len(unversioned_name) + 2, "Sanity test '%s' cannot use a Python version like '%s'" % ( + unversioned_name, test_name))) + elif test_name in versioned_test_names: + self.parse_errors.append((line_no, len(path) + len(test_name) + 1, "Sanity test '%s' requires a Python version like '%s-%s'" % ( + test_name, test_name, args.controller_python.version))) + else: + self.parse_errors.append((line_no, len(path) + 2, "Sanity test '%s' does not exist" % test_name)) + + continue + + if path.endswith(os.path.sep) and not test.include_directories: + self.parse_errors.append((line_no, 1, "Sanity test '%s' does not support directory paths" % test_name)) + continue + + if path not in paths_by_test[test_name] and not test.no_targets: + self.parse_errors.append((line_no, 1, "Sanity test '%s' does not test path '%s'" % (test_name, path))) + continue + + if commands and error_codes: + self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Error code cannot contain both '!' and ':' characters")) + continue + + if commands: + command = commands[0] + + if len(commands) > 1: + self.parse_errors.append((line_no, len(path) + len(test_name) + len(command) + 3, "Error code cannot contain multiple '!' characters")) + continue + + if command == 'skip': + if not test.can_skip: + self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Sanity test '%s' cannot be skipped" % test_name)) + continue + + existing_line_no = self.skips.get(test_name, {}).get(path) + + if existing_line_no: + self.parse_errors.append((line_no, 1, "Duplicate '%s' skip for path '%s' first found on line %d" % (test_name, path, existing_line_no))) + continue + + self.skips[test_name][path] = line_no + continue + + self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Command '!%s' not recognized" % command)) + continue + + if not test.can_ignore: + self.parse_errors.append((line_no, len(path) + 1, "Sanity test '%s' cannot be ignored" % test_name)) + continue + + if test.error_code: + if not error_codes: + self.parse_errors.append((line_no, len(path) + len(test_name) + 1, "Sanity test '%s' requires an error code" % test_name)) + continue + + error_code = error_codes[0] + + if len(error_codes) > 1: + self.parse_errors.append((line_no, len(path) + len(test_name) + len(error_code) + 3, "Error code cannot contain multiple ':' characters")) + continue + + if error_code in test.optional_error_codes: + self.parse_errors.append((line_no, len(path) + len(test_name) + 3, "Optional error code '%s' cannot be ignored" % ( + error_code))) + continue + else: + if error_codes: + self.parse_errors.append((line_no, len(path) + len(test_name) + 2, "Sanity test '%s' does not support error codes" % test_name)) + continue + + error_code = self.NO_CODE + + existing = self.ignores.get(test_name, {}).get(path, {}).get(error_code) + + if existing: + if test.error_code: + self.parse_errors.append((line_no, 1, "Duplicate '%s' ignore for error code '%s' for path '%s' first found on line %d" % ( + test_name, error_code, path, existing))) + else: + self.parse_errors.append((line_no, 1, "Duplicate '%s' ignore for path '%s' first found on line %d" % ( + test_name, path, existing))) + + continue + + self.ignores[test_name][path][error_code] = line_no + + @staticmethod + def load(args: SanityConfig) -> SanityIgnoreParser: + """Return the current SanityIgnore instance, initializing it if needed.""" + try: + return SanityIgnoreParser.instance # type: ignore[attr-defined] + except AttributeError: + pass + + instance = SanityIgnoreParser(args) + + SanityIgnoreParser.instance = instance # type: ignore[attr-defined] + + return instance + + +class SanityIgnoreProcessor: + """Processor for sanity test ignores for a single run of one sanity test.""" + def __init__(self, + args: SanityConfig, + test: SanityTest, + python_version: t.Optional[str], + ) -> None: + name = test.name + code = test.error_code + + if python_version: + full_name = '%s-%s' % (name, python_version) + else: + full_name = name + + self.args = args + self.test = test + self.code = code + self.parser = SanityIgnoreParser.load(args) + self.ignore_entries = self.parser.ignores.get(full_name, {}) + self.skip_entries = self.parser.skips.get(full_name, {}) + self.used_line_numbers: set[int] = set() + + def filter_skipped_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given targets, with any skipped paths filtered out.""" + return sorted(target for target in targets if target.path not in self.skip_entries) + + def process_errors(self, errors: list[SanityMessage], paths: list[str]) -> list[SanityMessage]: + """Return the given errors filtered for ignores and with any settings related errors included.""" + errors = self.filter_messages(errors) + errors.extend(self.get_errors(paths)) + + errors = sorted(set(errors)) + + return errors + + def filter_messages(self, messages: list[SanityMessage]) -> list[SanityMessage]: + """Return a filtered list of the given messages using the entries that have been loaded.""" + filtered = [] + + for message in messages: + if message.code in self.test.optional_error_codes and not self.args.enable_optional_errors: + continue + + path_entry = self.ignore_entries.get(message.path) + + if path_entry: + code = message.code if self.code else SanityIgnoreParser.NO_CODE + line_no = path_entry.get(code) + + if line_no: + self.used_line_numbers.add(line_no) + continue + + filtered.append(message) + + return filtered + + def get_errors(self, paths: list[str]) -> list[SanityMessage]: + """Return error messages related to issues with the file.""" + messages: list[SanityMessage] = [] + + # unused errors + + unused: list[tuple[int, str, str]] = [] + + if self.test.no_targets or self.test.all_targets: + # tests which do not accept a target list, or which use all targets, always return all possible errors, so all ignores can be checked + targets = SanityTargets.get_targets() + test_targets = SanityTargets.filter_and_inject_targets(self.test, targets) + paths = [target.path for target in test_targets] + + for path in paths: + path_entry = self.ignore_entries.get(path) + + if not path_entry: + continue + + unused.extend((line_no, path, code) for code, line_no in path_entry.items() if line_no not in self.used_line_numbers) + + messages.extend(SanityMessage( + code=self.code, + message="Ignoring '%s' on '%s' is unnecessary" % (code, path) if self.code else "Ignoring '%s' is unnecessary" % path, + path=self.parser.relative_path, + line=line, + column=1, + confidence=calculate_best_confidence(((self.parser.path, line), (path, 0)), self.args.metadata) if self.args.metadata.changes else None, + ) for line, path, code in unused) + + return messages + + +class SanitySuccess(TestSuccess): + """Sanity test success.""" + def __init__(self, test: str, python_version: t.Optional[str] = None) -> None: + super().__init__(COMMAND, test, python_version) + + +class SanitySkipped(TestSkipped): + """Sanity test skipped.""" + def __init__(self, test: str, python_version: t.Optional[str] = None) -> None: + super().__init__(COMMAND, test, python_version) + + +class SanityFailure(TestFailure): + """Sanity test failure.""" + def __init__( + self, + test: str, + python_version: t.Optional[str] = None, + messages: t.Optional[c.Sequence[SanityMessage]] = None, + summary: t.Optional[str] = None, + ) -> None: + super().__init__(COMMAND, test, python_version, messages, summary) + + +class SanityMessage(TestMessage): + """Single sanity test message for one file.""" + + +class SanityTargets: + """Sanity test target information.""" + def __init__(self, targets: tuple[TestTarget, ...], include: tuple[TestTarget, ...]) -> None: + self.targets = targets + self.include = include + + @staticmethod + def create(include: list[str], exclude: list[str], require: list[str]) -> SanityTargets: + """Create a SanityTargets instance from the given include, exclude and require lists.""" + _targets = SanityTargets.get_targets() + _include = walk_internal_targets(_targets, include, exclude, require) + return SanityTargets(_targets, _include) + + @staticmethod + def filter_and_inject_targets(test: SanityTest, targets: c.Iterable[TestTarget]) -> list[TestTarget]: + """Filter and inject targets based on test requirements and the given target list.""" + test_targets = list(targets) + + if not test.include_symlinks: + # remove all symlinks unless supported by the test + test_targets = [target for target in test_targets if not target.symlink] + + if not test.include_directories or not test.include_symlinks: + # exclude symlinked directories unless supported by the test + test_targets = [target for target in test_targets if not target.path.endswith(os.path.sep)] + + if test.include_directories: + # include directories containing any of the included files + test_targets += tuple(TestTarget(path, None, None, '') for path in paths_to_dirs([target.path for target in test_targets])) + + if not test.include_symlinks: + # remove all directory symlinks unless supported by the test + test_targets = [target for target in test_targets if not target.symlink] + + return test_targets + + @staticmethod + def get_targets() -> tuple[TestTarget, ...]: + """Return a tuple of sanity test targets. Uses a cached version when available.""" + try: + return SanityTargets.get_targets.targets # type: ignore[attr-defined] + except AttributeError: + targets = tuple(sorted(walk_sanity_targets())) + + SanityTargets.get_targets.targets = targets # type: ignore[attr-defined] + + return targets + + +class SanityTest(metaclass=abc.ABCMeta): + """Sanity test base class.""" + ansible_only = False + + def __init__(self, name: t.Optional[str] = None) -> None: + if not name: + name = self.__class__.__name__ + name = re.sub(r'Test$', '', name) # drop Test suffix + name = re.sub(r'(.)([A-Z][a-z]+)', r'\1-\2', name).lower() # use dashes instead of capitalization + + self.name = name + self.enabled = True + + # Optional error codes represent errors which spontaneously occur without changes to the content under test, such as those based on the current date. + # Because these errors can be unpredictable they behave differently than normal error codes: + # * They are not reported by default. The `--enable-optional-errors` option must be used to display these errors. + # * They cannot be ignored. This is done to maintain the integrity of the ignore system. + self.optional_error_codes: set[str] = set() + + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return None + + @property + def can_ignore(self) -> bool: + """True if the test supports ignore entries.""" + return True + + @property + def can_skip(self) -> bool: + """True if the test supports skip entries.""" + return not self.all_targets and not self.no_targets + + @property + def all_targets(self) -> bool: + """True if test targets will not be filtered using includes, excludes, requires or changes. Mutually exclusive with no_targets.""" + return False + + @property + def no_targets(self) -> bool: + """True if the test does not use test targets. Mutually exclusive with all_targets.""" + return False + + @property + def include_directories(self) -> bool: + """True if the test targets should include directories.""" + return False + + @property + def include_symlinks(self) -> bool: + """True if the test targets should include symlinks.""" + return False + + @property + def py2_compat(self) -> bool: + """True if the test only applies to code that runs on Python 2.x.""" + return False + + @property + def supported_python_versions(self) -> t.Optional[tuple[str, ...]]: + """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" + return CONTROLLER_PYTHON_VERSIONS + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: # pylint: disable=unused-argument + """Return the given list of test targets, filtered to include only those relevant for the test.""" + if self.no_targets: + return [] + + raise NotImplementedError('Sanity test "%s" must implement "filter_targets" or set "no_targets" to True.' % self.name) + + def filter_targets_by_version(self, args: SanityConfig, targets: list[TestTarget], python_version: str) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test, taking into account the Python version.""" + del python_version # python_version is not used here, but derived classes may make use of it + + targets = self.filter_targets(targets) + + if self.py2_compat: + # This sanity test is a Python 2.x compatibility test. + content_config = get_content_config(args) + + if content_config.py2_support: + # This collection supports Python 2.x. + # Filter targets to include only those that require support for remote-only Python versions. + targets = self.filter_remote_targets(targets) + else: + # This collection does not support Python 2.x. + # There are no targets to test. + targets = [] + + return targets + + @staticmethod + def filter_remote_targets(targets: list[TestTarget]) -> list[TestTarget]: + """Return a filtered list of the given targets, including only those that require support for remote-only Python versions.""" + targets = [target for target in targets if ( + is_subdir(target.path, data_context().content.module_path) or + is_subdir(target.path, data_context().content.module_utils_path) or + is_subdir(target.path, data_context().content.unit_module_path) or + is_subdir(target.path, data_context().content.unit_module_utils_path) or + # include modules/module_utils within integration test library directories + re.search('^%s/.*/library/' % re.escape(data_context().content.integration_targets_path), target.path) or + # special handling for content in ansible-core + (data_context().content.is_ansible and ( + # utility code that runs in target environments and requires support for remote-only Python versions + is_subdir(target.path, 'test/lib/ansible_test/_util/target/') or + # integration test support modules/module_utils continue to require support for remote-only Python versions + re.search('^test/support/integration/.*/(modules|module_utils)/', target.path) or + # collection loader requires support for remote-only Python versions + re.search('^lib/ansible/utils/collection_loader/', target.path) + )) + )] + + return targets + + +class SanitySingleVersion(SanityTest, metaclass=abc.ABCMeta): + """Base class for sanity test plugins which should run on a single python version.""" + @property + def require_libyaml(self) -> bool: + """True if the test requires PyYAML to have libyaml support.""" + return False + + @abc.abstractmethod + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + """Run the sanity test and return the result.""" + + def load_processor(self, args: SanityConfig) -> SanityIgnoreProcessor: + """Load the ignore processor for this sanity test.""" + return SanityIgnoreProcessor(args, self, None) + + +class SanityCodeSmellTest(SanitySingleVersion): + """Sanity test script.""" + def __init__(self, path) -> None: + name = os.path.splitext(os.path.basename(path))[0] + config_path = os.path.splitext(path)[0] + '.json' + + super().__init__(name=name) + + self.path = path + self.config_path = config_path if os.path.exists(config_path) else None + self.config = None + + if self.config_path: + self.config = read_json_file(self.config_path) + + if self.config: + self.enabled = not self.config.get('disabled') + + self.output: t.Optional[str] = self.config.get('output') + self.extensions: list[str] = self.config.get('extensions') + self.prefixes: list[str] = self.config.get('prefixes') + self.files: list[str] = self.config.get('files') + self.text: t.Optional[bool] = self.config.get('text') + self.ignore_self: bool = self.config.get('ignore_self') + self.minimum_python_version: t.Optional[str] = self.config.get('minimum_python_version') + self.maximum_python_version: t.Optional[str] = self.config.get('maximum_python_version') + + self.__all_targets: bool = self.config.get('all_targets') + self.__no_targets: bool = self.config.get('no_targets') + self.__include_directories: bool = self.config.get('include_directories') + self.__include_symlinks: bool = self.config.get('include_symlinks') + self.__py2_compat: bool = self.config.get('py2_compat', False) + else: + self.output = None + self.extensions = [] + self.prefixes = [] + self.files = [] + self.text = None + self.ignore_self = False + self.minimum_python_version = None + self.maximum_python_version = None + + self.__all_targets = False + self.__no_targets = True + self.__include_directories = False + self.__include_symlinks = False + self.__py2_compat = False + + if self.no_targets: + mutually_exclusive = ( + 'extensions', + 'prefixes', + 'files', + 'text', + 'ignore_self', + 'all_targets', + 'include_directories', + 'include_symlinks', + ) + + problems = sorted(name for name in mutually_exclusive if getattr(self, name)) + + if problems: + raise ApplicationError('Sanity test "%s" option "no_targets" is mutually exclusive with options: %s' % (self.name, ', '.join(problems))) + + @property + def all_targets(self) -> bool: + """True if test targets will not be filtered using includes, excludes, requires or changes. Mutually exclusive with no_targets.""" + return self.__all_targets + + @property + def no_targets(self) -> bool: + """True if the test does not use test targets. Mutually exclusive with all_targets.""" + return self.__no_targets + + @property + def include_directories(self) -> bool: + """True if the test targets should include directories.""" + return self.__include_directories + + @property + def include_symlinks(self) -> bool: + """True if the test targets should include symlinks.""" + return self.__include_symlinks + + @property + def py2_compat(self) -> bool: + """True if the test only applies to code that runs on Python 2.x.""" + return self.__py2_compat + + @property + def supported_python_versions(self) -> t.Optional[tuple[str, ...]]: + """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" + versions = super().supported_python_versions + + if self.minimum_python_version: + versions = tuple(version for version in versions if str_to_version(version) >= str_to_version(self.minimum_python_version)) + + if self.maximum_python_version: + versions = tuple(version for version in versions if str_to_version(version) <= str_to_version(self.maximum_python_version)) + + return versions + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + if self.no_targets: + return [] + + if self.text is not None: + if self.text: + targets = [target for target in targets if not is_binary_file(target.path)] + else: + targets = [target for target in targets if is_binary_file(target.path)] + + if self.extensions: + targets = [target for target in targets if os.path.splitext(target.path)[1] in self.extensions + or (is_subdir(target.path, 'bin') and '.py' in self.extensions)] + + if self.prefixes: + targets = [target for target in targets if any(target.path.startswith(pre) for pre in self.prefixes)] + + if self.files: + targets = [target for target in targets if os.path.basename(target.path) in self.files] + + if self.ignore_self and data_context().content.is_ansible: + relative_self_path = os.path.relpath(self.path, data_context().content.root) + targets = [target for target in targets if target.path != relative_self_path] + + return targets + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + """Run the sanity test and return the result.""" + cmd = [python.path, self.path] + + env = ansible_environment(args, color=False) + env.update(PYTHONUTF8='1') # force all code-smell sanity tests to run with Python UTF-8 Mode enabled + + pattern = None + data = None + + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + if self.config: + if self.output == 'path-line-column-message': + pattern = '^(?P<path>[^:]*):(?P<line>[0-9]+):(?P<column>[0-9]+): (?P<message>.*)$' + elif self.output == 'path-message': + pattern = '^(?P<path>[^:]*): (?P<message>.*)$' + else: + raise ApplicationError('Unsupported output type: %s' % self.output) + + if not self.no_targets: + data = '\n'.join(paths) + + if data: + display.info(data, verbosity=4) + + try: + stdout, stderr = intercept_python(args, python, cmd, data=data, env=env, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if args.explain: + return SanitySuccess(self.name) + + if stdout and not stderr: + if pattern: + matches = parse_to_list_of_dict(pattern, stdout) + + messages = [SanityMessage( + message=m['message'], + path=m['path'], + line=int(m.get('line', 0)), + column=int(m.get('column', 0)), + ) for m in matches] + + messages = settings.process_errors(messages, paths) + + if not messages: + return SanitySuccess(self.name) + + return SanityFailure(self.name, messages=messages) + + if stderr or status: + summary = '%s' % SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + return SanityFailure(self.name, summary=summary) + + messages = settings.process_errors([], paths) + + if messages: + return SanityFailure(self.name, messages=messages) + + return SanitySuccess(self.name) + + def load_processor(self, args: SanityConfig) -> SanityIgnoreProcessor: + """Load the ignore processor for this sanity test.""" + return SanityIgnoreProcessor(args, self, None) + + +class SanityVersionNeutral(SanityTest, metaclass=abc.ABCMeta): + """Base class for sanity test plugins which are idependent of the python version being used.""" + @abc.abstractmethod + def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult: + """Run the sanity test and return the result.""" + + def load_processor(self, args: SanityConfig) -> SanityIgnoreProcessor: + """Load the ignore processor for this sanity test.""" + return SanityIgnoreProcessor(args, self, None) + + @property + def supported_python_versions(self) -> t.Optional[tuple[str, ...]]: + """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" + return None + + +class SanityMultipleVersion(SanityTest, metaclass=abc.ABCMeta): + """Base class for sanity test plugins which should run on multiple python versions.""" + @abc.abstractmethod + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + """Run the sanity test and return the result.""" + + def load_processor(self, args: SanityConfig, python_version: str) -> SanityIgnoreProcessor: + """Load the ignore processor for this sanity test.""" + return SanityIgnoreProcessor(args, self, python_version) + + @property + def needs_pypi(self) -> bool: + """True if the test requires PyPI, otherwise False.""" + return False + + @property + def supported_python_versions(self) -> t.Optional[tuple[str, ...]]: + """A tuple of supported Python versions or None if the test does not depend on specific Python versions.""" + return SUPPORTED_PYTHON_VERSIONS + + def filter_targets_by_version(self, args: SanityConfig, targets: list[TestTarget], python_version: str) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test, taking into account the Python version.""" + if not python_version: + raise Exception('python_version is required to filter multi-version tests') + + targets = super().filter_targets_by_version(args, targets, python_version) + + if python_version in REMOTE_ONLY_PYTHON_VERSIONS: + content_config = get_content_config(args) + + if python_version not in content_config.modules.python_versions: + # when a remote-only python version is not supported there are no paths to test + return [] + + # when a remote-only python version is supported, tests must be applied only to targets that support remote-only Python versions + targets = self.filter_remote_targets(targets) + + return targets + + +@cache +def sanity_get_tests() -> tuple[SanityTest, ...]: + """Return a tuple of the available sanity tests.""" + import_plugins('commands/sanity') + sanity_plugins: dict[str, t.Type[SanityTest]] = {} + load_plugins(SanityTest, sanity_plugins) + sanity_plugins.pop('sanity') # SanityCodeSmellTest + sanity_tests = tuple(plugin() for plugin in sanity_plugins.values() if data_context().content.is_ansible or not plugin.ansible_only) + sanity_tests = tuple(sorted(sanity_tests + collect_code_smell_tests(), key=lambda k: k.name)) + return sanity_tests + + +def create_sanity_virtualenv( + args: SanityConfig, + python: PythonConfig, + name: str, + coverage: bool = False, + minimize: bool = False, +) -> t.Optional[VirtualPythonConfig]: + """Return an existing sanity virtual environment matching the requested parameters or create a new one.""" + commands = collect_requirements( # create_sanity_virtualenv() + python=python, + controller=True, + virtualenv=False, + command=None, + ansible=False, + cryptography=False, + coverage=coverage, + minimize=minimize, + sanity=name, + ) + + if commands: + label = f'sanity.{name}' + else: + label = 'sanity' # use a single virtualenv name for tests which have no requirements + + # The path to the virtual environment must be kept short to avoid the 127 character shebang length limit on Linux. + # If the limit is exceeded, generated entry point scripts from pip installed packages will fail with syntax errors. + virtualenv_install = json.dumps([command.serialize() for command in commands], indent=4) + virtualenv_hash = hashlib.sha256(to_bytes(virtualenv_install)).hexdigest()[:8] + virtualenv_cache = os.path.join(os.path.expanduser('~/.ansible/test/venv')) + virtualenv_path = os.path.join(virtualenv_cache, label, f'{python.version}', virtualenv_hash) + virtualenv_marker = os.path.join(virtualenv_path, 'marker.txt') + + meta_install = os.path.join(virtualenv_path, 'meta.install.json') + meta_yaml = os.path.join(virtualenv_path, 'meta.yaml.json') + + virtualenv_python = VirtualPythonConfig( + version=python.version, + path=os.path.join(virtualenv_path, 'bin', 'python'), + ) + + if not os.path.exists(virtualenv_marker): + # a virtualenv without a marker is assumed to have been partially created + remove_tree(virtualenv_path) + + if not create_virtual_environment(args, python, virtualenv_path): + return None + + run_pip(args, virtualenv_python, commands, None) # create_sanity_virtualenv() + + write_text_file(meta_install, virtualenv_install) + + # false positive: pylint: disable=no-member + if any(isinstance(command, PipInstall) and command.has_package('pyyaml') for command in commands): + virtualenv_yaml = yamlcheck(virtualenv_python) + else: + virtualenv_yaml = None + + write_json_file(meta_yaml, virtualenv_yaml) + + created_venvs.append(f'{label}-{python.version}') + + # touch the marker to keep track of when the virtualenv was last used + pathlib.Path(virtualenv_marker).touch() + + return virtualenv_python + + +def check_sanity_virtualenv_yaml(python: VirtualPythonConfig) -> t.Optional[bool]: + """Return True if PyYAML has libyaml support for the given sanity virtual environment, False if it does not and None if it was not found.""" + virtualenv_path = os.path.dirname(os.path.dirname(python.path)) + meta_yaml = os.path.join(virtualenv_path, 'meta.yaml.json') + virtualenv_yaml = read_json_file(meta_yaml) + + return virtualenv_yaml diff --git a/test/lib/ansible_test/_internal/commands/sanity/ansible_doc.py b/test/lib/ansible_test/_internal/commands/sanity/ansible_doc.py new file mode 100644 index 0000000..6815f88 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/ansible_doc.py @@ -0,0 +1,127 @@ +"""Sanity test for ansible-doc.""" +from __future__ import annotations + +import collections +import os +import re + +from . import ( + DOCUMENTABLE_PLUGINS, + SanitySingleVersion, + SanityFailure, + SanitySuccess, + SanityTargets, + SanityMessage, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, + is_subdir, +) + +from ...ansible_util import ( + ansible_environment, + intercept_python, +) + +from ...config import ( + SanityConfig, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + PythonConfig, +) + + +class AnsibleDocTest(SanitySingleVersion): + """Sanity test for ansible-doc.""" + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + plugin_paths = [plugin_path for plugin_type, plugin_path in data_context().content.plugin_paths.items() if plugin_type in DOCUMENTABLE_PLUGINS] + + return [target for target in targets + if os.path.splitext(target.path)[1] == '.py' + and os.path.basename(target.path) != '__init__.py' + and any(is_subdir(target.path, path) for path in plugin_paths) + ] + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + doc_targets: dict[str, list[str]] = collections.defaultdict(list) + + remap_types = dict( + modules='module', + ) + + for plugin_type, plugin_path in data_context().content.plugin_paths.items(): + plugin_type = remap_types.get(plugin_type, plugin_type) + + for plugin_file_path in [target.name for target in targets.include if is_subdir(target.path, plugin_path)]: + plugin_parts = os.path.relpath(plugin_file_path, plugin_path).split(os.path.sep) + plugin_name = os.path.splitext(plugin_parts[-1])[0] + + if plugin_name.startswith('_'): + plugin_name = plugin_name[1:] + + plugin_fqcn = data_context().content.prefix + '.'.join(plugin_parts[:-1] + [plugin_name]) + + doc_targets[plugin_type].append(plugin_fqcn) + + env = ansible_environment(args, color=False) + error_messages: list[SanityMessage] = [] + + for doc_type in sorted(doc_targets): + for format_option in [None, '--json']: + cmd = ['ansible-doc', '-t', doc_type] + if format_option is not None: + cmd.append(format_option) + cmd.extend(sorted(doc_targets[doc_type])) + + try: + stdout, stderr = intercept_python(args, python, cmd, env, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if status: + summary = '%s' % SubprocessError(cmd=cmd, status=status, stderr=stderr) + return SanityFailure(self.name, summary=summary) + + if stdout: + display.info(stdout.strip(), verbosity=3) + + if stderr: + # ignore removed module/plugin warnings + stderr = re.sub(r'\[WARNING]: [^ ]+ [^ ]+ has been removed\n', '', stderr).strip() + + if stderr: + summary = 'Output on stderr from ansible-doc is considered an error.\n\n%s' % SubprocessError(cmd, stderr=stderr) + return SanityFailure(self.name, summary=summary) + + if args.explain: + return SanitySuccess(self.name) + + error_messages = settings.process_errors(error_messages, paths) + + if error_messages: + return SanityFailure(self.name, messages=error_messages) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/compile.py b/test/lib/ansible_test/_internal/commands/sanity/compile.py new file mode 100644 index 0000000..4505338 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/compile.py @@ -0,0 +1,94 @@ +"""Sanity test for proper python syntax.""" +from __future__ import annotations + +import os + +from . import ( + SanityMultipleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + SanitySkipped, + TARGET_SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, + parse_to_list_of_dict, + is_subdir, +) + +from ...util_common import ( + run_command, +) + +from ...config import ( + SanityConfig, +) + +from ...host_configs import ( + PythonConfig, +) + + +class CompileTest(SanityMultipleVersion): + """Sanity test for proper python syntax.""" + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] == '.py' or is_subdir(target.path, 'bin')] + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + if args.prime_venvs: + return SanitySkipped(self.name, python_version=python.version) + + settings = self.load_processor(args, python.version) + + paths = [target.path for target in targets.include] + + cmd = [python.path, os.path.join(TARGET_SANITY_ROOT, 'compile', 'compile.py')] + + data = '\n'.join(paths) + + display.info(data, verbosity=4) + + try: + stdout, stderr = run_command(args, cmd, data=data, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + + if args.explain: + return SanitySuccess(self.name, python_version=python.version) + + pattern = r'^(?P<path>[^:]*):(?P<line>[0-9]+):(?P<column>[0-9]+): (?P<message>.*)$' + + results = parse_to_list_of_dict(pattern, stdout) + + results = [SanityMessage( + message=r['message'], + path=r['path'].replace('./', ''), + line=int(r['line']), + column=int(r['column']), + ) for r in results] + + results = settings.process_errors(results, paths) + + if results: + return SanityFailure(self.name, messages=results, python_version=python.version) + + return SanitySuccess(self.name, python_version=python.version) diff --git a/test/lib/ansible_test/_internal/commands/sanity/ignores.py b/test/lib/ansible_test/_internal/commands/sanity/ignores.py new file mode 100644 index 0000000..6d9837d --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/ignores.py @@ -0,0 +1,84 @@ +"""Sanity test for the sanity ignore file.""" +from __future__ import annotations + +import os + +from . import ( + SanityFailure, + SanityIgnoreParser, + SanityVersionNeutral, + SanitySuccess, + SanityMessage, + SanityTargets, +) + +from ...test import ( + calculate_confidence, + calculate_best_confidence, + TestResult, +) + +from ...config import ( + SanityConfig, +) + + +class IgnoresTest(SanityVersionNeutral): + """Sanity test for sanity test ignore entries.""" + @property + def can_ignore(self) -> bool: + """True if the test supports ignore entries.""" + return False + + @property + def no_targets(self) -> bool: + """True if the test does not use test targets. Mutually exclusive with all_targets.""" + return True + + def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult: + sanity_ignore = SanityIgnoreParser.load(args) + + messages: list[SanityMessage] = [] + + # parse errors + + messages.extend(SanityMessage( + message=message, + path=sanity_ignore.relative_path, + line=line, + column=column, + confidence=calculate_confidence(sanity_ignore.path, line, args.metadata) if args.metadata.changes else None, + ) for line, column, message in sanity_ignore.parse_errors) + + # file not found errors + + messages.extend(SanityMessage( + message="%s '%s' does not exist" % ("Directory" if path.endswith(os.path.sep) else "File", path), + path=sanity_ignore.relative_path, + line=line, + column=1, + confidence=calculate_best_confidence(((sanity_ignore.path, line), (path, 0)), args.metadata) if args.metadata.changes else None, + ) for line, path in sanity_ignore.file_not_found_errors) + + # conflicting ignores and skips + + for test_name, ignores in sanity_ignore.ignores.items(): + for ignore_path, ignore_entry in ignores.items(): + skip_line_no = sanity_ignore.skips.get(test_name, {}).get(ignore_path) + + if not skip_line_no: + continue + + for ignore_line_no in ignore_entry.values(): + messages.append(SanityMessage( + message="Ignoring '%s' is unnecessary due to skip entry on line %d" % (ignore_path, skip_line_no), + path=sanity_ignore.relative_path, + line=ignore_line_no, + column=1, + confidence=calculate_confidence(sanity_ignore.path, ignore_line_no, args.metadata) if args.metadata.changes else None, + )) + + if messages: + return SanityFailure(self.name, messages=messages) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/import.py b/test/lib/ansible_test/_internal/commands/sanity/import.py new file mode 100644 index 0000000..8511d7a --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/import.py @@ -0,0 +1,217 @@ +"""Sanity test for proper import exception handling.""" +from __future__ import annotations + +import collections.abc as c +import os + +from . import ( + SanityMultipleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanitySkipped, + TARGET_SANITY_ROOT, + SanityTargets, + create_sanity_virtualenv, + check_sanity_virtualenv_yaml, +) + +from ...constants import ( + CONTROLLER_MIN_PYTHON_VERSION, + REMOTE_ONLY_PYTHON_VERSIONS, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + cache, + SubprocessError, + display, + parse_to_list_of_dict, + is_subdir, + ANSIBLE_TEST_TOOLS_ROOT, +) + +from ...util_common import ( + ResultType, + create_temp_dir, +) + +from ...ansible_util import ( + ansible_environment, +) + +from ...python_requirements import ( + PipUnavailableError, + install_requirements, +) + +from ...config import ( + SanityConfig, +) + +from ...coverage_util import ( + cover_python, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + PythonConfig, +) + +from ...venv import ( + get_virtualenv_version, +) + + +def _get_module_test(module_restrictions: bool) -> c.Callable[[str], bool]: + """Create a predicate which tests whether a path can be used by modules or not.""" + module_path = data_context().content.module_path + module_utils_path = data_context().content.module_utils_path + if module_restrictions: + return lambda path: is_subdir(path, module_path) or is_subdir(path, module_utils_path) + return lambda path: not (is_subdir(path, module_path) or is_subdir(path, module_utils_path)) + + +class ImportTest(SanityMultipleVersion): + """Sanity test for proper import exception handling.""" + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + if data_context().content.is_ansible: + # all of ansible-core must pass the import test, not just plugins/modules + # modules/module_utils will be tested using the module context + # everything else will be tested using the plugin context + paths = ['lib/ansible'] + else: + # only plugins/modules must pass the import test for collections + paths = list(data_context().content.plugin_paths.values()) + + return [target for target in targets if os.path.splitext(target.path)[1] == '.py' and + any(is_subdir(target.path, path) for path in paths)] + + @property + def needs_pypi(self) -> bool: + """True if the test requires PyPI, otherwise False.""" + return True + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + settings = self.load_processor(args, python.version) + + paths = [target.path for target in targets.include] + + if python.version.startswith('2.') and (get_virtualenv_version(args, python.path) or (0,)) < (13,): + # hack to make sure that virtualenv is available under Python 2.x + # on Python 3.x we can use the built-in venv + # version 13+ is required to use the `--no-wheel` option + try: + install_requirements(args, python, virtualenv=True, controller=False) # sanity (import) + except PipUnavailableError as ex: + display.warning(str(ex)) + + temp_root = os.path.join(ResultType.TMP.path, 'sanity', 'import') + + messages = [] + + for import_type, test in ( + ('module', _get_module_test(True)), + ('plugin', _get_module_test(False)), + ): + if import_type == 'plugin' and python.version in REMOTE_ONLY_PYTHON_VERSIONS: + continue + + data = '\n'.join([path for path in paths if test(path)]) + + if not data and not args.prime_venvs: + continue + + virtualenv_python = create_sanity_virtualenv(args, python, f'{self.name}.{import_type}', coverage=args.coverage, minimize=True) + + if not virtualenv_python: + display.warning(f'Skipping sanity test "{self.name}" on Python {python.version} due to missing virtual environment support.') + return SanitySkipped(self.name, python.version) + + virtualenv_yaml = check_sanity_virtualenv_yaml(virtualenv_python) + + if virtualenv_yaml is False: + display.warning(f'Sanity test "{self.name}" ({import_type}) on Python {python.version} may be slow due to missing libyaml support in PyYAML.') + + env = ansible_environment(args, color=False) + + env.update( + SANITY_TEMP_PATH=ResultType.TMP.path, + SANITY_IMPORTER_TYPE=import_type, + ) + + if data_context().content.collection: + external_python = create_sanity_virtualenv(args, args.controller_python, self.name) + + env.update( + SANITY_COLLECTION_FULL_NAME=data_context().content.collection.full_name, + SANITY_EXTERNAL_PYTHON=external_python.path, + SANITY_YAML_TO_JSON=os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'yaml_to_json.py'), + ANSIBLE_CONTROLLER_MIN_PYTHON_VERSION=CONTROLLER_MIN_PYTHON_VERSION, + PYTHONPATH=':'.join((get_ansible_test_python_path(), env["PYTHONPATH"])), + ) + + if args.prime_venvs: + continue + + display.info(import_type + ': ' + data, verbosity=4) + + cmd = ['importer.py'] + + # add the importer to the path so it can be accessed through the coverage injector + env.update( + PATH=os.pathsep.join([os.path.join(TARGET_SANITY_ROOT, 'import'), env['PATH']]), + ) + + try: + stdout, stderr = cover_python(args, virtualenv_python, cmd, self.name, env, capture=True, data=data) + + if stdout or stderr: + raise SubprocessError(cmd, stdout=stdout, stderr=stderr) + except SubprocessError as ex: + if ex.status != 10 or ex.stderr or not ex.stdout: + raise + + pattern = r'^(?P<path>[^:]*):(?P<line>[0-9]+):(?P<column>[0-9]+): (?P<message>.*)$' + + parsed = parse_to_list_of_dict(pattern, ex.stdout) + + relative_temp_root = os.path.relpath(temp_root, data_context().content.root) + os.path.sep + + messages += [SanityMessage( + message=r['message'], + path=os.path.relpath(r['path'], relative_temp_root) if r['path'].startswith(relative_temp_root) else r['path'], + line=int(r['line']), + column=int(r['column']), + ) for r in parsed] + + if args.prime_venvs: + return SanitySkipped(self.name, python_version=python.version) + + results = settings.process_errors(messages, paths) + + if results: + return SanityFailure(self.name, messages=results, python_version=python.version) + + return SanitySuccess(self.name, python_version=python.version) + + +@cache +def get_ansible_test_python_path() -> str: + """ + Return a directory usable for PYTHONPATH, containing only the ansible-test collection loader. + The temporary directory created will be cached for the lifetime of the process and cleaned up at exit. + """ + python_path = create_temp_dir(prefix='ansible-test-') + return python_path diff --git a/test/lib/ansible_test/_internal/commands/sanity/mypy.py b/test/lib/ansible_test/_internal/commands/sanity/mypy.py new file mode 100644 index 0000000..cb8ed12 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/mypy.py @@ -0,0 +1,259 @@ +"""Sanity test which executes mypy.""" +from __future__ import annotations + +import dataclasses +import os +import re +import typing as t + +from . import ( + SanityMultipleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanitySkipped, + SanityTargets, + create_sanity_virtualenv, +) + +from ...constants import ( + CONTROLLER_PYTHON_VERSIONS, + REMOTE_ONLY_PYTHON_VERSIONS, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, + parse_to_list_of_dict, + ANSIBLE_TEST_CONTROLLER_ROOT, + ApplicationError, + is_subdir, +) + +from ...util_common import ( + intercept_python, +) + +from ...ansible_util import ( + ansible_environment, +) + +from ...config import ( + SanityConfig, +) + +from ...host_configs import ( + PythonConfig, + VirtualPythonConfig, +) + + +class MypyTest(SanityMultipleVersion): + """Sanity test which executes mypy.""" + ansible_only = True + + vendored_paths = ( + 'lib/ansible/module_utils/six/__init__.py', + 'lib/ansible/module_utils/distro/_distro.py', + 'lib/ansible/module_utils/compat/_selectors2.py', + ) + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] == '.py' and target.path not in self.vendored_paths and ( + target.path.startswith('lib/ansible/') or target.path.startswith('test/lib/ansible_test/_internal/') + or target.path.startswith('test/lib/ansible_test/_util/target/sanity/import/'))] + + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'ansible-test' + + @property + def needs_pypi(self) -> bool: + """True if the test requires PyPI, otherwise False.""" + return True + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + settings = self.load_processor(args, python.version) + + paths = [target.path for target in targets.include] + + virtualenv_python = create_sanity_virtualenv(args, args.controller_python, self.name) + + if args.prime_venvs: + return SanitySkipped(self.name, python_version=python.version) + + if not virtualenv_python: + display.warning(f'Skipping sanity test "{self.name}" due to missing virtual environment support on Python {args.controller_python.version}.') + return SanitySkipped(self.name, python.version) + + controller_python_versions = CONTROLLER_PYTHON_VERSIONS + remote_only_python_versions = REMOTE_ONLY_PYTHON_VERSIONS + + contexts = ( + MyPyContext('ansible-test', ['test/lib/ansible_test/_util/target/sanity/import/'], controller_python_versions), + MyPyContext('ansible-test', ['test/lib/ansible_test/_internal/'], controller_python_versions), + MyPyContext('ansible-core', ['lib/ansible/'], controller_python_versions), + MyPyContext('modules', ['lib/ansible/modules/', 'lib/ansible/module_utils/'], remote_only_python_versions), + ) + + unfiltered_messages: list[SanityMessage] = [] + + for context in contexts: + if python.version not in context.python_versions: + continue + + unfiltered_messages.extend(self.test_context(args, virtualenv_python, python, context, paths)) + + notices = [] + messages = [] + + for message in unfiltered_messages: + if message.level != 'error': + notices.append(message) + continue + + match = re.search(r'^(?P<message>.*) {2}\[(?P<code>.*)]$', message.message) + + messages.append(SanityMessage( + message=match.group('message'), + path=message.path, + line=message.line, + column=message.column, + level=message.level, + code=match.group('code'), + )) + + for notice in notices: + display.info(notice.format(), verbosity=3) + + # The following error codes from mypy indicate that results are incomplete. + # That prevents the test from completing successfully, just as if mypy were to traceback or generate unexpected output. + fatal_error_codes = { + 'import', + 'syntax', + } + + fatal_errors = [message for message in messages if message.code in fatal_error_codes] + + if fatal_errors: + error_message = '\n'.join(error.format() for error in fatal_errors) + raise ApplicationError(f'Encountered {len(fatal_errors)} fatal errors reported by mypy:\n{error_message}') + + paths_set = set(paths) + + # Only report messages for paths that were specified as targets. + # Imports in our code are followed by mypy in order to perform its analysis, which is important for accurate results. + # However, it will also report issues on those files, which is not the desired behavior. + messages = [message for message in messages if message.path in paths_set] + + results = settings.process_errors(messages, paths) + + if results: + return SanityFailure(self.name, messages=results, python_version=python.version) + + return SanitySuccess(self.name, python_version=python.version) + + @staticmethod + def test_context( + args: SanityConfig, + virtualenv_python: VirtualPythonConfig, + python: PythonConfig, + context: MyPyContext, + paths: list[str], + ) -> list[SanityMessage]: + """Run mypy tests for the specified context.""" + context_paths = [path for path in paths if any(is_subdir(path, match_path) for match_path in context.paths)] + + if not context_paths: + return [] + + config_path = os.path.join(ANSIBLE_TEST_CONTROLLER_ROOT, 'sanity', 'mypy', f'{context.name}.ini') + + display.info(f'Checking context "{context.name}"', verbosity=1) + + env = ansible_environment(args, color=False) + env['MYPYPATH'] = env['PYTHONPATH'] + + # The --no-site-packages option should not be used, as it will prevent loading of type stubs from the sanity test virtual environment. + + # Enabling the --warn-unused-configs option would help keep the config files clean. + # However, the option can only be used when all files in tested contexts are evaluated. + # Unfortunately sanity tests have no way of making that determination currently. + # The option is also incompatible with incremental mode and caching. + + cmd = [ + # Below are arguments common to all contexts. + # They are kept here to avoid repetition in each config file. + virtualenv_python.path, + '-m', 'mypy', + '--show-column-numbers', + '--show-error-codes', + '--no-error-summary', + # This is a fairly common pattern in our code, so we'll allow it. + '--allow-redefinition', + # Since we specify the path(s) to test, it's important that mypy is configured to use the default behavior of following imports. + '--follow-imports', 'normal', + # Incremental results and caching do not provide significant performance benefits. + # It also prevents the use of the --warn-unused-configs option. + '--no-incremental', + '--cache-dir', '/dev/null', + # The platform is specified here so that results are consistent regardless of what platform the tests are run from. + # In the future, if testing of other platforms is desired, the platform should become part of the test specification, just like the Python version. + '--platform', 'linux', + # Despite what the documentation [1] states, the --python-version option does not cause mypy to search for a corresponding Python executable. + # It will instead use the Python executable that is used to run mypy itself. + # The --python-executable option can be used to specify the Python executable, with the default being the executable used to run mypy. + # As a precaution, that option is used in case the behavior of mypy is updated in the future to match the documentation. + # That should help guarantee that the Python executable providing type hints is the one used to run mypy. + # [1] https://mypy.readthedocs.io/en/stable/command_line.html#cmdoption-mypy-python-version + '--python-executable', virtualenv_python.path, + '--python-version', python.version, + # Below are context specific arguments. + # They are primarily useful for listing individual 'ignore_missing_imports' entries instead of using a global ignore. + '--config-file', config_path, + ] + + cmd.extend(context_paths) + + try: + stdout, stderr = intercept_python(args, virtualenv_python, cmd, env, capture=True) + + if stdout or stderr: + raise SubprocessError(cmd, stdout=stdout, stderr=stderr) + except SubprocessError as ex: + if ex.status != 1 or ex.stderr or not ex.stdout: + raise + + stdout = ex.stdout + + pattern = r'^(?P<path>[^:]*):(?P<line>[0-9]+):((?P<column>[0-9]+):)? (?P<level>[^:]+): (?P<message>.*)$' + + parsed = parse_to_list_of_dict(pattern, stdout) + + messages = [SanityMessage( + level=r['level'], + message=r['message'], + path=r['path'], + line=int(r['line']), + column=int(r.get('column') or '0'), + ) for r in parsed] + + return messages + + +@dataclasses.dataclass(frozen=True) +class MyPyContext: + """Context details for a single run of mypy.""" + name: str + paths: list[str] + python_versions: tuple[str, ...] diff --git a/test/lib/ansible_test/_internal/commands/sanity/pep8.py b/test/lib/ansible_test/_internal/commands/sanity/pep8.py new file mode 100644 index 0000000..5df9ace --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/pep8.py @@ -0,0 +1,109 @@ +"""Sanity test for PEP 8 style guidelines using pycodestyle.""" +from __future__ import annotations + +import os +import typing as t + +from . import ( + SanitySingleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + read_lines_without_comments, + parse_to_list_of_dict, + is_subdir, +) + +from ...util_common import ( + run_command, +) + +from ...config import ( + SanityConfig, +) + +from ...host_configs import ( + PythonConfig, +) + + +class Pep8Test(SanitySingleVersion): + """Sanity test for PEP 8 style guidelines using pycodestyle.""" + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'A100' + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] == '.py' or is_subdir(target.path, 'bin')] + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + current_ignore_file = os.path.join(SANITY_ROOT, 'pep8', 'current-ignore.txt') + current_ignore = sorted(read_lines_without_comments(current_ignore_file, remove_blank_lines=True)) + + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + cmd = [ + python.path, + '-m', 'pycodestyle', + '--max-line-length', '160', + '--config', '/dev/null', + '--ignore', ','.join(sorted(current_ignore)), + ] + paths + + if paths: + try: + stdout, stderr = run_command(args, cmd, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + else: + stdout = None + + if args.explain: + return SanitySuccess(self.name) + + if stdout: + pattern = '^(?P<path>[^:]*):(?P<line>[0-9]+):(?P<column>[0-9]+): (?P<code>[WE][0-9]{3}) (?P<message>.*)$' + + results = parse_to_list_of_dict(pattern, stdout) + else: + results = [] + + messages = [SanityMessage( + message=r['message'], + path=r['path'], + line=int(r['line']), + column=int(r['column']), + level='warning' if r['code'].startswith('W') else 'error', + code=r['code'], + ) for r in results] + + errors = settings.process_errors(messages, paths) + + if errors: + return SanityFailure(self.name, messages=errors) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/pslint.py b/test/lib/ansible_test/_internal/commands/sanity/pslint.py new file mode 100644 index 0000000..9136d51 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/pslint.py @@ -0,0 +1,119 @@ +"""Sanity test using PSScriptAnalyzer.""" +from __future__ import annotations + +import json +import os +import re +import typing as t + +from . import ( + SanityVersionNeutral, + SanityMessage, + SanityFailure, + SanitySuccess, + SanitySkipped, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + find_executable, + ANSIBLE_TEST_DATA_ROOT, +) + +from ...util_common import ( + run_command, +) + +from ...config import ( + SanityConfig, +) + +from ...data import ( + data_context, +) + + +class PslintTest(SanityVersionNeutral): + """Sanity test using PSScriptAnalyzer.""" + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'AnsibleTest' + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] in ('.ps1', '.psm1', '.psd1')] + + def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult: + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + if not find_executable('pwsh', required='warning'): + return SanitySkipped(self.name) + + cmds = [] + + if args.controller.is_managed or args.requirements: + cmds.append(['pwsh', os.path.join(ANSIBLE_TEST_DATA_ROOT, 'requirements', 'sanity.pslint.ps1')]) + + cmds.append(['pwsh', os.path.join(SANITY_ROOT, 'pslint', 'pslint.ps1')] + paths) + + stdout = '' + + for cmd in cmds: + try: + stdout, stderr = run_command(args, cmd, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + + if args.explain: + return SanitySuccess(self.name) + + severity = [ + 'Information', + 'Warning', + 'Error', + 'ParseError', + ] + + cwd = data_context().content.root + '/' + + # replace unicode smart quotes and ellipsis with ascii versions + stdout = re.sub('[\u2018\u2019]', "'", stdout) + stdout = re.sub('[\u201c\u201d]', '"', stdout) + stdout = re.sub('[\u2026]', '...', stdout) + + messages = json.loads(stdout) + + errors = [SanityMessage( + code=m['RuleName'], + message=m['Message'], + path=m['ScriptPath'].replace(cwd, ''), + line=m['Line'] or 0, + column=m['Column'] or 0, + level=severity[m['Severity']], + ) for m in messages] + + errors = settings.process_errors(errors, paths) + + if errors: + return SanityFailure(self.name, messages=errors) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/pylint.py b/test/lib/ansible_test/_internal/commands/sanity/pylint.py new file mode 100644 index 0000000..86f287a --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/pylint.py @@ -0,0 +1,270 @@ +"""Sanity test using pylint.""" +from __future__ import annotations + +import collections.abc as c +import itertools +import json +import os +import datetime +import configparser +import typing as t + +from . import ( + SanitySingleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, + is_subdir, +) + +from ...util_common import ( + run_command, +) + +from ...ansible_util import ( + ansible_environment, + get_collection_detail, + CollectionDetail, + CollectionDetailError, +) + +from ...config import ( + SanityConfig, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + PythonConfig, +) + + +class PylintTest(SanitySingleVersion): + """Sanity test using pylint.""" + def __init__(self) -> None: + super().__init__() + self.optional_error_codes.update([ + 'ansible-deprecated-date', + 'too-complex', + ]) + + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'ansible-test' + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] == '.py' or is_subdir(target.path, 'bin')] + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + plugin_dir = os.path.join(SANITY_ROOT, 'pylint', 'plugins') + plugin_names = sorted(p[0] for p in [ + os.path.splitext(p) for p in os.listdir(plugin_dir)] if p[1] == '.py' and p[0] != '__init__') + + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + module_paths = [os.path.relpath(p, data_context().content.module_path).split(os.path.sep) for p in + paths if is_subdir(p, data_context().content.module_path)] + module_dirs = sorted({p[0] for p in module_paths if len(p) > 1}) + + large_module_group_threshold = 500 + large_module_groups = [key for key, value in + itertools.groupby(module_paths, lambda p: p[0] if len(p) > 1 else '') if len(list(value)) > large_module_group_threshold] + + large_module_group_paths = [os.path.relpath(p, data_context().content.module_path).split(os.path.sep) for p in paths + if any(is_subdir(p, os.path.join(data_context().content.module_path, g)) for g in large_module_groups)] + large_module_group_dirs = sorted({os.path.sep.join(p[:2]) for p in large_module_group_paths if len(p) > 2}) + + contexts = [] + remaining_paths = set(paths) + + def add_context(available_paths: set[str], context_name: str, context_filter: c.Callable[[str], bool]) -> None: + """Add the specified context to the context list, consuming available paths that match the given context filter.""" + filtered_paths = set(p for p in available_paths if context_filter(p)) + contexts.append((context_name, sorted(filtered_paths))) + available_paths -= filtered_paths + + def filter_path(path_filter: str = None) -> c.Callable[[str], bool]: + """Return a function that filters out paths which are not a subdirectory of the given path.""" + def context_filter(path_to_filter: str) -> bool: + """Return true if the given path matches, otherwise return False.""" + return is_subdir(path_to_filter, path_filter) + + return context_filter + + for large_module_dir in large_module_group_dirs: + add_context(remaining_paths, 'modules/%s' % large_module_dir, filter_path(os.path.join(data_context().content.module_path, large_module_dir))) + + for module_dir in module_dirs: + add_context(remaining_paths, 'modules/%s' % module_dir, filter_path(os.path.join(data_context().content.module_path, module_dir))) + + add_context(remaining_paths, 'modules', filter_path(data_context().content.module_path)) + add_context(remaining_paths, 'module_utils', filter_path(data_context().content.module_utils_path)) + + add_context(remaining_paths, 'units', filter_path(data_context().content.unit_path)) + + if data_context().content.collection: + add_context(remaining_paths, 'collection', lambda p: True) + else: + add_context(remaining_paths, 'validate-modules', filter_path('test/lib/ansible_test/_util/controller/sanity/validate-modules/')) + add_context(remaining_paths, 'validate-modules-unit', filter_path('test/lib/ansible_test/tests/validate-modules-unit/')) + add_context(remaining_paths, 'code-smell', filter_path('test/lib/ansible_test/_util/controller/sanity/code-smell/')) + add_context(remaining_paths, 'ansible-test-target', filter_path('test/lib/ansible_test/_util/target/')) + add_context(remaining_paths, 'ansible-test', filter_path('test/lib/')) + add_context(remaining_paths, 'test', filter_path('test/')) + add_context(remaining_paths, 'hacking', filter_path('hacking/')) + add_context(remaining_paths, 'ansible', lambda p: True) + + messages = [] + context_times = [] + + collection_detail = None + + if data_context().content.collection: + try: + collection_detail = get_collection_detail(python) + + if not collection_detail.version: + display.warning('Skipping pylint collection version checks since no collection version was found.') + except CollectionDetailError as ex: + display.warning('Skipping pylint collection version checks since collection detail loading failed: %s' % ex.reason) + + test_start = datetime.datetime.utcnow() + + for context, context_paths in sorted(contexts): + if not context_paths: + continue + + context_start = datetime.datetime.utcnow() + messages += self.pylint(args, context, context_paths, plugin_dir, plugin_names, python, collection_detail) + context_end = datetime.datetime.utcnow() + + context_times.append('%s: %d (%s)' % (context, len(context_paths), context_end - context_start)) + + test_end = datetime.datetime.utcnow() + + for context_time in context_times: + display.info(context_time, verbosity=4) + + display.info('total: %d (%s)' % (len(paths), test_end - test_start), verbosity=4) + + errors = [SanityMessage( + message=m['message'].replace('\n', ' '), + path=m['path'], + line=int(m['line']), + column=int(m['column']), + level=m['type'], + code=m['symbol'], + ) for m in messages] + + if args.explain: + return SanitySuccess(self.name) + + errors = settings.process_errors(errors, paths) + + if errors: + return SanityFailure(self.name, messages=errors) + + return SanitySuccess(self.name) + + @staticmethod + def pylint( + args: SanityConfig, + context: str, + paths: list[str], + plugin_dir: str, + plugin_names: list[str], + python: PythonConfig, + collection_detail: CollectionDetail, + ) -> list[dict[str, str]]: + """Run pylint using the config specified by the context on the specified paths.""" + rcfile = os.path.join(SANITY_ROOT, 'pylint', 'config', context.split('/')[0] + '.cfg') + + if not os.path.exists(rcfile): + if data_context().content.collection: + rcfile = os.path.join(SANITY_ROOT, 'pylint', 'config', 'collection.cfg') + else: + rcfile = os.path.join(SANITY_ROOT, 'pylint', 'config', 'default.cfg') + + parser = configparser.ConfigParser() + parser.read(rcfile) + + if parser.has_section('ansible-test'): + config = dict(parser.items('ansible-test')) + else: + config = {} + + disable_plugins = set(i.strip() for i in config.get('disable-plugins', '').split(',') if i) + load_plugins = set(plugin_names + ['pylint.extensions.mccabe']) - disable_plugins + + cmd = [ + python.path, + '-m', 'pylint', + '--jobs', '0', + '--reports', 'n', + '--max-line-length', '160', + '--max-complexity', '20', + '--rcfile', rcfile, + '--output-format', 'json', + '--load-plugins', ','.join(sorted(load_plugins)), + ] + paths + + if data_context().content.collection: + cmd.extend(['--collection-name', data_context().content.collection.full_name]) + + if collection_detail and collection_detail.version: + cmd.extend(['--collection-version', collection_detail.version]) + + append_python_path = [plugin_dir] + + if data_context().content.collection: + append_python_path.append(data_context().content.collection.root) + + env = ansible_environment(args) + env['PYTHONPATH'] += os.path.pathsep + os.path.pathsep.join(append_python_path) + + # expose plugin paths for use in custom plugins + env.update(dict(('ANSIBLE_TEST_%s_PATH' % k.upper(), os.path.abspath(v) + os.path.sep) for k, v in data_context().content.plugin_paths.items())) + + if paths: + display.info('Checking %d file(s) in context "%s" with config: %s' % (len(paths), context, rcfile), verbosity=1) + + try: + stdout, stderr = run_command(args, cmd, env=env, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr or status >= 32: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + else: + stdout = None + + if not args.explain and stdout: + messages = json.loads(stdout) + else: + messages = [] + + return messages diff --git a/test/lib/ansible_test/_internal/commands/sanity/sanity_docs.py b/test/lib/ansible_test/_internal/commands/sanity/sanity_docs.py new file mode 100644 index 0000000..4f14a3a --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/sanity_docs.py @@ -0,0 +1,60 @@ +"""Sanity test for documentation of sanity tests.""" +from __future__ import annotations + +import os + +from . import ( + SanityVersionNeutral, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + sanity_get_tests, +) + +from ...test import ( + TestResult, +) + +from ...config import ( + SanityConfig, +) + +from ...data import ( + data_context, +) + + +class SanityDocsTest(SanityVersionNeutral): + """Sanity test for documentation of sanity tests.""" + ansible_only = True + + @property + def can_ignore(self) -> bool: + """True if the test supports ignore entries.""" + return False + + @property + def no_targets(self) -> bool: + """True if the test does not use test targets. Mutually exclusive with all_targets.""" + return True + + def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult: + sanity_dir = 'docs/docsite/rst/dev_guide/testing/sanity' + sanity_docs = set(part[0] for part in (os.path.splitext(os.path.basename(path)) for path in data_context().content.get_files(sanity_dir)) + if part[1] == '.rst') + sanity_tests = set(sanity_test.name for sanity_test in sanity_get_tests()) + + missing = sanity_tests - sanity_docs + + results = [] + + results += [SanityMessage( + message='missing docs for ansible-test sanity --test %s' % r, + path=os.path.join(sanity_dir, '%s.rst' % r), + ) for r in sorted(missing)] + + if results: + return SanityFailure(self.name, messages=results) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/shellcheck.py b/test/lib/ansible_test/_internal/commands/sanity/shellcheck.py new file mode 100644 index 0000000..7de0bda --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/shellcheck.py @@ -0,0 +1,108 @@ +"""Sanity test using shellcheck.""" +from __future__ import annotations + +import os +import typing as t + +from xml.etree.ElementTree import ( + fromstring, + Element, +) + +from . import ( + SanityVersionNeutral, + SanityMessage, + SanityFailure, + SanitySuccess, + SanitySkipped, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + read_lines_without_comments, + find_executable, +) + +from ...util_common import ( + run_command, +) + +from ...config import ( + SanityConfig, +) + + +class ShellcheckTest(SanityVersionNeutral): + """Sanity test using shellcheck.""" + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'AT1000' + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if os.path.splitext(target.path)[1] == '.sh'] + + def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult: + exclude_file = os.path.join(SANITY_ROOT, 'shellcheck', 'exclude.txt') + exclude = set(read_lines_without_comments(exclude_file, remove_blank_lines=True, optional=True)) + + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + if not find_executable('shellcheck', required='warning'): + return SanitySkipped(self.name) + + cmd = [ + 'shellcheck', + '-e', ','.join(sorted(exclude)), + '--format', 'checkstyle', + ] + paths + + try: + stdout, stderr = run_command(args, cmd, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr or status > 1: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + + if args.explain: + return SanitySuccess(self.name) + + # json output is missing file paths in older versions of shellcheck, so we'll use xml instead + root: Element = fromstring(stdout) + + results = [] + + for item in root: + for entry in item: + results.append(SanityMessage( + message=entry.attrib['message'], + path=item.attrib['name'], + line=int(entry.attrib['line']), + column=int(entry.attrib['column']), + level=entry.attrib['severity'], + code=entry.attrib['source'].replace('ShellCheck.', ''), + )) + + results = settings.process_errors(results, paths) + + if results: + return SanityFailure(self.name, messages=results) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/validate_modules.py b/test/lib/ansible_test/_internal/commands/sanity/validate_modules.py new file mode 100644 index 0000000..e1dacb7 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/validate_modules.py @@ -0,0 +1,190 @@ +"""Sanity test using validate-modules.""" +from __future__ import annotations + +import collections +import json +import os +import typing as t + +from . import ( + DOCUMENTABLE_PLUGINS, + SanitySingleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, +) + +from ...util_common import ( + run_command, +) + +from ...ansible_util import ( + ansible_environment, + get_collection_detail, + CollectionDetailError, +) + +from ...config import ( + SanityConfig, +) + +from ...ci import ( + get_ci_provider, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + PythonConfig, +) + + +class ValidateModulesTest(SanitySingleVersion): + """Sanity test using validate-modules.""" + + def __init__(self) -> None: + super().__init__() + + self.optional_error_codes.update([ + 'deprecated-date', + ]) + + self._prefixes = { + plugin_type: plugin_path + '/' + for plugin_type, plugin_path in data_context().content.plugin_paths.items() + if plugin_type in DOCUMENTABLE_PLUGINS + } + + self._exclusions = set() + + if not data_context().content.collection: + self._exclusions.add('lib/ansible/plugins/cache/base.py') + + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'A100' + + def get_plugin_type(self, target: TestTarget) -> t.Optional[str]: + """Return the plugin type of the given target, or None if it is not a plugin or module.""" + if target.path.endswith('/__init__.py'): + return None + + if target.path in self._exclusions: + return None + + for plugin_type, prefix in self._prefixes.items(): + if target.path.startswith(prefix): + return plugin_type + + return None + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + return [target for target in targets if self.get_plugin_type(target) is not None] + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + env = ansible_environment(args, color=False) + + settings = self.load_processor(args) + + target_per_type = collections.defaultdict(list) + + for target in targets.include: + target_per_type[self.get_plugin_type(target)].append(target) + + cmd = [ + python.path, + os.path.join(SANITY_ROOT, 'validate-modules', 'validate.py'), + '--format', 'json', + '--arg-spec', + ] + + if data_context().content.collection: + cmd.extend(['--collection', data_context().content.collection.directory]) + + try: + collection_detail = get_collection_detail(python) + + if collection_detail.version: + cmd.extend(['--collection-version', collection_detail.version]) + else: + display.warning('Skipping validate-modules collection version checks since no collection version was found.') + except CollectionDetailError as ex: + display.warning('Skipping validate-modules collection version checks since collection detail loading failed: %s' % ex.reason) + else: + base_branch = args.base_branch or get_ci_provider().get_base_branch() + + if base_branch: + cmd.extend([ + '--base-branch', base_branch, + ]) + else: + display.warning('Cannot perform module comparison against the base branch because the base branch was not detected.') + + errors = [] + + for plugin_type, plugin_targets in sorted(target_per_type.items()): + paths = [target.path for target in plugin_targets] + plugin_cmd = list(cmd) + + if plugin_type != 'modules': + plugin_cmd += ['--plugin-type', plugin_type] + + plugin_cmd += paths + + try: + stdout, stderr = run_command(args, plugin_cmd, env=env, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr or status not in (0, 3): + raise SubprocessError(cmd=plugin_cmd, status=status, stderr=stderr, stdout=stdout) + + if args.explain: + continue + + messages = json.loads(stdout) + + for filename in messages: + output = messages[filename] + + for item in output['errors']: + errors.append(SanityMessage( + path=filename, + line=int(item['line']) if 'line' in item else 0, + column=int(item['column']) if 'column' in item else 0, + code='%s' % item['code'], + message=item['msg'], + )) + + all_paths = [target.path for target in targets.include] + all_errors = settings.process_errors(errors, all_paths) + + if args.explain: + return SanitySuccess(self.name) + + if all_errors: + return SanityFailure(self.name, messages=all_errors) + + return SanitySuccess(self.name) diff --git a/test/lib/ansible_test/_internal/commands/sanity/yamllint.py b/test/lib/ansible_test/_internal/commands/sanity/yamllint.py new file mode 100644 index 0000000..a0d859f --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/sanity/yamllint.py @@ -0,0 +1,125 @@ +"""Sanity test using yamllint.""" +from __future__ import annotations + +import json +import os +import typing as t + +from . import ( + SanitySingleVersion, + SanityMessage, + SanityFailure, + SanitySuccess, + SanityTargets, + SANITY_ROOT, +) + +from ...test import ( + TestResult, +) + +from ...target import ( + TestTarget, +) + +from ...util import ( + SubprocessError, + display, + is_subdir, +) + +from ...util_common import ( + run_command, +) + +from ...config import ( + SanityConfig, +) + +from ...data import ( + data_context, +) + +from ...host_configs import ( + PythonConfig, +) + + +class YamllintTest(SanitySingleVersion): + """Sanity test using yamllint.""" + @property + def error_code(self) -> t.Optional[str]: + """Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes.""" + return 'ansible-test' + + @property + def require_libyaml(self) -> bool: + """True if the test requires PyYAML to have libyaml support.""" + return True + + def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]: + """Return the given list of test targets, filtered to include only those relevant for the test.""" + yaml_targets = [target for target in targets if os.path.splitext(target.path)[1] in ('.yml', '.yaml')] + + for plugin_type, plugin_path in sorted(data_context().content.plugin_paths.items()): + if plugin_type == 'module_utils': + continue + + yaml_targets.extend([target for target in targets if + os.path.splitext(target.path)[1] == '.py' and + os.path.basename(target.path) != '__init__.py' and + is_subdir(target.path, plugin_path)]) + + return yaml_targets + + def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult: + settings = self.load_processor(args) + + paths = [target.path for target in targets.include] + + results = self.test_paths(args, paths, python) + results = settings.process_errors(results, paths) + + if results: + return SanityFailure(self.name, messages=results) + + return SanitySuccess(self.name) + + @staticmethod + def test_paths(args: SanityConfig, paths: list[str], python: PythonConfig) -> list[SanityMessage]: + """Test the specified paths using the given Python and return the results.""" + cmd = [ + python.path, + os.path.join(SANITY_ROOT, 'yamllint', 'yamllinter.py'), + ] + + data = '\n'.join(paths) + + display.info(data, verbosity=4) + + try: + stdout, stderr = run_command(args, cmd, data=data, capture=True) + status = 0 + except SubprocessError as ex: + stdout = ex.stdout + stderr = ex.stderr + status = ex.status + + if stderr: + raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout) + + if args.explain: + return [] + + results = json.loads(stdout)['messages'] + + results = [SanityMessage( + code=r['code'], + message=r['message'], + path=r['path'], + line=int(r['line']), + column=int(r['column']), + level=r['level'], + ) for r in results] + + return results diff --git a/test/lib/ansible_test/_internal/commands/shell/__init__.py b/test/lib/ansible_test/_internal/commands/shell/__init__.py new file mode 100644 index 0000000..5e8c101 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/shell/__init__.py @@ -0,0 +1,135 @@ +"""Open a shell prompt inside an ansible-test environment.""" +from __future__ import annotations + +import os +import sys +import typing as t + +from ...util import ( + ApplicationError, + OutputStream, + display, + SubprocessError, + HostConnectionError, +) + +from ...config import ( + ShellConfig, +) + +from ...executor import ( + Delegate, +) + +from ...connections import ( + Connection, + LocalConnection, + SshConnection, +) + +from ...host_profiles import ( + ControllerProfile, + PosixProfile, + SshTargetHostProfile, +) + +from ...provisioning import ( + prepare_profiles, +) + +from ...host_configs import ( + ControllerConfig, + OriginConfig, +) + +from ...inventory import ( + create_controller_inventory, + create_posix_inventory, +) + + +def command_shell(args: ShellConfig) -> None: + """Entry point for the `shell` command.""" + if args.raw and isinstance(args.targets[0], ControllerConfig): + raise ApplicationError('The --raw option has no effect on the controller.') + + if not args.export and not args.cmd and not sys.stdin.isatty(): + raise ApplicationError('Standard input must be a TTY to launch a shell.') + + host_state = prepare_profiles(args, skip_setup=args.raw) # shell + + if args.delegate: + raise Delegate(host_state=host_state) + + if args.raw and not isinstance(args.controller, OriginConfig): + display.warning('The --raw option will only be applied to the target.') + + target_profile = t.cast(SshTargetHostProfile, host_state.target_profiles[0]) + + if isinstance(target_profile, ControllerProfile): + # run the shell locally unless a target was requested + con: Connection = LocalConnection(args) + + if args.export: + display.info('Configuring controller inventory.', verbosity=1) + create_controller_inventory(args, args.export, host_state.controller_profile) + else: + # a target was requested, connect to it over SSH + con = target_profile.get_controller_target_connections()[0] + + if args.export: + display.info('Configuring target inventory.', verbosity=1) + create_posix_inventory(args, args.export, host_state.target_profiles, True) + + if args.export: + return + + if args.cmd: + # Running a command is assumed to be non-interactive. Only a shell (no command) is interactive. + # If we want to support interactive commands in the future, we'll need an `--interactive` command line option. + # Command stderr output is allowed to mix with our own output, which is all sent to stderr. + con.run(args.cmd, capture=False, interactive=False, output_stream=OutputStream.ORIGINAL) + return + + if isinstance(con, SshConnection) and args.raw: + cmd: list[str] = [] + elif isinstance(target_profile, PosixProfile): + cmd = [] + + if args.raw: + shell = 'sh' # shell required for non-ssh connection + else: + shell = 'bash' + + python = target_profile.python # make sure the python interpreter has been initialized before opening a shell + display.info(f'Target Python {python.version} is at: {python.path}') + + optional_vars = ( + 'TERM', # keep backspace working + ) + + env = {name: os.environ[name] for name in optional_vars if name in os.environ} + + if env: + cmd = ['/usr/bin/env'] + [f'{name}={value}' for name, value in env.items()] + + cmd += [shell, '-i'] + else: + cmd = [] + + try: + con.run(cmd, capture=False, interactive=True) + except SubprocessError as ex: + if isinstance(con, SshConnection) and ex.status == 255: + # 255 indicates SSH itself failed, rather than a command run on the remote host. + # In this case, report a host connection error so additional troubleshooting output is provided. + if not args.delegate and not args.host_path: + def callback() -> None: + """Callback to run during error display.""" + target_profile.on_target_failure() # when the controller is not delegated, report failures immediately + else: + callback = None + + raise HostConnectionError(f'SSH shell connection failed for host {target_profile.config}: {ex}', callback) from ex + + raise diff --git a/test/lib/ansible_test/_internal/commands/units/__init__.py b/test/lib/ansible_test/_internal/commands/units/__init__.py new file mode 100644 index 0000000..f666d41 --- /dev/null +++ b/test/lib/ansible_test/_internal/commands/units/__init__.py @@ -0,0 +1,343 @@ +"""Execute unit tests using pytest.""" +from __future__ import annotations + +import os +import sys +import typing as t + +from ...constants import ( + CONTROLLER_MIN_PYTHON_VERSION, + CONTROLLER_PYTHON_VERSIONS, + REMOTE_ONLY_PYTHON_VERSIONS, + SUPPORTED_PYTHON_VERSIONS, +) + +from ...io import ( + write_text_file, + make_dirs, +) + +from ...util import ( + ANSIBLE_TEST_DATA_ROOT, + display, + is_subdir, + str_to_version, + SubprocessError, + ANSIBLE_LIB_ROOT, + ANSIBLE_TEST_TARGET_ROOT, +) + +from ...util_common import ( + ResultType, + handle_layout_messages, + create_temp_dir, +) + +from ...ansible_util import ( + ansible_environment, + get_ansible_python_path, +) + +from ...target import ( + walk_internal_targets, + walk_units_targets, +) + +from ...config import ( + UnitsConfig, +) + +from ...coverage_util import ( + cover_python, +) + +from ...data import ( + data_context, +) + +from ...executor import ( + AllTargetsSkipped, + Delegate, + get_changes_filter, +) + +from ...python_requirements import ( + install_requirements, +) + +from ...content_config import ( + get_content_config, +) + +from ...host_configs import ( + PosixConfig, +) + +from ...provisioning import ( + prepare_profiles, +) + +from ...pypi_proxy import ( + configure_pypi_proxy, +) + +from ...host_profiles import ( + PosixProfile, +) + + +class TestContext: + """Contexts that unit tests run in based on the type of content.""" + controller = 'controller' + modules = 'modules' + module_utils = 'module_utils' + + +def command_units(args: UnitsConfig) -> None: + """Run unit tests.""" + handle_layout_messages(data_context().content.unit_messages) + + changes = get_changes_filter(args) + require = args.require + changes + include = walk_internal_targets(walk_units_targets(), args.include, args.exclude, require) + + paths = [target.path for target in include] + + content_config = get_content_config(args) + supported_remote_python_versions = content_config.modules.python_versions + + if content_config.modules.controller_only: + # controller-only collections run modules/module_utils unit tests as controller-only tests + module_paths = [] + module_utils_paths = [] + else: + # normal collections run modules/module_utils unit tests isolated from controller code due to differences in python version requirements + module_paths = [path for path in paths if is_subdir(path, data_context().content.unit_module_path)] + module_utils_paths = [path for path in paths if is_subdir(path, data_context().content.unit_module_utils_path)] + + controller_paths = sorted(path for path in set(paths) - set(module_paths) - set(module_utils_paths)) + + remote_paths = module_paths or module_utils_paths + + test_context_paths = { + TestContext.modules: module_paths, + TestContext.module_utils: module_utils_paths, + TestContext.controller: controller_paths, + } + + if not paths: + raise AllTargetsSkipped() + + targets = t.cast(list[PosixConfig], args.targets) + target_versions: dict[str, PosixConfig] = {target.python.version: target for target in targets} + skipped_versions = args.host_settings.skipped_python_versions + warn_versions = [] + + # requested python versions that are remote-only and not supported by this collection + test_versions = [version for version in target_versions if version in REMOTE_ONLY_PYTHON_VERSIONS and version not in supported_remote_python_versions] + + if test_versions: + for version in test_versions: + display.warning(f'Skipping unit tests on Python {version} because it is not supported by this collection.' + f' Supported Python versions are: {", ".join(content_config.python_versions)}') + + warn_versions.extend(test_versions) + + if warn_versions == list(target_versions): + raise AllTargetsSkipped() + + if not remote_paths: + # all selected unit tests are controller tests + + # requested python versions that are remote-only + test_versions = [version for version in target_versions if version in REMOTE_ONLY_PYTHON_VERSIONS and version not in warn_versions] + + if test_versions: + for version in test_versions: + display.warning(f'Skipping unit tests on Python {version} because it is only supported by module/module_utils unit tests.' + ' No module/module_utils unit tests were selected.') + + warn_versions.extend(test_versions) + + if warn_versions == list(target_versions): + raise AllTargetsSkipped() + + if not controller_paths: + # all selected unit tests are remote tests + + # requested python versions that are not supported by remote tests for this collection + test_versions = [version for version in target_versions if version not in supported_remote_python_versions and version not in warn_versions] + + if test_versions: + for version in test_versions: + display.warning(f'Skipping unit tests on Python {version} because it is not supported by module/module_utils unit tests of this collection.' + f' Supported Python versions are: {", ".join(supported_remote_python_versions)}') + + warn_versions.extend(test_versions) + + if warn_versions == list(target_versions): + raise AllTargetsSkipped() + + host_state = prepare_profiles(args, targets_use_pypi=True) # units + + if args.delegate: + raise Delegate(host_state=host_state, require=changes, exclude=args.exclude) + + test_sets = [] + + if args.requirements_mode != 'skip': + configure_pypi_proxy(args, host_state.controller_profile) # units + + for version in SUPPORTED_PYTHON_VERSIONS: + if version not in target_versions and version not in skipped_versions: + continue + + test_candidates = [] + + for test_context, paths in test_context_paths.items(): + if test_context == TestContext.controller: + if version not in CONTROLLER_PYTHON_VERSIONS: + continue + else: + if version not in supported_remote_python_versions: + continue + + if not paths: + continue + + env = ansible_environment(args) + + env.update( + PYTHONPATH=get_units_ansible_python_path(args, test_context), + ANSIBLE_CONTROLLER_MIN_PYTHON_VERSION=CONTROLLER_MIN_PYTHON_VERSION, + ) + + test_candidates.append((test_context, paths, env)) + + if not test_candidates: + continue + + if version in skipped_versions: + display.warning("Skipping unit tests on Python %s because it could not be found." % version) + continue + + target_profiles: dict[str, PosixProfile] = {profile.config.python.version: profile for profile in host_state.targets(PosixProfile)} + target_profile = target_profiles[version] + + final_candidates = [(test_context, target_profile.python, paths, env) for test_context, paths, env in test_candidates] + controller = any(test_context == TestContext.controller for test_context, python, paths, env in final_candidates) + + if args.requirements_mode != 'skip': + install_requirements(args, target_profile.python, ansible=controller, command=True, controller=False) # units + + test_sets.extend(final_candidates) + + if args.requirements_mode == 'only': + sys.exit() + + for test_context, python, paths, env in test_sets: + # When using pytest-mock, make sure that features introduced in Python 3.8 are available to older Python versions. + # This is done by enabling the mock_use_standalone_module feature, which forces use of mock even when unittest.mock is available. + # Later Python versions have not introduced additional unittest.mock features, so use of mock is not needed as of Python 3.8. + # If future Python versions introduce new unittest.mock features, they will not be available to older Python versions. + # Having the cutoff at Python 3.8 also eases packaging of ansible-core since no supported controller version requires the use of mock. + # + # NOTE: This only affects use of pytest-mock. + # Collection unit tests may directly import mock, which will be provided by ansible-test when it installs requirements using pip. + # Although mock is available for ansible-core unit tests, they should import units.compat.mock instead. + if str_to_version(python.version) < (3, 8): + config_name = 'legacy.ini' + else: + config_name = 'default.ini' + + cmd = [ + 'pytest', + '--forked', + '-r', 'a', + '-n', str(args.num_workers) if args.num_workers else 'auto', + '--color', + 'yes' if args.color else 'no', + '-p', 'no:cacheprovider', + '-c', os.path.join(ANSIBLE_TEST_DATA_ROOT, 'pytest', 'config', config_name), + '--junit-xml', os.path.join(ResultType.JUNIT.path, 'python%s-%s-units.xml' % (python.version, test_context)), + '--strict-markers', # added in pytest 4.5.0 + '--rootdir', data_context().content.root, + ] + + if not data_context().content.collection: + cmd.append('--durations=25') + + plugins = [] + + if args.coverage: + plugins.append('ansible_pytest_coverage') + + if data_context().content.collection: + plugins.append('ansible_pytest_collections') + + if plugins: + env['PYTHONPATH'] += ':%s' % os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'pytest/plugins') + env['PYTEST_PLUGINS'] = ','.join(plugins) + + if args.collect_only: + cmd.append('--collect-only') + + if args.verbosity: + cmd.append('-' + ('v' * args.verbosity)) + + cmd.extend(paths) + + display.info('Unit test %s with Python %s' % (test_context, python.version)) + + try: + cover_python(args, python, cmd, test_context, env, capture=False) + except SubprocessError as ex: + # pytest exits with status code 5 when all tests are skipped, which isn't an error for our use case + if ex.status != 5: + raise + + +def get_units_ansible_python_path(args: UnitsConfig, test_context: str) -> str: + """ + Return a directory usable for PYTHONPATH, containing only the modules and module_utils portion of the ansible package. + The temporary directory created will be cached for the lifetime of the process and cleaned up at exit. + """ + if test_context == TestContext.controller: + return get_ansible_python_path(args) + + try: + cache = get_units_ansible_python_path.cache # type: ignore[attr-defined] + except AttributeError: + cache = get_units_ansible_python_path.cache = {} # type: ignore[attr-defined] + + python_path = cache.get(test_context) + + if python_path: + return python_path + + python_path = create_temp_dir(prefix='ansible-test-') + ansible_path = os.path.join(python_path, 'ansible') + ansible_test_path = os.path.join(python_path, 'ansible_test') + + write_text_file(os.path.join(ansible_path, '__init__.py'), '', True) + os.symlink(os.path.join(ANSIBLE_LIB_ROOT, 'module_utils'), os.path.join(ansible_path, 'module_utils')) + + if data_context().content.collection: + # built-in runtime configuration for the collection loader + make_dirs(os.path.join(ansible_path, 'config')) + os.symlink(os.path.join(ANSIBLE_LIB_ROOT, 'config', 'ansible_builtin_runtime.yml'), os.path.join(ansible_path, 'config', 'ansible_builtin_runtime.yml')) + + # current collection loader required by all python versions supported by the controller + write_text_file(os.path.join(ansible_path, 'utils', '__init__.py'), '', True) + os.symlink(os.path.join(ANSIBLE_LIB_ROOT, 'utils', 'collection_loader'), os.path.join(ansible_path, 'utils', 'collection_loader')) + + # legacy collection loader required by all python versions not supported by the controller + write_text_file(os.path.join(ansible_test_path, '__init__.py'), '', True) + write_text_file(os.path.join(ansible_test_path, '_internal', '__init__.py'), '', True) + elif test_context == TestContext.modules: + # only non-collection ansible module tests should have access to ansible built-in modules + os.symlink(os.path.join(ANSIBLE_LIB_ROOT, 'modules'), os.path.join(ansible_path, 'modules')) + + cache[test_context] = python_path + + return python_path |