summaryrefslogtreecommitdiffstats
path: root/src/ansiblelint/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ansiblelint/utils.py')
-rw-r--r--src/ansiblelint/utils.py511
1 files changed, 304 insertions, 207 deletions
diff --git a/src/ansiblelint/utils.py b/src/ansiblelint/utils.py
index 9cb97aa..3d0e535 100644
--- a/src/ansiblelint/utils.py
+++ b/src/ansiblelint/utils.py
@@ -22,26 +22,35 @@
"""Generic utility helpers."""
from __future__ import annotations
+import ast
import contextlib
import inspect
import logging
import os
import re
-from collections.abc import Generator, ItemsView, Iterator, Mapping, Sequence
+from collections.abc import ItemsView, Iterable, Iterator, Mapping, Sequence
from dataclasses import _MISSING_TYPE, dataclass, field
-from functools import cache
+from functools import cache, lru_cache
from pathlib import Path
-from typing import Any
+from typing import TYPE_CHECKING, Any
+import ruamel.yaml.parser
import yaml
from ansible.errors import AnsibleError, AnsibleParserError
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.parsing.dataloader import DataLoader
from ansible.parsing.mod_args import ModuleArgsParser
+from ansible.parsing.plugin_docs import read_docstring
+from ansible.parsing.splitter import split_args
from ansible.parsing.yaml.constructor import AnsibleConstructor, AnsibleMapping
from ansible.parsing.yaml.loader import AnsibleLoader
from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject, AnsibleSequence
-from ansible.plugins.loader import add_all_plugin_dirs
+from ansible.plugins.loader import (
+ PluginLoadContext,
+ action_loader,
+ add_all_plugin_dirs,
+ module_loader,
+)
from ansible.template import Templar
from ansible.utils.collection_loader import AnsibleCollectionConfig
from yaml.composer import Composer
@@ -51,7 +60,7 @@ from ansiblelint._internal.rules import (
AnsibleParserErrorRule,
RuntimeErrorRule,
)
-from ansiblelint.app import get_app
+from ansiblelint.app import App, get_app
from ansiblelint.config import Options, options
from ansiblelint.constants import (
ANNOTATION_KEYS,
@@ -67,8 +76,10 @@ from ansiblelint.constants import (
from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable, discover_lintables
from ansiblelint.skip_utils import is_nested_task
-from ansiblelint.text import removeprefix
+from ansiblelint.text import has_jinja, removeprefix
+if TYPE_CHECKING:
+ from ansiblelint.rules import RulesCollection
# ansible-lint doesn't need/want to know about encrypted secrets, so we pass a
# string as the password to enable such yaml files to be opened and parsed
# successfully.
@@ -164,7 +175,6 @@ def ansible_template(
for _i in range(10):
try:
templated = templar.template(varname, **kwargs)
- return templated
except AnsibleError as exc:
if lookup_error in exc.message:
return varname
@@ -186,16 +196,14 @@ def ansible_template(
_logger.warning(err)
raise
- # pylint: disable=protected-access
- templar.environment.filters._delegatee[ # noqa: SLF001
- missing_filter
- ] = mock_filter
+ templar.environment.filters._delegatee[missing_filter] = mock_filter # fmt: skip # noqa: SLF001
# Record the mocked filter so we can warn the user
if missing_filter not in options.mock_filters:
_logger.debug("Mocking missing filter %s", missing_filter)
options.mock_filters.append(missing_filter)
continue
raise
+ return templated
return None
@@ -210,26 +218,23 @@ BLOCK_NAME_TO_ACTION_TYPE_MAP = {
}
-def tokenize(line: str) -> tuple[str, list[str], dict[str, str]]:
+def tokenize(value: str) -> tuple[list[str], dict[str, str]]:
"""Parse a string task invocation."""
- tokens = line.lstrip().split(" ")
- if tokens[0] == "-":
- tokens = tokens[1:]
- if tokens[0] == "action:" or tokens[0] == "local_action:":
- tokens = tokens[1:]
- command = tokens[0].replace(":", "")
-
- args = []
- kwargs = {}
- non_kv_found = False
- for arg in tokens[1:]:
- if "=" in arg and not non_kv_found:
- key_value = arg.split("=", 1)
- kwargs[key_value[0]] = key_value[1]
+ # We do not try to tokenize something very simple because it would fail to
+ # work for a case like: task_include: path with space.yml
+ if value and "=" not in value:
+ return ([value], {})
+
+ parts = split_args(value)
+ args: list[str] = []
+ kwargs: dict[str, str] = {}
+ for part in parts:
+ if "=" not in part:
+ args.append(part)
else:
- non_kv_found = True
- args.append(arg)
- return (command, args, kwargs)
+ k, v = part.split("=", 1)
+ kwargs[k] = v
+ return (args, kwargs)
def playbook_items(pb_data: AnsibleBaseYAMLObject) -> ItemsView: # type: ignore[type-arg]
@@ -278,106 +283,179 @@ def template(
return value
-def _include_children(
- basedir: str,
- k: str,
- v: Any,
- parent_type: FileType,
-) -> list[Lintable]:
- # handle special case include_tasks: name=filename.yml
- if k in INCLUSION_ACTION_NAMES and isinstance(v, dict) and "file" in v:
- v = v["file"]
-
- # we cannot really parse any jinja2 in includes, so we ignore them
- if not v or "{{" in v:
- return []
-
- if "import_playbook" in k and COLLECTION_PLAY_RE.match(v):
- # Any import_playbooks from collections should be ignored as ansible
- # own syntax check will handle them.
- return []
-
- # handle include: filename.yml tags=blah
- # pylint: disable=unused-variable
- (command, args, kwargs) = tokenize(f"{k}: {v}")
-
- result = path_dwim(basedir, args[0])
- while basedir not in ["", "/"]:
- if os.path.exists(result):
- break
- basedir = os.path.dirname(basedir)
- result = path_dwim(basedir, args[0])
-
- return [Lintable(result, kind=parent_type)]
-
-
-def _taskshandlers_children(
- basedir: str,
- k: str,
- v: None | Any,
- parent_type: FileType,
-) -> list[Lintable]:
- results: list[Lintable] = []
- if v is None:
- raise MatchError(
- message="A malformed block was encountered while loading a block.",
- rule=RuntimeErrorRule(),
- )
- for task_handler in v:
- # ignore empty tasks, `-`
- if not task_handler:
- continue
+@dataclass
+class HandleChildren:
+ """Parse task, roles and children."""
+
+ rules: RulesCollection = field(init=True, repr=False)
+ app: App
+
+ def include_children( # pylint: disable=too-many-return-statements
+ self,
+ lintable: Lintable,
+ k: str,
+ v: Any,
+ parent_type: FileType,
+ ) -> list[Lintable]:
+ """Include children."""
+ basedir = str(lintable.path.parent)
+ # import_playbook only accepts a string as argument (no dict syntax)
+ if k in (
+ "import_playbook",
+ "ansible.builtin.import_playbook",
+ ) and not isinstance(v, str):
+ return []
+
+ # handle special case include_tasks: name=filename.yml
+ if k in INCLUSION_ACTION_NAMES and isinstance(v, dict) and "file" in v:
+ v = v["file"]
+
+ # we cannot really parse any jinja2 in includes, so we ignore them
+ if not v or "{{" in v:
+ return []
+
+ if k in ("import_playbook", "ansible.builtin.import_playbook"):
+ included = Path(basedir) / v
+ if self.app.runtime.has_playbook(v, basedir=Path(basedir)):
+ if included.exists():
+ return [Lintable(included, kind=parent_type)]
+ return []
+ msg = f"Failed to find {v} playbook."
+ logging.error(msg)
+ return []
+
+ # handle include: filename.yml tags=blah
+ (args, kwargs) = tokenize(v)
+
+ if args:
+ file = args[0]
+ elif "file" in kwargs:
+ file = kwargs["file"]
+ else:
+ return []
- with contextlib.suppress(LookupError):
- children = _get_task_handler_children_for_tasks_or_playbooks(
- task_handler,
- basedir,
- k,
- parent_type,
+ result = path_dwim(basedir, file)
+ while basedir not in ["", "/"]:
+ if os.path.exists(result):
+ break
+ basedir = os.path.dirname(basedir)
+ result = path_dwim(basedir, file)
+
+ return [Lintable(result, kind=parent_type)]
+
+ def taskshandlers_children(
+ self,
+ lintable: Lintable,
+ k: str,
+ v: None | Any,
+ parent_type: FileType,
+ ) -> list[Lintable]:
+ """TasksHandlers Children."""
+ basedir = str(lintable.path.parent)
+ results: list[Lintable] = []
+ if v is None or isinstance(v, int | str):
+ raise MatchError(
+ message="A malformed block was encountered while loading a block.",
+ rule=RuntimeErrorRule(),
)
- results.append(children)
- continue
+ for task_handler in v:
+ # ignore empty tasks, `-`
+ if not task_handler:
+ continue
- if any(x in task_handler for x in ROLE_IMPORT_ACTION_NAMES):
- task_handler = normalize_task_v2(task_handler)
- _validate_task_handler_action_for_role(task_handler["action"])
- results.extend(
- _roles_children(
+ with contextlib.suppress(LookupError):
+ children = _get_task_handler_children_for_tasks_or_playbooks(
+ task_handler,
basedir,
k,
- [task_handler["action"].get("name")],
parent_type,
- main=task_handler["action"].get("tasks_from", "main"),
- ),
- )
- continue
+ )
+ results.append(children)
+ continue
- if "block" not in task_handler:
- continue
+ if any(x in task_handler for x in ROLE_IMPORT_ACTION_NAMES):
+ task_handler = normalize_task_v2(
+ Task(task_handler, filename=str(lintable.path)),
+ )
+ self._validate_task_handler_action_for_role(task_handler["action"])
+ name = task_handler["action"].get("name")
+ if has_jinja(name):
+ # we cannot deal with dynamic imports
+ continue
+ results.extend(
+ self.roles_children(lintable, k, [name], parent_type),
+ )
+ continue
- results.extend(
- _taskshandlers_children(basedir, k, task_handler["block"], parent_type),
- )
- if "rescue" in task_handler:
- results.extend(
- _taskshandlers_children(
- basedir,
- k,
- task_handler["rescue"],
- parent_type,
+ if "block" not in task_handler:
+ continue
+
+ for elem in ("block", "rescue", "always"):
+ if elem in task_handler:
+ results.extend(
+ self.taskshandlers_children(
+ lintable,
+ k,
+ task_handler[elem],
+ parent_type,
+ ),
+ )
+
+ return results
+
+ def _validate_task_handler_action_for_role(self, th_action: dict[str, Any]) -> None:
+ """Verify that the task handler action is valid for role include."""
+ module = th_action["__ansible_module__"]
+
+ if "name" not in th_action:
+ raise MatchError(
+ message=f"Failed to find required 'name' key in {module!s}",
+ rule=self.rules.rules[0],
+ lintable=Lintable(
+ (
+ self.rules.options.lintables[0]
+ if self.rules.options.lintables
+ else "."
+ ),
),
)
- if "always" in task_handler:
- results.extend(
- _taskshandlers_children(
- basedir,
- k,
- task_handler["always"],
- parent_type,
- ),
+
+ if not isinstance(th_action["name"], str):
+ raise MatchError(
+ message=f"Value assigned to 'name' key on '{module!s}' is not a string.",
+ rule=self.rules.rules[1],
)
- return results
+ def roles_children(
+ self,
+ lintable: Lintable,
+ k: str,
+ v: Sequence[Any],
+ parent_type: FileType,
+ ) -> list[Lintable]:
+ """Roles children."""
+ # pylint: disable=unused-argument # parent_type)
+ basedir = str(lintable.path.parent)
+ results: list[Lintable] = []
+ if not v or not isinstance(v, Iterable):
+ # typing does not prevent junk from being passed in
+ return results
+ for role in v:
+ if isinstance(role, dict):
+ if "role" in role or "name" in role:
+ if "tags" not in role or "skip_ansible_lint" not in role["tags"]:
+ results.extend(
+ _look_for_role_files(
+ basedir,
+ role.get("role", role.get("name")),
+ ),
+ )
+ elif k != "dependencies":
+ msg = f'role dict {role} does not contain a "role" or "name" key'
+ raise SystemExit(msg)
+ else:
+ results.extend(_look_for_role_files(basedir, role))
+ return results
def _get_task_handler_children_for_tasks_or_playbooks(
@@ -393,13 +471,27 @@ def _get_task_handler_children_for_tasks_or_playbooks(
for task_handler_key in INCLUSION_ACTION_NAMES:
with contextlib.suppress(KeyError):
# ignore empty tasks
- if not task_handler: # pragma: no branch
+ if not task_handler or isinstance(task_handler, str): # pragma: no branch
continue
- file_name = task_handler[task_handler_key]
- if isinstance(file_name, Mapping) and file_name.get("file", None):
- file_name = file_name["file"]
+ file_name = ""
+ action_args = task_handler[task_handler_key]
+ if isinstance(action_args, str):
+ (args, kwargs) = tokenize(action_args)
+ if len(args) == 1:
+ file_name = args[0]
+ elif kwargs.get("file", None):
+ file_name = kwargs["file"]
+ else:
+ # ignore invalid data (syntax check will outside the scope)
+ continue
+
+ if isinstance(action_args, Mapping) and action_args.get("file", None):
+ file_name = action_args["file"]
+ if not file_name:
+ # ignore invalid data (syntax check will outside the scope)
+ continue
f = path_dwim(basedir, file_name)
while basedir not in ["", "/"]:
if os.path.exists(f):
@@ -411,50 +503,6 @@ def _get_task_handler_children_for_tasks_or_playbooks(
raise LookupError(msg)
-def _validate_task_handler_action_for_role(th_action: dict[str, Any]) -> None:
- """Verify that the task handler action is valid for role include."""
- module = th_action["__ansible_module__"]
-
- if "name" not in th_action:
- raise MatchError(message=f"Failed to find required 'name' key in {module!s}")
-
- if not isinstance(th_action["name"], str):
- raise MatchError(
- message=f"Value assigned to 'name' key on '{module!s}' is not a string.",
- )
-
-
-def _roles_children(
- basedir: str,
- k: str,
- v: Sequence[Any],
- parent_type: FileType, # noqa: ARG001
- main: str = "main",
-) -> list[Lintable]:
- # pylint: disable=unused-argument # parent_type)
- results: list[Lintable] = []
- if not v:
- # typing does not prevent junk from being passed in
- return results
- for role in v:
- if isinstance(role, dict):
- if "role" in role or "name" in role:
- if "tags" not in role or "skip_ansible_lint" not in role["tags"]:
- results.extend(
- _look_for_role_files(
- basedir,
- role.get("role", role.get("name")),
- main=main,
- ),
- )
- elif k != "dependencies":
- msg = f'role dict {role} does not contain a "role" or "name" key'
- raise SystemExit(msg)
- else:
- results.extend(_look_for_role_files(basedir, role, main=main))
- return results
-
-
def _rolepath(basedir: str, role: str) -> str | None:
role_path = None
@@ -469,7 +517,7 @@ def _rolepath(basedir: str, role: str) -> str | None:
path_dwim(basedir, os.path.join("..", role)),
]
- for loc in get_app(offline=True).runtime.config.default_roles_path:
+ for loc in get_app(cached=True).runtime.config.default_roles_path:
loc = os.path.expanduser(loc)
possible_paths.append(path_dwim(loc, role))
@@ -486,12 +534,7 @@ def _rolepath(basedir: str, role: str) -> str | None:
return role_path
-def _look_for_role_files(
- basedir: str,
- role: str,
- main: str | None = "main", # noqa: ARG001
-) -> list[Lintable]:
- # pylint: disable=unused-argument # main
+def _look_for_role_files(basedir: str, role: str) -> list[Lintable]:
role_path = _rolepath(basedir, role)
if not role_path: # pragma: no branch
return []
@@ -539,13 +582,14 @@ def _extract_ansible_parsed_keys_from_task(
return result
-def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]:
+def normalize_task_v2(task: Task) -> dict[str, Any]:
"""Ensure tasks have a normalized action key and strings are converted to python objects."""
+ raw_task = task.raw_task
result: dict[str, Any] = {}
ansible_parsed_keys = ("action", "local_action", "args", "delegate_to")
- if is_nested_task(task):
- _extract_ansible_parsed_keys_from_task(result, task, ansible_parsed_keys)
+ if is_nested_task(raw_task):
+ _extract_ansible_parsed_keys_from_task(result, raw_task, ansible_parsed_keys)
# Add dummy action for block/always/rescue statements
result["action"] = {
"__ansible_module__": "block/always/rescue",
@@ -554,7 +598,7 @@ def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]:
return result
- sanitized_task = _sanitize_task(task)
+ sanitized_task = _sanitize_task(raw_task)
mod_arg_parser = ModuleArgsParser(sanitized_task)
try:
@@ -562,12 +606,11 @@ def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]:
skip_action_validation=options.skip_action_validation,
)
except AnsibleParserError as exc:
- # pylint: disable=raise-missing-from
raise MatchError(
rule=AnsibleParserErrorRule(),
message=exc.message,
- filename=task.get(FILENAME_KEY, "Unknown"),
- lineno=task.get(LINE_NUMBER_KEY, 0),
+ lintable=Lintable(task.filename or ""),
+ lineno=raw_task.get(LINE_NUMBER_KEY, 1),
) from exc
# denormalize shell -> command conversion
@@ -577,13 +620,13 @@ def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]:
_extract_ansible_parsed_keys_from_task(
result,
- task,
+ raw_task,
(*ansible_parsed_keys, action),
)
if not isinstance(action, str):
msg = f"Task actions can only be strings, got {action}"
- raise RuntimeError(msg)
+ raise TypeError(msg)
action_unnormalized = action
# convert builtin fqn calls to short forms because most rules know only
# about short calls but in the future we may switch the normalization to do
@@ -599,17 +642,6 @@ def normalize_task_v2(task: dict[str, Any]) -> dict[str, Any]:
return result
-def normalize_task(task: dict[str, Any], filename: str) -> dict[str, Any]:
- """Unify task-like object structures."""
- ansible_action_type = task.get("__ansible_action_type__", "task")
- if "__ansible_action_type__" in task:
- del task["__ansible_action_type__"]
- task = normalize_task_v2(task)
- task[FILENAME_KEY] = filename
- task["__ansible_action_type__"] = ansible_action_type
- return task
-
-
def task_to_str(task: dict[str, Any]) -> str:
"""Make a string identifier for the given task."""
name = task.get("name")
@@ -634,7 +666,7 @@ def task_to_str(task: dict[str, Any]) -> str:
_raw_params = action.get("_raw_params", [])
if isinstance(_raw_params, list):
for item in _raw_params:
- args.append(str(item))
+ args.extend(str(item))
else:
args.append(_raw_params)
@@ -698,7 +730,11 @@ class Task(dict[str, Any]):
@property
def name(self) -> str | None:
"""Return the name of the task."""
- return self.raw_task.get("name", None)
+ name = self.raw_task.get("name", None)
+ if name is not None and not isinstance(name, str):
+ msg = "Task name can only be a string."
+ raise RuntimeError(msg)
+ return name
@property
def action(self) -> str:
@@ -706,7 +742,7 @@ class Task(dict[str, Any]):
action_name = self.normalized_task["action"]["__ansible_module_original__"]
if not isinstance(action_name, str):
msg = "Task actions can only be strings."
- raise RuntimeError(msg)
+ raise TypeError(msg)
return action_name
@property
@@ -729,10 +765,7 @@ class Task(dict[str, Any]):
"""Return the name of the task."""
if not hasattr(self, "_normalized_task"):
try:
- self._normalized_task = normalize_task(
- self.raw_task,
- filename=self.filename,
- )
+ self._normalized_task = self._normalize_task()
except MatchError as err:
self.error = err
# When we cannot normalize it, we just use the raw task instead
@@ -740,15 +773,35 @@ class Task(dict[str, Any]):
self._normalized_task = self.raw_task
if isinstance(self._normalized_task, _MISSING_TYPE):
msg = "Task was not normalized"
- raise RuntimeError(msg)
+ raise TypeError(msg)
return self._normalized_task
+ def _normalize_task(self) -> dict[str, Any]:
+ """Unify task-like object structures."""
+ ansible_action_type = self.raw_task.get("__ansible_action_type__", "task")
+ if "__ansible_action_type__" in self.raw_task:
+ del self.raw_task["__ansible_action_type__"]
+ task = normalize_task_v2(self)
+ task[FILENAME_KEY] = self.filename
+ task["__ansible_action_type__"] = ansible_action_type
+ return task
+
@property
def skip_tags(self) -> list[str]:
"""Return the list of tags to skip."""
skip_tags: list[str] = self.raw_task.get(SKIPPED_RULES_KEY, [])
return skip_tags
+ def is_handler(self) -> bool:
+ """Return true for tasks that are handlers."""
+ is_handler_file = False
+ if isinstance(self._normalized_task, dict):
+ file_name = str(self._normalized_task["action"].get(FILENAME_KEY, None))
+ if file_name:
+ paths = file_name.split("/")
+ is_handler_file = "handlers" in paths
+ return is_handler_file if is_handler_file else ".handlers[" in self.position
+
def __repr__(self) -> str:
"""Return a string representation of the task."""
return f"Task('{self.name}' [{self.position}])"
@@ -761,7 +814,7 @@ class Task(dict[str, Any]):
"""Allow access as task[...]."""
return self.normalized_task[index]
- def __iter__(self) -> Generator[str, None, None]:
+ def __iter__(self) -> Iterator[str]:
"""Provide support for 'key in task'."""
yield from (f for f in self.normalized_task)
@@ -857,7 +910,7 @@ def parse_yaml_linenumbers(
node = Composer.compose_node(loader, parent, index)
if not isinstance(node, yaml.nodes.Node):
msg = "Unexpected yaml data."
- raise RuntimeError(msg)
+ raise TypeError(msg)
node.__line__ = line + 1 # type: ignore[attr-defined]
return node
@@ -870,9 +923,7 @@ def parse_yaml_linenumbers(
if hasattr(node, "__line__"):
mapping[LINE_NUMBER_KEY] = node.__line__
else:
- mapping[
- LINE_NUMBER_KEY
- ] = mapping._line_number # pylint: disable=protected-access # noqa: SLF001
+ mapping[LINE_NUMBER_KEY] = mapping._line_number # noqa: SLF001
mapping[FILENAME_KEY] = lintable.path
return mapping
@@ -895,8 +946,9 @@ def parse_yaml_linenumbers(
yaml.parser.ParserError,
yaml.scanner.ScannerError,
yaml.constructor.ConstructorError,
+ ruamel.yaml.parser.ParserError,
) as exc:
- msg = "Failed to load YAML file"
+ msg = f"Failed to load YAML file: {lintable.path}"
raise RuntimeError(msg) from exc
if len(result) == 0:
@@ -975,7 +1027,6 @@ def is_playbook(filename: str) -> bool:
return False
-# pylint: disable=too-many-statements
def get_lintables(
opts: Options = options,
args: list[str] | None = None,
@@ -1018,3 +1069,49 @@ def _extend_with_roles(lintables: list[Lintable]) -> None:
def convert_to_boolean(value: Any) -> bool:
"""Use Ansible to convert something to a boolean."""
return bool(boolean(value))
+
+
+def parse_examples_from_plugin(lintable: Lintable) -> tuple[int, str]:
+ """Parse yaml inside plugin EXAMPLES string.
+
+ Store a line number offset to realign returned line numbers later
+ """
+ offset = 1
+ parsed = ast.parse(lintable.content)
+ for child in parsed.body:
+ if isinstance(child, ast.Assign):
+ label = child.targets[0]
+ if isinstance(label, ast.Name) and label.id == "EXAMPLES":
+ offset = child.lineno - 1
+ break
+
+ docs = read_docstring(str(lintable.path))
+ examples = docs["plainexamples"]
+
+ # Ignore the leading newline and lack of document start
+ # as including those in EXAMPLES would be weird.
+ return offset, (f"---{examples}" if examples else "")
+
+
+@lru_cache
+def load_plugin(name: str) -> PluginLoadContext:
+ """Return loaded ansible plugin/module."""
+ loaded_module = action_loader.find_plugin_with_context(
+ name,
+ ignore_deprecated=True,
+ check_aliases=True,
+ )
+ if not loaded_module.resolved:
+ loaded_module = module_loader.find_plugin_with_context(
+ name,
+ ignore_deprecated=True,
+ check_aliases=True,
+ )
+ if not loaded_module.resolved and name.startswith("ansible.builtin."):
+ # fallback to core behavior of using legacy
+ loaded_module = module_loader.find_plugin_with_context(
+ name.replace("ansible.builtin.", "ansible.legacy."),
+ ignore_deprecated=True,
+ check_aliases=True,
+ )
+ return loaded_module