diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/config.py')
-rw-r--r-- | test/lib/ansible_test/_internal/config.py | 356 |
1 files changed, 356 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/config.py b/test/lib/ansible_test/_internal/config.py new file mode 100644 index 00000000..a3c31959 --- /dev/null +++ b/test/lib/ansible_test/_internal/config.py @@ -0,0 +1,356 @@ +"""Configuration classes.""" +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import sys + +from . import types as t + +from .util import ( + find_python, + generate_pip_command, + ApplicationError, +) + +from .util_common import ( + docker_qualify_image, + get_docker_completion, + get_remote_completion, + CommonConfig, +) + +from .metadata import ( + Metadata, +) + +from .data import ( + data_context, +) + +try: + TIntegrationConfig = t.TypeVar('TIntegrationConfig', bound='IntegrationConfig') +except AttributeError: + TIntegrationConfig = None # pylint: disable=invalid-name + + +class ParsedRemote: + """A parsed version of a "remote" string.""" + def __init__(self, arch, platform, version): # type: (t.Optional[str], str, str) -> None + self.arch = arch + self.platform = platform + self.version = version + + @staticmethod + def parse(value): # type: (str) -> t.Optional['ParsedRemote'] + """Return a ParsedRemote from the given value or None if the syntax is invalid.""" + parts = value.split('/') + + if len(parts) == 2: + arch = None + platform, version = parts + elif len(parts) == 3: + arch, platform, version = parts + else: + return None + + return ParsedRemote(arch, platform, version) + + +class EnvironmentConfig(CommonConfig): + """Configuration common to all commands which execute in an environment.""" + def __init__(self, args, command): + """ + :type args: any + :type command: str + """ + super(EnvironmentConfig, self).__init__(args, command) + + self.local = args.local is True + self.venv = args.venv + self.venv_system_site_packages = args.venv_system_site_packages + + self.python = args.python if 'python' in args else None # type: str + + self.docker = docker_qualify_image(args.docker) # type: str + self.docker_raw = args.docker # type: str + self.remote = args.remote # type: str + + if self.remote: + self.parsed_remote = ParsedRemote.parse(self.remote) + + if not self.parsed_remote or not self.parsed_remote.platform or not self.parsed_remote.version: + raise ApplicationError('Unrecognized remote "%s" syntax. Use "platform/version" or "arch/platform/version".' % self.remote) + else: + self.parsed_remote = None + + self.docker_privileged = args.docker_privileged if 'docker_privileged' in args else False # type: bool + self.docker_pull = args.docker_pull if 'docker_pull' in args else False # type: bool + self.docker_keep_git = args.docker_keep_git if 'docker_keep_git' in args else False # type: bool + self.docker_seccomp = args.docker_seccomp if 'docker_seccomp' in args else None # type: str + self.docker_memory = args.docker_memory if 'docker_memory' in args else None + self.docker_terminate = args.docker_terminate if 'docker_terminate' in args else None # type: str + self.docker_network = args.docker_network if 'docker_network' in args else None # type: str + + if self.docker_seccomp is None: + self.docker_seccomp = get_docker_completion().get(self.docker_raw, {}).get('seccomp', 'default') + + self.remote_stage = args.remote_stage # type: str + self.remote_provider = args.remote_provider # type: str + self.remote_endpoint = args.remote_endpoint # type: t.Optional[str] + self.remote_aws_region = args.remote_aws_region # type: str + self.remote_terminate = args.remote_terminate # type: str + + if self.remote_provider == 'default': + self.remote_provider = None + + self.requirements = args.requirements # type: bool + + if self.python == 'default': + self.python = None + + actual_major_minor = '.'.join(str(i) for i in sys.version_info[:2]) + + self.python_version = self.python or actual_major_minor + self.python_interpreter = args.python_interpreter + + self.pip_check = args.pip_check + + self.delegate = self.docker or self.remote or self.venv + self.delegate_args = [] # type: t.List[str] + + if self.delegate: + self.requirements = True + + self.inject_httptester = args.inject_httptester if 'inject_httptester' in args else False # type: bool + self.httptester = docker_qualify_image(args.httptester if 'httptester' in args else '') # type: str + + if self.get_delegated_completion().get('httptester', 'enabled') == 'disabled': + self.httptester = False + + if self.get_delegated_completion().get('pip-check', 'enabled') == 'disabled': + self.pip_check = False + + if args.check_python and args.check_python != actual_major_minor: + raise ApplicationError('Running under Python %s instead of Python %s as expected.' % (actual_major_minor, args.check_python)) + + if self.docker_keep_git: + def git_callback(files): # type: (t.List[t.Tuple[str, str]]) -> None + """Add files from the content root .git directory to the payload file list.""" + for dirpath, _dirnames, filenames in os.walk(os.path.join(data_context().content.root, '.git')): + paths = [os.path.join(dirpath, filename) for filename in filenames] + files.extend((path, os.path.relpath(path, data_context().content.root)) for path in paths) + + data_context().register_payload_callback(git_callback) + + @property + def python_executable(self): + """ + :rtype: str + """ + return find_python(self.python_version) + + @property + def pip_command(self): + """ + :rtype: list[str] + """ + return generate_pip_command(self.python_executable) + + def get_delegated_completion(self): + """Returns a dictionary of settings specific to the selected delegation system, if any. Otherwise returns an empty dictionary. + :rtype: dict[str, str] + """ + if self.docker: + return get_docker_completion().get(self.docker_raw, {}) + + if self.remote: + return get_remote_completion().get(self.remote, {}) + + return {} + + +class TestConfig(EnvironmentConfig): + """Configuration common to all test commands.""" + def __init__(self, args, command): + """ + :type args: any + :type command: str + """ + super(TestConfig, self).__init__(args, command) + + self.coverage = args.coverage # type: bool + self.coverage_label = args.coverage_label # type: str + self.coverage_check = args.coverage_check # type: bool + self.coverage_config_base_path = None # type: t.Optional[str] + self.include = args.include or [] # type: t.List[str] + self.exclude = args.exclude or [] # type: t.List[str] + self.require = args.require or [] # type: t.List[str] + + self.changed = args.changed # type: bool + self.tracked = args.tracked # type: bool + self.untracked = args.untracked # type: bool + self.committed = args.committed # type: bool + self.staged = args.staged # type: bool + self.unstaged = args.unstaged # type: bool + self.changed_from = args.changed_from # type: str + self.changed_path = args.changed_path # type: t.List[str] + self.base_branch = args.base_branch # type: str + + self.lint = args.lint if 'lint' in args else False # type: bool + self.junit = args.junit if 'junit' in args else False # type: bool + self.failure_ok = args.failure_ok if 'failure_ok' in args else False # type: bool + + self.metadata = Metadata.from_file(args.metadata) if args.metadata else Metadata() + self.metadata_path = None + + if self.coverage_check: + self.coverage = True + + def metadata_callback(files): # type: (t.List[t.Tuple[str, str]]) -> None + """Add the metadata file to the payload file list.""" + config = self + + if self.metadata_path: + files.append((os.path.abspath(config.metadata_path), config.metadata_path)) + + data_context().register_payload_callback(metadata_callback) + + +class ShellConfig(EnvironmentConfig): + """Configuration for the shell command.""" + def __init__(self, args): + """ + :type args: any + """ + super(ShellConfig, self).__init__(args, 'shell') + + self.raw = args.raw # type: bool + + if self.raw: + self.httptester = False + + +class SanityConfig(TestConfig): + """Configuration for the sanity command.""" + def __init__(self, args): + """ + :type args: any + """ + super(SanityConfig, self).__init__(args, 'sanity') + + self.test = args.test # type: t.List[str] + self.skip_test = args.skip_test # type: t.List[str] + self.list_tests = args.list_tests # type: bool + self.allow_disabled = args.allow_disabled # type: bool + self.enable_optional_errors = args.enable_optional_errors # type: bool + self.info_stderr = self.lint + + +class IntegrationConfig(TestConfig): + """Configuration for the integration command.""" + def __init__(self, args, command): + """ + :type args: any + :type command: str + """ + super(IntegrationConfig, self).__init__(args, command) + + self.start_at = args.start_at # type: str + self.start_at_task = args.start_at_task # type: str + self.allow_destructive = args.allow_destructive # type: bool + self.allow_root = args.allow_root # type: bool + self.allow_disabled = args.allow_disabled # type: bool + self.allow_unstable = args.allow_unstable # type: bool + self.allow_unstable_changed = args.allow_unstable_changed # type: bool + self.allow_unsupported = args.allow_unsupported # type: bool + self.retry_on_error = args.retry_on_error # type: bool + self.continue_on_error = args.continue_on_error # type: bool + self.debug_strategy = args.debug_strategy # type: bool + self.changed_all_target = args.changed_all_target # type: str + self.changed_all_mode = args.changed_all_mode # type: str + self.list_targets = args.list_targets # type: bool + self.tags = args.tags + self.skip_tags = args.skip_tags + self.diff = args.diff + self.no_temp_workdir = args.no_temp_workdir + self.no_temp_unicode = args.no_temp_unicode + + if self.get_delegated_completion().get('temp-unicode', 'enabled') == 'disabled': + self.no_temp_unicode = True + + if self.list_targets: + self.explain = True + self.info_stderr = True + + def get_ansible_config(self): # type: () -> str + """Return the path to the Ansible config for the given config.""" + ansible_config_relative_path = os.path.join(data_context().content.integration_path, '%s.cfg' % self.command) + ansible_config_path = os.path.join(data_context().content.root, ansible_config_relative_path) + + if not os.path.exists(ansible_config_path): + # use the default empty configuration unless one has been provided + ansible_config_path = super(IntegrationConfig, self).get_ansible_config() + + return ansible_config_path + + +class PosixIntegrationConfig(IntegrationConfig): + """Configuration for the posix integration command.""" + + def __init__(self, args): + """ + :type args: any + """ + super(PosixIntegrationConfig, self).__init__(args, 'integration') + + +class WindowsIntegrationConfig(IntegrationConfig): + """Configuration for the windows integration command.""" + + def __init__(self, args): + """ + :type args: any + """ + super(WindowsIntegrationConfig, self).__init__(args, 'windows-integration') + + self.windows = args.windows # type: t.List[str] + self.inventory = args.inventory # type: str + + if self.windows: + self.allow_destructive = True + + +class NetworkIntegrationConfig(IntegrationConfig): + """Configuration for the network integration command.""" + + def __init__(self, args): + """ + :type args: any + """ + super(NetworkIntegrationConfig, self).__init__(args, 'network-integration') + + self.platform = args.platform # type: t.List[str] + self.platform_collection = dict(args.platform_collection or []) # type: t.Dict[str, str] + self.platform_connection = dict(args.platform_connection or []) # type: t.Dict[str, str] + self.inventory = args.inventory # type: str + self.testcase = args.testcase # type: str + + +class UnitsConfig(TestConfig): + """Configuration for the units command.""" + def __init__(self, args): + """ + :type args: any + """ + super(UnitsConfig, self).__init__(args, 'units') + + self.collect_only = args.collect_only # type: bool + self.num_workers = args.num_workers # type: int + + self.requirements_mode = args.requirements_mode if 'requirements_mode' in args else '' + + if self.requirements_mode == 'only': + self.requirements = True + elif self.requirements_mode == 'skip': + self.requirements = False |