diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/data.py')
-rw-r--r-- | test/lib/ansible_test/_internal/data.py | 286 |
1 files changed, 286 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/data.py b/test/lib/ansible_test/_internal/data.py new file mode 100644 index 0000000..635b0c3 --- /dev/null +++ b/test/lib/ansible_test/_internal/data.py @@ -0,0 +1,286 @@ +"""Context information for the current invocation of ansible-test.""" +from __future__ import annotations + +import collections.abc as c +import dataclasses +import os +import typing as t + +from .util import ( + ApplicationError, + import_plugins, + is_subdir, + is_valid_identifier, + ANSIBLE_LIB_ROOT, + ANSIBLE_TEST_ROOT, + ANSIBLE_SOURCE_ROOT, + display, + cache, +) + +from .provider import ( + find_path_provider, + get_path_provider_classes, + ProviderNotFoundForPath, +) + +from .provider.source import ( + SourceProvider, +) + +from .provider.source.unversioned import ( + UnversionedSource, +) + +from .provider.source.installed import ( + InstalledSource, +) + +from .provider.source.unsupported import ( + UnsupportedSource, +) + +from .provider.layout import ( + ContentLayout, + LayoutProvider, +) + +from .provider.layout.unsupported import ( + UnsupportedLayout, +) + + +class DataContext: + """Data context providing details about the current execution environment for ansible-test.""" + def __init__(self) -> None: + content_path = os.environ.get('ANSIBLE_TEST_CONTENT_ROOT') + current_path = os.getcwd() + + layout_providers = get_path_provider_classes(LayoutProvider) + source_providers = get_path_provider_classes(SourceProvider) + + self.__layout_providers = layout_providers + self.__source_providers = source_providers + self.__ansible_source: t.Optional[tuple[tuple[str, str], ...]] = None + + self.payload_callbacks: list[c.Callable[[list[tuple[str, str]]], None]] = [] + + if content_path: + content = self.__create_content_layout(layout_providers, source_providers, content_path, False) + elif ANSIBLE_SOURCE_ROOT and is_subdir(current_path, ANSIBLE_SOURCE_ROOT): + content = self.__create_content_layout(layout_providers, source_providers, ANSIBLE_SOURCE_ROOT, False) + else: + content = self.__create_content_layout(layout_providers, source_providers, current_path, True) + + self.content: ContentLayout = content + + def create_collection_layouts(self) -> list[ContentLayout]: + """ + Return a list of collection layouts, one for each collection in the same collection root as the current collection layout. + An empty list is returned if the current content layout is not a collection layout. + """ + layout = self.content + collection = layout.collection + + if not collection: + return [] + + root_path = os.path.join(collection.root, 'ansible_collections') + display.info('Scanning collection root: %s' % root_path, verbosity=1) + namespace_names = sorted(name for name in os.listdir(root_path) if os.path.isdir(os.path.join(root_path, name))) + collections = [] + + for namespace_name in namespace_names: + namespace_path = os.path.join(root_path, namespace_name) + collection_names = sorted(name for name in os.listdir(namespace_path) if os.path.isdir(os.path.join(namespace_path, name))) + + for collection_name in collection_names: + collection_path = os.path.join(namespace_path, collection_name) + + if collection_path == os.path.join(collection.root, collection.directory): + collection_layout = layout + else: + collection_layout = self.__create_content_layout(self.__layout_providers, self.__source_providers, collection_path, False) + + file_count = len(collection_layout.all_files()) + + if not file_count: + continue + + display.info('Including collection: %s (%d files)' % (collection_layout.collection.full_name, file_count), verbosity=1) + collections.append(collection_layout) + + return collections + + @staticmethod + def __create_content_layout(layout_providers: list[t.Type[LayoutProvider]], + source_providers: list[t.Type[SourceProvider]], + root: str, + walk: bool, + ) -> ContentLayout: + """Create a content layout using the given providers and root path.""" + try: + layout_provider = find_path_provider(LayoutProvider, layout_providers, root, walk) + except ProviderNotFoundForPath: + layout_provider = UnsupportedLayout(root) + + try: + # Begin the search for the source provider at the layout provider root. + # This intentionally ignores version control within subdirectories of the layout root, a condition which was previously an error. + # Doing so allows support for older git versions for which it is difficult to distinguish between a super project and a sub project. + # It also provides a better user experience, since the solution for the user would effectively be the same -- to remove the nested version control. + if isinstance(layout_provider, UnsupportedLayout): + source_provider: SourceProvider = UnsupportedSource(layout_provider.root) + else: + source_provider = find_path_provider(SourceProvider, source_providers, layout_provider.root, walk) + except ProviderNotFoundForPath: + source_provider = UnversionedSource(layout_provider.root) + + layout = layout_provider.create(layout_provider.root, source_provider.get_paths(layout_provider.root)) + + return layout + + def __create_ansible_source(self): + """Return a tuple of Ansible source files with both absolute and relative paths.""" + if not ANSIBLE_SOURCE_ROOT: + sources = [] + + source_provider = InstalledSource(ANSIBLE_LIB_ROOT) + sources.extend((os.path.join(source_provider.root, path), os.path.join('lib', 'ansible', path)) + for path in source_provider.get_paths(source_provider.root)) + + source_provider = InstalledSource(ANSIBLE_TEST_ROOT) + sources.extend((os.path.join(source_provider.root, path), os.path.join('test', 'lib', 'ansible_test', path)) + for path in source_provider.get_paths(source_provider.root)) + + return tuple(sources) + + if self.content.is_ansible: + return tuple((os.path.join(self.content.root, path), path) for path in self.content.all_files()) + + try: + source_provider = find_path_provider(SourceProvider, self.__source_providers, ANSIBLE_SOURCE_ROOT, False) + except ProviderNotFoundForPath: + source_provider = UnversionedSource(ANSIBLE_SOURCE_ROOT) + + return tuple((os.path.join(source_provider.root, path), path) for path in source_provider.get_paths(source_provider.root)) + + @property + def ansible_source(self) -> tuple[tuple[str, str], ...]: + """Return a tuple of Ansible source files with both absolute and relative paths.""" + if not self.__ansible_source: + self.__ansible_source = self.__create_ansible_source() + + return self.__ansible_source + + def register_payload_callback(self, callback: c.Callable[[list[tuple[str, str]]], None]) -> None: + """Register the given payload callback.""" + self.payload_callbacks.append(callback) + + def check_layout(self) -> None: + """Report an error if the layout is unsupported.""" + if self.content.unsupported: + raise ApplicationError(self.explain_working_directory()) + + def explain_working_directory(self) -> str: + """Return a message explaining the working directory requirements.""" + blocks = [ + 'The current working directory must be within the source tree being tested.', + '', + ] + + if ANSIBLE_SOURCE_ROOT: + blocks.append(f'Testing Ansible: {ANSIBLE_SOURCE_ROOT}/') + blocks.append('') + + cwd = os.getcwd() + + blocks.append('Testing an Ansible collection: {...}/ansible_collections/{namespace}/{collection}/') + blocks.append('Example #1: community.general -> ~/code/ansible_collections/community/general/') + blocks.append('Example #2: ansible.util -> ~/.ansible/collections/ansible_collections/ansible/util/') + blocks.append('') + blocks.append(f'Current working directory: {cwd}/') + + if os.path.basename(os.path.dirname(cwd)) == 'ansible_collections': + blocks.append(f'Expected parent directory: {os.path.dirname(cwd)}/{{namespace}}/{{collection}}/') + elif os.path.basename(cwd) == 'ansible_collections': + blocks.append(f'Expected parent directory: {cwd}/{{namespace}}/{{collection}}/') + elif 'ansible_collections' not in cwd.split(os.path.sep): + blocks.append('No "ansible_collections" parent directory was found.') + + if self.content.collection: + if not is_valid_identifier(self.content.collection.namespace): + blocks.append(f'The namespace "{self.content.collection.namespace}" is an invalid identifier or a reserved keyword.') + + if not is_valid_identifier(self.content.collection.name): + blocks.append(f'The name "{self.content.collection.name}" is an invalid identifier or a reserved keyword.') + + message = '\n'.join(blocks) + + return message + + +@cache +def data_context() -> DataContext: + """Initialize provider plugins.""" + provider_types = ( + 'layout', + 'source', + ) + + for provider_type in provider_types: + import_plugins('provider/%s' % provider_type) + + context = DataContext() + + return context + + +@dataclasses.dataclass(frozen=True) +class PluginInfo: + """Information about an Ansible plugin.""" + plugin_type: str + name: str + paths: list[str] + + +@cache +def content_plugins() -> dict[str, dict[str, PluginInfo]]: + """ + Analyze content. + The primary purpose of this analysis is to facilitate mapping of integration tests to the plugin(s) they are intended to test. + """ + plugins: dict[str, dict[str, PluginInfo]] = {} + + for plugin_type, plugin_directory in data_context().content.plugin_paths.items(): + plugin_paths = sorted(data_context().content.walk_files(plugin_directory)) + plugin_directory_offset = len(plugin_directory.split(os.path.sep)) + + plugin_files: dict[str, list[str]] = {} + + for plugin_path in plugin_paths: + plugin_filename = os.path.basename(plugin_path) + plugin_parts = plugin_path.split(os.path.sep)[plugin_directory_offset:-1] + + if plugin_filename == '__init__.py': + if plugin_type != 'module_utils': + continue + else: + plugin_name = os.path.splitext(plugin_filename)[0] + + if data_context().content.is_ansible and plugin_type == 'modules': + plugin_name = plugin_name.lstrip('_') + + plugin_parts.append(plugin_name) + + plugin_name = '.'.join(plugin_parts) + + plugin_files.setdefault(plugin_name, []).append(plugin_filename) + + plugins[plugin_type] = {plugin_name: PluginInfo( + plugin_type=plugin_type, + name=plugin_name, + paths=paths, + ) for plugin_name, paths in plugin_files.items()} + + return plugins |