From 8a754e0858d922e955e71b253c139e071ecec432 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 18:04:21 +0200 Subject: Adding upstream version 2.14.3. Signed-off-by: Daniel Baumann --- lib/ansible/plugins/list.py | 210 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 lib/ansible/plugins/list.py (limited to 'lib/ansible/plugins/list.py') diff --git a/lib/ansible/plugins/list.py b/lib/ansible/plugins/list.py new file mode 100644 index 0000000..e09b293 --- /dev/null +++ b/lib/ansible/plugins/list.py @@ -0,0 +1,210 @@ +# (c) Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +import os + +from ansible import context +from ansible import constants as C +from ansible.collections.list import list_collections +from ansible.errors import AnsibleError +from ansible.module_utils._text import to_native, to_bytes +from ansible.plugins import loader +from ansible.utils.display import Display +from ansible.utils.collection_loader._collection_finder import _get_collection_path, AnsibleCollectionRef + +display = Display() + +# not real plugins +IGNORE = { + # ptype: names + 'module': ('async_wrapper', ), + 'cache': ('base', ), +} + + +def get_composite_name(collection, name, path, depth): + resolved_collection = collection + if '.' not in name: + resource_name = name + else: + if collection == 'ansible.legacy' and name.startswith('ansible.builtin.'): + resolved_collection = 'ansible.builtin' + resource_name = '.'.join(name.split(f"{resolved_collection}.")[1:]) + + # collectionize name + composite = [resolved_collection] + if depth: + composite.extend(path.split(os.path.sep)[depth * -1:]) + composite.append(to_native(resource_name)) + return '.'.join(composite) + + +def _list_plugins_from_paths(ptype, dirs, collection, depth=0): + + plugins = {} + + for path in dirs: + display.debug("Searching '{0}'s '{1}' for {2} plugins".format(collection, path, ptype)) + b_path = to_bytes(path) + + if os.path.basename(b_path).startswith((b'.', b'__')): + # skip hidden/special dirs + continue + + if os.path.exists(b_path): + if os.path.isdir(b_path): + bkey = ptype.lower() + for plugin_file in os.listdir(b_path): + + if plugin_file.startswith((b'.', b'__')): + # hidden or python internal file/dir + continue + + display.debug("Found possible plugin: '{0}'".format(plugin_file)) + b_plugin, b_ext = os.path.splitext(plugin_file) + plugin = to_native(b_plugin) + full_path = os.path.join(b_path, plugin_file) + + if os.path.isdir(full_path): + # its a dir, recurse + if collection in C.SYNTHETIC_COLLECTIONS: + if not os.path.exists(os.path.join(full_path, b'__init__.py')): + # dont recurse for synthetic unless init.py present + continue + + # actually recurse dirs + plugins.update(_list_plugins_from_paths(ptype, [to_native(full_path)], collection, depth=depth + 1)) + else: + if any([ + plugin in C.IGNORE_FILES, # general files to ignore + to_native(b_ext) in C.REJECT_EXTS, # general extensions to ignore + b_ext in (b'.yml', b'.yaml', b'.json'), # ignore docs files TODO: constant! + plugin in IGNORE.get(bkey, ()), # plugin in reject list + os.path.islink(full_path), # skip aliases, author should document in 'aliaes' field + ]): + continue + + if ptype in ('test', 'filter'): + try: + file_plugins = _list_j2_plugins_from_file(collection, full_path, ptype, plugin) + except KeyError as e: + display.warning('Skipping file %s: %s' % (full_path, to_native(e))) + continue + + for plugin in file_plugins: + plugin_name = get_composite_name(collection, plugin.ansible_name, os.path.dirname(to_native(full_path)), depth) + plugins[plugin_name] = full_path + else: + plugin_name = get_composite_name(collection, plugin, os.path.dirname(to_native(full_path)), depth) + plugins[plugin_name] = full_path + else: + display.debug("Skip listing plugins in '{0}' as it is not a directory".format(path)) + else: + display.debug("Skip listing plugins in '{0}' as it does not exist".format(path)) + + return plugins + + +def _list_j2_plugins_from_file(collection, plugin_path, ptype, plugin_name): + + ploader = getattr(loader, '{0}_loader'.format(ptype)) + file_plugins = ploader.get_contained_plugins(collection, plugin_path, plugin_name) + return file_plugins + + +def list_collection_plugins(ptype, collections, search_paths=None): + + # starts at {plugin_name: filepath, ...}, but changes at the end + plugins = {} + try: + ploader = getattr(loader, '{0}_loader'.format(ptype)) + except AttributeError: + raise AnsibleError('Cannot list plugins, incorrect plugin type supplied: {0}'.format(ptype)) + + # get plugins for each collection + for collection in collections.keys(): + if collection == 'ansible.builtin': + # dirs from ansible install, but not configured paths + dirs = [d.path for d in ploader._get_paths_with_context() if d.internal] + elif collection == 'ansible.legacy': + # configured paths + search paths (should include basedirs/-M) + dirs = [d.path for d in ploader._get_paths_with_context() if not d.internal] + if context.CLIARGS.get('module_path', None): + dirs.extend(context.CLIARGS['module_path']) + else: + # search path in this case is for locating collection itselfA + b_ptype = to_bytes(C.COLLECTION_PTYPE_COMPAT.get(ptype, ptype)) + dirs = [to_native(os.path.join(collections[collection], b'plugins', b_ptype))] + # acr = AnsibleCollectionRef.try_parse_fqcr(collection, ptype) + # if acr: + # dirs = acr.subdirs + # else: + + # raise Exception('bad acr for %s, %s' % (collection, ptype)) + + plugins.update(_list_plugins_from_paths(ptype, dirs, collection)) + + # return plugin and it's class object, None for those not verifiable or failing + if ptype in ('module',): + # no 'invalid' tests for modules + for plugin in plugins.keys(): + plugins[plugin] = (plugins[plugin], None) + else: + # detect invalid plugin candidates AND add loaded object to return data + for plugin in list(plugins.keys()): + pobj = None + try: + pobj = ploader.get(plugin, class_only=True) + except Exception as e: + display.vvv("The '{0}' {1} plugin could not be loaded from '{2}': {3}".format(plugin, ptype, plugins[plugin], to_native(e))) + + # sets final {plugin_name: (filepath, class|NONE if not loaded), ...} + plugins[plugin] = (plugins[plugin], pobj) + + # {plugin_name: (filepath, class), ...} + return plugins + + +def list_plugins(ptype, collection=None, search_paths=None): + + # {plugin_name: (filepath, class), ...} + plugins = {} + collections = {} + if collection is None: + # list all collections, add synthetic ones + collections['ansible.builtin'] = b'' + collections['ansible.legacy'] = b'' + collections.update(list_collections(search_paths=search_paths, dedupe=True)) + elif collection == 'ansible.legacy': + # add builtin, since legacy also resolves to these + collections[collection] = b'' + collections['ansible.builtin'] = b'' + else: + try: + collections[collection] = to_bytes(_get_collection_path(collection)) + except ValueError as e: + raise AnsibleError("Cannot use supplied collection {0}: {1}".format(collection, to_native(e)), orig_exc=e) + + if collections: + plugins.update(list_collection_plugins(ptype, collections)) + + return plugins + + +# wrappers +def list_plugin_names(ptype, collection=None): + return [plugin.ansible_name for plugin in list_plugins(ptype, collection)] + + +def list_plugin_files(ptype, collection=None): + plugins = list_plugins(ptype, collection) + return [plugins[k][0] for k in plugins.keys()] + + +def list_plugin_classes(ptype, collection=None): + plugins = list_plugins(ptype, collection) + return [plugins[k][1] for k in plugins.keys()] -- cgit v1.2.3