diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:55:42 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:55:42 +0000 |
commit | 62d9962ec7d01c95bf5732169320d3857a41446e (patch) | |
tree | f60d8fc63ff738e5f5afec48a84cf41480ee1315 /lib/ansible/executor | |
parent | Releasing progress-linux version 2.14.13-1~progress7.99u1. (diff) | |
download | ansible-core-62d9962ec7d01c95bf5732169320d3857a41446e.tar.xz ansible-core-62d9962ec7d01c95bf5732169320d3857a41446e.zip |
Merging upstream version 2.16.5.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/executor')
-rw-r--r-- | lib/ansible/executor/action_write_locks.py | 6 | ||||
-rw-r--r-- | lib/ansible/executor/interpreter_discovery.py | 2 | ||||
-rw-r--r-- | lib/ansible/executor/module_common.py | 85 | ||||
-rw-r--r-- | lib/ansible/executor/play_iterator.py | 71 | ||||
-rw-r--r-- | lib/ansible/executor/playbook_executor.py | 10 | ||||
-rw-r--r-- | lib/ansible/executor/powershell/async_wrapper.ps1 | 10 | ||||
-rw-r--r-- | lib/ansible/executor/powershell/module_manifest.py | 2 | ||||
-rw-r--r-- | lib/ansible/executor/powershell/module_wrapper.ps1 | 5 | ||||
-rw-r--r-- | lib/ansible/executor/process/worker.py | 54 | ||||
-rw-r--r-- | lib/ansible/executor/task_executor.py | 126 | ||||
-rw-r--r-- | lib/ansible/executor/task_queue_manager.py | 35 |
11 files changed, 236 insertions, 170 deletions
diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py index fd82744..d2acae9 100644 --- a/lib/ansible/executor/action_write_locks.py +++ b/lib/ansible/executor/action_write_locks.py @@ -15,9 +15,7 @@ # You should have received a copy of the GNU General Public License # along with Ansible. If not, see <http://www.gnu.org/licenses/>. -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type +from __future__ import annotations import multiprocessing.synchronize @@ -29,7 +27,7 @@ if 'action_write_locks' not in globals(): # Do not initialize this more than once because it seems to bash # the existing one. multiprocessing must be reloading the module # when it forks? - action_write_locks = dict() # type: dict[str | None, multiprocessing.synchronize.Lock] + action_write_locks: dict[str | None, multiprocessing.synchronize.Lock] = dict() # Below is a Lock for use when we weren't expecting a named module. It gets used when an action # plugin invokes a module whose name does not match with the action's name. Slightly less diff --git a/lib/ansible/executor/interpreter_discovery.py b/lib/ansible/executor/interpreter_discovery.py index bfd8504..c95cf2e 100644 --- a/lib/ansible/executor/interpreter_discovery.py +++ b/lib/ansible/executor/interpreter_discovery.py @@ -10,7 +10,7 @@ import pkgutil import re from ansible import constants as C -from ansible.module_utils._text import to_native, to_text +from ansible.module_utils.common.text.converters import to_native, to_text from ansible.module_utils.distro import LinuxDistribution from ansible.utils.display import Display from ansible.utils.plugin_docs import get_versioned_doclink diff --git a/lib/ansible/executor/module_common.py b/lib/ansible/executor/module_common.py index 4d06acb..3517543 100644 --- a/lib/ansible/executor/module_common.py +++ b/lib/ansible/executor/module_common.py @@ -26,6 +26,7 @@ import datetime import json import os import shlex +import time import zipfile import re import pkgutil @@ -166,7 +167,7 @@ def _ansiballz_main(): else: PY3 = True - ZIPDATA = """%(zipdata)s""" + ZIPDATA = %(zipdata)r # Note: temp_path isn't needed once we switch to zipimport def invoke_module(modlib_path, temp_path, json_params): @@ -177,13 +178,13 @@ def _ansiballz_main(): z = zipfile.ZipFile(modlib_path, mode='a') # py3: modlib_path will be text, py2: it's bytes. Need bytes at the end - sitecustomize = u'import sys\\nsys.path.insert(0,"%%s")\\n' %% modlib_path + sitecustomize = u'import sys\\nsys.path.insert(0,"%%s")\\n' %% modlib_path sitecustomize = sitecustomize.encode('utf-8') # Use a ZipInfo to work around zipfile limitation on hosts with # clocks set to a pre-1980 year (for instance, Raspberry Pi) zinfo = zipfile.ZipInfo() zinfo.filename = 'sitecustomize.py' - zinfo.date_time = ( %(year)i, %(month)i, %(day)i, %(hour)i, %(minute)i, %(second)i) + zinfo.date_time = %(date_time)s z.writestr(zinfo, sitecustomize) z.close() @@ -196,7 +197,7 @@ def _ansiballz_main(): basic._ANSIBLE_ARGS = json_params %(coverage)s # Run the module! By importing it as '__main__', it thinks it is executing as a script - runpy.run_module(mod_name='%(module_fqn)s', init_globals=dict(_module_fqn='%(module_fqn)s', _modlib_path=modlib_path), + runpy.run_module(mod_name=%(module_fqn)r, init_globals=dict(_module_fqn=%(module_fqn)r, _modlib_path=modlib_path), run_name='__main__', alter_sys=True) # Ansible modules must exit themselves @@ -287,7 +288,7 @@ def _ansiballz_main(): basic._ANSIBLE_ARGS = json_params # Run the module! By importing it as '__main__', it thinks it is executing as a script - runpy.run_module(mod_name='%(module_fqn)s', init_globals=None, run_name='__main__', alter_sys=True) + runpy.run_module(mod_name=%(module_fqn)r, init_globals=None, run_name='__main__', alter_sys=True) # Ansible modules must exit themselves print('{"msg": "New-style module did not handle its own exit", "failed": true}') @@ -312,9 +313,9 @@ def _ansiballz_main(): # store this in remote_tmpdir (use system tempdir instead) # Only need to use [ansible_module]_payload_ in the temp_path until we move to zipimport # (this helps ansible-test produce coverage stats) - temp_path = tempfile.mkdtemp(prefix='ansible_%(ansible_module)s_payload_') + temp_path = tempfile.mkdtemp(prefix='ansible_' + %(ansible_module)r + '_payload_') - zipped_mod = os.path.join(temp_path, 'ansible_%(ansible_module)s_payload.zip') + zipped_mod = os.path.join(temp_path, 'ansible_' + %(ansible_module)r + '_payload.zip') with open(zipped_mod, 'wb') as modlib: modlib.write(base64.b64decode(ZIPDATA)) @@ -337,7 +338,7 @@ if __name__ == '__main__': ''' ANSIBALLZ_COVERAGE_TEMPLATE = ''' - os.environ['COVERAGE_FILE'] = '%(coverage_output)s=python-%%s=coverage' %% '.'.join(str(v) for v in sys.version_info[:2]) + os.environ['COVERAGE_FILE'] = %(coverage_output)r + '=python-%%s=coverage' %% '.'.join(str(v) for v in sys.version_info[:2]) import atexit @@ -347,7 +348,7 @@ ANSIBALLZ_COVERAGE_TEMPLATE = ''' print('{"msg": "Could not import `coverage` module.", "failed": true}') sys.exit(1) - cov = coverage.Coverage(config_file='%(coverage_config)s') + cov = coverage.Coverage(config_file=%(coverage_config)r) def atexit_coverage(): cov.stop() @@ -870,7 +871,17 @@ class CollectionModuleUtilLocator(ModuleUtilLocatorBase): return name_parts[5:] # eg, foo.bar for ansible_collections.ns.coll.plugins.module_utils.foo.bar -def recursive_finder(name, module_fqn, module_data, zf): +def _make_zinfo(filename, date_time, zf=None): + zinfo = zipfile.ZipInfo( + filename=filename, + date_time=date_time + ) + if zf: + zinfo.compress_type = zf.compression + return zinfo + + +def recursive_finder(name, module_fqn, module_data, zf, date_time=None): """ Using ModuleDepFinder, make sure we have all of the module_utils files that the module and its module_utils files needs. (no longer actually recursive) @@ -880,6 +891,8 @@ def recursive_finder(name, module_fqn, module_data, zf): :arg zf: An open :python:class:`zipfile.ZipFile` object that holds the Ansible module payload which we're assembling """ + if date_time is None: + date_time = time.gmtime()[:6] # py_module_cache maps python module names to a tuple of the code in the module # and the pathname to the module. @@ -976,7 +989,10 @@ def recursive_finder(name, module_fqn, module_data, zf): for py_module_name in py_module_cache: py_module_file_name = py_module_cache[py_module_name][1] - zf.writestr(py_module_file_name, py_module_cache[py_module_name][0]) + zf.writestr( + _make_zinfo(py_module_file_name, date_time, zf=zf), + py_module_cache[py_module_name][0] + ) mu_file = to_text(py_module_file_name, errors='surrogate_or_strict') display.vvvvv("Including module_utils file %s" % mu_file) @@ -1020,13 +1036,16 @@ def _get_ansible_module_fqn(module_path): return remote_module_fqn -def _add_module_to_zip(zf, remote_module_fqn, b_module_data): +def _add_module_to_zip(zf, date_time, remote_module_fqn, b_module_data): """Add a module from ansible or from an ansible collection into the module zip""" module_path_parts = remote_module_fqn.split('.') # Write the module module_path = '/'.join(module_path_parts) + '.py' - zf.writestr(module_path, b_module_data) + zf.writestr( + _make_zinfo(module_path, date_time, zf=zf), + b_module_data + ) # Write the __init__.py's necessary to get there if module_path_parts[0] == 'ansible': @@ -1045,7 +1064,10 @@ def _add_module_to_zip(zf, remote_module_fqn, b_module_data): continue # Note: We don't want to include more than one ansible module in a payload at this time # so no need to fill the __init__.py with namespace code - zf.writestr(package_path, b'') + zf.writestr( + _make_zinfo(package_path, date_time, zf=zf), + b'' + ) def _find_module_utils(module_name, b_module_data, module_path, module_args, task_vars, templar, module_compression, async_timeout, become, @@ -1110,6 +1132,10 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas remote_module_fqn = 'ansible.modules.%s' % module_name if module_substyle == 'python': + date_time = time.gmtime()[:6] + if date_time[0] < 1980: + date_string = datetime.datetime(*date_time, tzinfo=datetime.timezone.utc).strftime('%c') + raise AnsibleError(f'Cannot create zipfile due to pre-1980 configured date: {date_string}') params = dict(ANSIBLE_MODULE_ARGS=module_args,) try: python_repred_params = repr(json.dumps(params, cls=AnsibleJSONEncoder, vault_to_text=True)) @@ -1155,10 +1181,10 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas zf = zipfile.ZipFile(zipoutput, mode='w', compression=compression_method) # walk the module imports, looking for module_utils to send- they'll be added to the zipfile - recursive_finder(module_name, remote_module_fqn, b_module_data, zf) + recursive_finder(module_name, remote_module_fqn, b_module_data, zf, date_time) display.debug('ANSIBALLZ: Writing module into payload') - _add_module_to_zip(zf, remote_module_fqn, b_module_data) + _add_module_to_zip(zf, date_time, remote_module_fqn, b_module_data) zf.close() zipdata = base64.b64encode(zipoutput.getvalue()) @@ -1241,7 +1267,6 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas else: coverage = '' - now = datetime.datetime.utcnow() output.write(to_bytes(ACTIVE_ANSIBALLZ_TEMPLATE % dict( zipdata=zipdata, ansible_module=module_name, @@ -1249,12 +1274,7 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas params=python_repred_params, shebang=shebang, coding=ENCODING_STRING, - year=now.year, - month=now.month, - day=now.day, - hour=now.hour, - minute=now.minute, - second=now.second, + date_time=date_time, coverage=coverage, rlimit=rlimit, ))) @@ -1377,20 +1397,7 @@ def modify_module(module_name, module_path, module_args, templar, task_vars=None return (b_module_data, module_style, shebang) -def get_action_args_with_defaults(action, args, defaults, templar, redirected_names=None, action_groups=None): - if redirected_names: - resolved_action_name = redirected_names[-1] - else: - resolved_action_name = action - - if redirected_names is not None: - msg = ( - "Finding module_defaults for the action %s. " - "The caller passed a list of redirected action names, which is deprecated. " - "The task's resolved action should be provided as the first argument instead." - ) - display.deprecated(msg % resolved_action_name, version='2.16') - +def get_action_args_with_defaults(action, args, defaults, templar, action_groups=None): # Get the list of groups that contain this action if action_groups is None: msg = ( @@ -1401,7 +1408,7 @@ def get_action_args_with_defaults(action, args, defaults, templar, redirected_na display.warning(msg=msg) group_names = [] else: - group_names = action_groups.get(resolved_action_name, []) + group_names = action_groups.get(action, []) tmp_args = {} module_defaults = {} @@ -1420,7 +1427,7 @@ def get_action_args_with_defaults(action, args, defaults, templar, redirected_na tmp_args.update((module_defaults.get('group/%s' % group_name) or {}).copy()) # handle specific action defaults - tmp_args.update(module_defaults.get(resolved_action_name, {}).copy()) + tmp_args.update(module_defaults.get(action, {}).copy()) # direct args override all tmp_args.update(args) diff --git a/lib/ansible/executor/play_iterator.py b/lib/ansible/executor/play_iterator.py index 2449782..cb82b9f 100644 --- a/lib/ansible/executor/play_iterator.py +++ b/lib/ansible/executor/play_iterator.py @@ -52,7 +52,7 @@ class FailedStates(IntFlag): TASKS = 2 RESCUE = 4 ALWAYS = 8 - HANDLERS = 16 + HANDLERS = 16 # NOTE not in use anymore class HostState: @@ -60,6 +60,8 @@ class HostState: self._blocks = blocks[:] self.handlers = [] + self.handler_notifications = [] + self.cur_block = 0 self.cur_regular_task = 0 self.cur_rescue_task = 0 @@ -120,6 +122,7 @@ class HostState: def copy(self): new_state = HostState(self._blocks) new_state.handlers = self.handlers[:] + new_state.handler_notifications = self.handler_notifications[:] new_state.cur_block = self.cur_block new_state.cur_regular_task = self.cur_regular_task new_state.cur_rescue_task = self.cur_rescue_task @@ -238,13 +241,6 @@ class PlayIterator: return self._host_states[host.name].copy() - def cache_block_tasks(self, block): - display.deprecated( - 'PlayIterator.cache_block_tasks is now noop due to the changes ' - 'in the way tasks are cached and is deprecated.', - version=2.16 - ) - def get_next_task_for_host(self, host, peek=False): display.debug("getting the next task for host %s" % host.name) @@ -435,22 +431,18 @@ class PlayIterator: state.update_handlers = False state.cur_handlers_task = 0 - if state.fail_state & FailedStates.HANDLERS == FailedStates.HANDLERS: - state.update_handlers = True - state.run_state = IteratingStates.COMPLETE - else: - while True: - try: - task = state.handlers[state.cur_handlers_task] - except IndexError: - task = None - state.run_state = state.pre_flushing_run_state - state.update_handlers = True + while True: + try: + task = state.handlers[state.cur_handlers_task] + except IndexError: + task = None + state.run_state = state.pre_flushing_run_state + state.update_handlers = True + break + else: + state.cur_handlers_task += 1 + if task.is_host_notified(host): break - else: - state.cur_handlers_task += 1 - if task.is_host_notified(host): - break elif state.run_state == IteratingStates.COMPLETE: return (state, None) @@ -491,20 +483,16 @@ class PlayIterator: else: state.fail_state |= FailedStates.ALWAYS state.run_state = IteratingStates.COMPLETE - elif state.run_state == IteratingStates.HANDLERS: - state.fail_state |= FailedStates.HANDLERS - state.update_handlers = True - if state._blocks[state.cur_block].rescue: - state.run_state = IteratingStates.RESCUE - elif state._blocks[state.cur_block].always: - state.run_state = IteratingStates.ALWAYS - else: - state.run_state = IteratingStates.COMPLETE return state def mark_host_failed(self, host): s = self.get_host_state(host) display.debug("marking host %s failed, current state: %s" % (host, s)) + if s.run_state == IteratingStates.HANDLERS: + # we are failing `meta: flush_handlers`, so just reset the state to whatever + # it was before and let `_set_failed_state` figure out the next state + s.run_state = s.pre_flushing_run_state + s.update_handlers = True s = self._set_failed_state(s) display.debug("^ failed state is now: %s" % s) self.set_state_for_host(host.name, s) @@ -520,8 +508,6 @@ class PlayIterator: return True elif state.run_state == IteratingStates.ALWAYS and self._check_failed_state(state.always_child_state): return True - elif state.run_state == IteratingStates.HANDLERS and state.fail_state & FailedStates.HANDLERS == FailedStates.HANDLERS: - return True elif state.fail_state != FailedStates.NONE: if state.run_state == IteratingStates.RESCUE and state.fail_state & FailedStates.RESCUE == 0: return False @@ -581,14 +567,6 @@ class PlayIterator: return self.is_any_block_rescuing(state.always_child_state) return False - def get_original_task(self, host, task): - display.deprecated( - 'PlayIterator.get_original_task is now noop due to the changes ' - 'in the way tasks are cached and is deprecated.', - version=2.16 - ) - return (None, None) - def _insert_tasks_into_state(self, state, task_list): # if we've failed at all, or if the task list is empty, just return the current state if (state.fail_state != FailedStates.NONE and state.run_state == IteratingStates.TASKS) or not task_list: @@ -650,3 +628,12 @@ class PlayIterator: if not isinstance(fail_state, FailedStates): raise AnsibleAssertionError('Expected fail_state to be a FailedStates but was %s' % (type(fail_state))) self._host_states[hostname].fail_state = fail_state + + def add_notification(self, hostname: str, notification: str) -> None: + # preserve order + host_state = self._host_states[hostname] + if notification not in host_state.handler_notifications: + host_state.handler_notifications.append(notification) + + def clear_notification(self, hostname: str, notification: str) -> None: + self._host_states[hostname].handler_notifications.remove(notification) diff --git a/lib/ansible/executor/playbook_executor.py b/lib/ansible/executor/playbook_executor.py index e8b2a3d..52ad0c0 100644 --- a/lib/ansible/executor/playbook_executor.py +++ b/lib/ansible/executor/playbook_executor.py @@ -24,7 +24,7 @@ import os from ansible import constants as C from ansible import context from ansible.executor.task_queue_manager import TaskQueueManager, AnsibleEndPlay -from ansible.module_utils._text import to_text +from ansible.module_utils.common.text.converters import to_text from ansible.module_utils.parsing.convert_bool import boolean from ansible.plugins.loader import become_loader, connection_loader, shell_loader from ansible.playbook import Playbook @@ -99,11 +99,11 @@ class PlaybookExecutor: playbook_collection = resource[2] else: playbook_path = playbook - # not fqcn, but might still be colleciotn playbook + # not fqcn, but might still be collection playbook playbook_collection = _get_collection_name_from_path(playbook) if playbook_collection: - display.warning("running playbook inside collection {0}".format(playbook_collection)) + display.v("running playbook inside collection {0}".format(playbook_collection)) AnsibleCollectionConfig.default_collection = playbook_collection else: AnsibleCollectionConfig.default_collection = None @@ -148,7 +148,7 @@ class PlaybookExecutor: encrypt = var.get("encrypt", None) salt_size = var.get("salt_size", None) salt = var.get("salt", None) - unsafe = var.get("unsafe", None) + unsafe = boolean(var.get("unsafe", False)) if vname not in self._variable_manager.extra_vars: if self._tqm: @@ -238,7 +238,7 @@ class PlaybookExecutor: else: basedir = '~/' - (retry_name, _) = os.path.splitext(os.path.basename(playbook_path)) + (retry_name, ext) = os.path.splitext(os.path.basename(playbook_path)) filename = os.path.join(basedir, "%s.retry" % retry_name) if self._generate_retry_inventory(filename, retries): display.display("\tto retry, use: --limit @%s\n" % filename) diff --git a/lib/ansible/executor/powershell/async_wrapper.ps1 b/lib/ansible/executor/powershell/async_wrapper.ps1 index 0cd640f..dd5a9be 100644 --- a/lib/ansible/executor/powershell/async_wrapper.ps1 +++ b/lib/ansible/executor/powershell/async_wrapper.ps1 @@ -135,11 +135,11 @@ try { # populate initial results before we send the async data to avoid result race $result = @{ - started = 1; - finished = 0; - results_file = $results_path; - ansible_job_id = $local_jid; - _ansible_suppress_tmpdir_delete = $true; + started = 1 + finished = 0 + results_file = $results_path + ansible_job_id = $local_jid + _ansible_suppress_tmpdir_delete = $true ansible_async_watchdog_pid = $watchdog_pid } diff --git a/lib/ansible/executor/powershell/module_manifest.py b/lib/ansible/executor/powershell/module_manifest.py index 87e2ce0..0720d23 100644 --- a/lib/ansible/executor/powershell/module_manifest.py +++ b/lib/ansible/executor/powershell/module_manifest.py @@ -16,7 +16,7 @@ from ansible.module_utils.compat.version import LooseVersion from ansible import constants as C from ansible.errors import AnsibleError -from ansible.module_utils._text import to_bytes, to_native, to_text +from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text from ansible.module_utils.compat.importlib import import_module from ansible.plugins.loader import ps_module_utils_loader from ansible.utils.collection_loader import resource_from_fqcr diff --git a/lib/ansible/executor/powershell/module_wrapper.ps1 b/lib/ansible/executor/powershell/module_wrapper.ps1 index 20a9677..1cfaf3c 100644 --- a/lib/ansible/executor/powershell/module_wrapper.ps1 +++ b/lib/ansible/executor/powershell/module_wrapper.ps1 @@ -207,7 +207,10 @@ if ($null -ne $rc) { # with the trap handler that's now in place, this should only write to the output if # $ErrorActionPreference != "Stop", that's ok because this is sent to the stderr output # for a user to manually debug if something went horribly wrong -if ($ps.HadErrors -or ($PSVersionTable.PSVersion.Major -lt 4 -and $ps.Streams.Error.Count -gt 0)) { +if ( + $ps.Streams.Error.Count -and + ($ps.HadErrors -or $PSVersionTable.PSVersion.Major -lt 4) +) { Write-AnsibleLog "WARN - module had errors, outputting error info $ModuleName" "module_wrapper" # if the rc wasn't explicitly set, we return an exit code of 1 if ($null -eq $rc) { diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index 5113b83..c043137 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -24,10 +24,11 @@ import sys import traceback from jinja2.exceptions import TemplateNotFound +from multiprocessing.queues import Queue -from ansible.errors import AnsibleConnectionFailure +from ansible.errors import AnsibleConnectionFailure, AnsibleError from ansible.executor.task_executor import TaskExecutor -from ansible.module_utils._text import to_text +from ansible.module_utils.common.text.converters import to_text from ansible.utils.display import Display from ansible.utils.multiprocessing import context as multiprocessing_context @@ -35,6 +36,17 @@ __all__ = ['WorkerProcess'] display = Display() +current_worker = None + + +class WorkerQueue(Queue): + """Queue that raises AnsibleError items on get().""" + def get(self, *args, **kwargs): + result = super(WorkerQueue, self).get(*args, **kwargs) + if isinstance(result, AnsibleError): + raise result + return result + class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defined] ''' @@ -43,7 +55,7 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin for reading later. ''' - def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj): + def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj, worker_id): super(WorkerProcess, self).__init__() # takes a task queue manager as the sole param: @@ -60,6 +72,9 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin # clear var to ensure we only delete files for this child self._loader._tempfiles = set() + self.worker_queue = WorkerQueue(ctx=multiprocessing_context) + self.worker_id = worker_id + def _save_stdin(self): self._new_stdin = None try: @@ -155,6 +170,9 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin # Set the queue on Display so calls to Display.display are proxied over the queue display.set_queue(self._final_q) + global current_worker + current_worker = self + try: # execute the task and build a TaskResult from the result display.debug("running TaskExecutor() for %s/%s" % (self._host, self._task)) @@ -166,7 +184,8 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin self._new_stdin, self._loader, self._shared_loader_obj, - self._final_q + self._final_q, + self._variable_manager, ).run() display.debug("done running TaskExecutor() for %s/%s [%s]" % (self._host, self._task, self._task._uuid)) @@ -175,12 +194,27 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin # put the result on the result queue display.debug("sending task result for task %s" % self._task._uuid) - self._final_q.send_task_result( - self._host.name, - self._task._uuid, - executor_result, - task_fields=self._task.dump_attrs(), - ) + try: + self._final_q.send_task_result( + self._host.name, + self._task._uuid, + executor_result, + task_fields=self._task.dump_attrs(), + ) + except Exception as e: + display.debug(f'failed to send task result ({e}), sending surrogate result') + self._final_q.send_task_result( + self._host.name, + self._task._uuid, + # Overriding the task result, to represent the failure + { + 'failed': True, + 'msg': f'{e}', + 'exception': traceback.format_exc(), + }, + # The failure pickling may have been caused by the task attrs, omit for safety + {}, + ) display.debug("done sending task result for task %s" % self._task._uuid) except AnsibleConnectionFailure: diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py index 02ace8f..0e7394f 100644 --- a/lib/ansible/executor/task_executor.py +++ b/lib/ansible/executor/task_executor.py @@ -20,14 +20,14 @@ from ansible.executor.task_result import TaskResult from ansible.executor.module_common import get_action_args_with_defaults from ansible.module_utils.parsing.convert_bool import boolean from ansible.module_utils.six import binary_type -from ansible.module_utils._text import to_text, to_native +from ansible.module_utils.common.text.converters import to_text, to_native from ansible.module_utils.connection import write_to_file_descriptor from ansible.playbook.conditional import Conditional from ansible.playbook.task import Task from ansible.plugins import get_plugin_class from ansible.plugins.loader import become_loader, cliconf_loader, connection_loader, httpapi_loader, netconf_loader, terminal_loader from ansible.template import Templar -from ansible.utils.collection_loader import AnsibleCollectionConfig, AnsibleCollectionRef +from ansible.utils.collection_loader import AnsibleCollectionConfig from ansible.utils.listify import listify_lookup_plugin_terms from ansible.utils.unsafe_proxy import to_unsafe_text, wrap_var from ansible.vars.clean import namespace_facts, clean_facts @@ -82,7 +82,7 @@ class TaskExecutor: class. ''' - def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q): + def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q, variable_manager): self._host = host self._task = task self._job_vars = job_vars @@ -92,6 +92,7 @@ class TaskExecutor: self._shared_loader_obj = shared_loader_obj self._connection = None self._final_q = final_q + self._variable_manager = variable_manager self._loop_eval_error = None self._task.squash() @@ -136,6 +137,12 @@ class TaskExecutor: self._task.ignore_errors = item_ignore elif self._task.ignore_errors and not item_ignore: self._task.ignore_errors = item_ignore + if 'unreachable' in item and item['unreachable']: + item_ignore_unreachable = item.pop('_ansible_ignore_unreachable') + if not res.get('unreachable'): + self._task.ignore_unreachable = item_ignore_unreachable + elif self._task.ignore_unreachable and not item_ignore_unreachable: + self._task.ignore_unreachable = item_ignore_unreachable # ensure to accumulate these for array in ['warnings', 'deprecations']: @@ -215,21 +222,13 @@ class TaskExecutor: templar = Templar(loader=self._loader, variables=self._job_vars) items = None - loop_cache = self._job_vars.get('_ansible_loop_cache') - if loop_cache is not None: - # _ansible_loop_cache may be set in `get_vars` when calculating `delegate_to` - # to avoid reprocessing the loop - items = loop_cache - elif self._task.loop_with: + if self._task.loop_with: if self._task.loop_with in self._shared_loader_obj.lookup_loader: - fail = True - if self._task.loop_with == 'first_found': - # first_found loops are special. If the item is undefined then we want to fall through to the next value rather than failing. - fail = False + # TODO: hardcoded so it fails for non first_found lookups, but thhis shoudl be generalized for those that don't do their own templating + # lookup prop/attribute? + fail = bool(self._task.loop_with != 'first_found') loop_terms = listify_lookup_plugin_terms(terms=self._task.loop, templar=templar, fail_on_undefined=fail, convert_bare=False) - if not fail: - loop_terms = [t for t in loop_terms if not templar.is_template(t)] # get lookup mylookup = self._shared_loader_obj.lookup_loader.get(self._task.loop_with, loader=self._loader, templar=templar) @@ -281,6 +280,7 @@ class TaskExecutor: u" to something else to avoid variable collisions and unexpected behavior." % (self._task, loop_var)) ran_once = False + task_fields = None no_log = False items_len = len(items) results = [] @@ -352,6 +352,7 @@ class TaskExecutor: res['_ansible_item_result'] = True res['_ansible_ignore_errors'] = task_fields.get('ignore_errors') + res['_ansible_ignore_unreachable'] = task_fields.get('ignore_unreachable') # gets templated here unlike rest of loop_control fields, depends on loop_var above try: @@ -396,9 +397,25 @@ class TaskExecutor: del task_vars[var] self._task.no_log = no_log + # NOTE: run_once cannot contain loop vars because it's templated earlier also + # This is saving the post-validated field from the last loop so the strategy can use the templated value post task execution + self._task.run_once = task_fields.get('run_once') + self._task.action = task_fields.get('action') return results + def _calculate_delegate_to(self, templar, variables): + """This method is responsible for effectively pre-validating Task.delegate_to and will + happen before Task.post_validate is executed + """ + delegated_vars, delegated_host_name = self._variable_manager.get_delegated_vars_and_hostname(templar, self._task, variables) + # At the point this is executed it is safe to mutate self._task, + # since `self._task` is either a copy referred to by `tmp_task` in `_run_loop` + # or just a singular non-looped task + if delegated_host_name: + self._task.delegate_to = delegated_host_name + variables.update(delegated_vars) + def _execute(self, variables=None): ''' The primary workhorse of the executor system, this runs the task @@ -411,6 +428,8 @@ class TaskExecutor: templar = Templar(loader=self._loader, variables=variables) + self._calculate_delegate_to(templar, variables) + context_validation_error = None # a certain subset of variables exist. @@ -450,9 +469,11 @@ class TaskExecutor: # the fact that the conditional may specify that the task be skipped due to a # variable not being present which would otherwise cause validation to fail try: - if not self._task.evaluate_conditional(templar, tempvars): + conditional_result, false_condition = self._task.evaluate_conditional_with_result(templar, tempvars) + if not conditional_result: display.debug("when evaluation is False, skipping this task") - return dict(changed=False, skipped=True, skip_reason='Conditional result was False', _ansible_no_log=no_log) + return dict(changed=False, skipped=True, skip_reason='Conditional result was False', + false_condition=false_condition, _ansible_no_log=no_log) except AnsibleError as e: # loop error takes precedence if self._loop_eval_error is not None: @@ -486,7 +507,7 @@ class TaskExecutor: # if this task is a TaskInclude, we just return now with a success code so the # main thread can expand the task list for the given host - if self._task.action in C._ACTION_ALL_INCLUDE_TASKS: + if self._task.action in C._ACTION_INCLUDE_TASKS: include_args = self._task.args.copy() include_file = include_args.pop('_raw_params', None) if not include_file: @@ -570,25 +591,14 @@ class TaskExecutor: # feed back into pc to ensure plugins not using get_option can get correct value self._connection._play_context = self._play_context.set_task_and_variable_override(task=self._task, variables=vars_copy, templar=templar) - # for persistent connections, initialize socket path and start connection manager - if any(((self._connection.supports_persistence and C.USE_PERSISTENT_CONNECTIONS), self._connection.force_persistence)): - self._play_context.timeout = self._connection.get_option('persistent_command_timeout') - display.vvvv('attempting to start connection', host=self._play_context.remote_addr) - display.vvvv('using connection plugin %s' % self._connection.transport, host=self._play_context.remote_addr) - - options = self._connection.get_options() - socket_path = start_connection(self._play_context, options, self._task._uuid) - display.vvvv('local domain socket path is %s' % socket_path, host=self._play_context.remote_addr) - setattr(self._connection, '_socket_path', socket_path) - - # TODO: eventually remove this block as this should be a 'consequence' of 'forced_local' modules + # TODO: eventually remove this block as this should be a 'consequence' of 'forced_local' modules, right now rely on remote_is_local connection # special handling for python interpreter for network_os, default to ansible python unless overridden - if 'ansible_network_os' in cvars and 'ansible_python_interpreter' not in cvars: + if 'ansible_python_interpreter' not in cvars and 'ansible_network_os' in cvars and getattr(self._connection, '_remote_is_local', False): # this also avoids 'python discovery' cvars['ansible_python_interpreter'] = sys.executable # get handler - self._handler, module_context = self._get_action_handler_with_module_context(connection=self._connection, templar=templar) + self._handler, module_context = self._get_action_handler_with_module_context(templar=templar) if module_context is not None: module_defaults_fqcn = module_context.resolved_fqcn @@ -606,17 +616,11 @@ class TaskExecutor: if omit_token is not None: self._task.args = remove_omit(self._task.args, omit_token) - # Read some values from the task, so that we can modify them if need be - if self._task.until: - retries = self._task.retries - if retries is None: - retries = 3 - elif retries <= 0: - retries = 1 - else: - retries += 1 - else: - retries = 1 + retries = 1 # includes the default actual run + retries set by user/default + if self._task.retries is not None: + retries += max(0, self._task.retries) + elif self._task.until: + retries += 3 # the default is not set in FA because we need to differentiate "unset" value delay = self._task.delay if delay < 0: @@ -722,7 +726,7 @@ class TaskExecutor: result['failed'] = False # Make attempts and retries available early to allow their use in changed/failed_when - if self._task.until: + if retries > 1: result['attempts'] = attempt # set the changed property if it was missing. @@ -754,7 +758,7 @@ class TaskExecutor: if retries > 1: cond = Conditional(loader=self._loader) - cond.when = self._task.until + cond.when = self._task.until or [not result['failed']] if cond.evaluate_conditional(templar, vars_copy): break else: @@ -773,7 +777,7 @@ class TaskExecutor: ) ) time.sleep(delay) - self._handler = self._get_action_handler(connection=self._connection, templar=templar) + self._handler = self._get_action_handler(templar=templar) else: if retries > 1: # we ran out of attempts, so mark the result as failed @@ -1091,13 +1095,13 @@ class TaskExecutor: return varnames - def _get_action_handler(self, connection, templar): + def _get_action_handler(self, templar): ''' Returns the correct action plugin to handle the requestion task action ''' - return self._get_action_handler_with_module_context(connection, templar)[0] + return self._get_action_handler_with_module_context(templar)[0] - def _get_action_handler_with_module_context(self, connection, templar): + def _get_action_handler_with_module_context(self, templar): ''' Returns the correct action plugin to handle the requestion task action and the module context ''' @@ -1134,10 +1138,29 @@ class TaskExecutor: handler_name = 'ansible.legacy.normal' collections = None # until then, we don't want the task's collection list to be consulted; use the builtin + # networking/psersistent connections handling + if any(((self._connection.supports_persistence and C.USE_PERSISTENT_CONNECTIONS), self._connection.force_persistence)): + + # check handler in case we dont need to do all the work to setup persistent connection + handler_class = self._shared_loader_obj.action_loader.get(handler_name, class_only=True) + if getattr(handler_class, '_requires_connection', True): + # for persistent connections, initialize socket path and start connection manager + self._play_context.timeout = self._connection.get_option('persistent_command_timeout') + display.vvvv('attempting to start connection', host=self._play_context.remote_addr) + display.vvvv('using connection plugin %s' % self._connection.transport, host=self._play_context.remote_addr) + + options = self._connection.get_options() + socket_path = start_connection(self._play_context, options, self._task._uuid) + display.vvvv('local domain socket path is %s' % socket_path, host=self._play_context.remote_addr) + setattr(self._connection, '_socket_path', socket_path) + else: + # TODO: set self._connection to dummy/noop connection, using local for now + self._connection = self._get_connection({}, templar, 'local') + handler = self._shared_loader_obj.action_loader.get( handler_name, task=self._task, - connection=connection, + connection=self._connection, play_context=self._play_context, loader=self._loader, templar=templar, @@ -1213,8 +1236,7 @@ def start_connection(play_context, options, task_uuid): else: try: result = json.loads(to_text(stderr, errors='surrogate_then_replace')) - except getattr(json.decoder, 'JSONDecodeError', ValueError): - # JSONDecodeError only available on Python 3.5+ + except json.decoder.JSONDecodeError: result = {'error': to_text(stderr, errors='surrogate_then_replace')} if 'messages' in result: diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index dcfc38a..3bbf3d5 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -24,6 +24,7 @@ import sys import tempfile import threading import time +import typing as t import multiprocessing.queues from ansible import constants as C @@ -33,7 +34,7 @@ from ansible.executor.play_iterator import PlayIterator from ansible.executor.stats import AggregateStats from ansible.executor.task_result import TaskResult from ansible.module_utils.six import string_types -from ansible.module_utils._text import to_text, to_native +from ansible.module_utils.common.text.converters import to_text, to_native from ansible.playbook.play_context import PlayContext from ansible.playbook.task import Task from ansible.plugins.loader import callback_loader, strategy_loader, module_loader @@ -45,6 +46,7 @@ from ansible.utils.display import Display from ansible.utils.lock import lock_decorator from ansible.utils.multiprocessing import context as multiprocessing_context +from dataclasses import dataclass __all__ = ['TaskQueueManager'] @@ -59,20 +61,30 @@ class CallbackSend: class DisplaySend: - def __init__(self, *args, **kwargs): + def __init__(self, method, *args, **kwargs): + self.method = method self.args = args self.kwargs = kwargs -class FinalQueue(multiprocessing.queues.Queue): +@dataclass +class PromptSend: + worker_id: int + prompt: str + private: bool = True + seconds: int = None + interrupt_input: t.Iterable[bytes] = None + complete_input: t.Iterable[bytes] = None + + +class FinalQueue(multiprocessing.queues.SimpleQueue): def __init__(self, *args, **kwargs): kwargs['ctx'] = multiprocessing_context - super(FinalQueue, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) def send_callback(self, method_name, *args, **kwargs): self.put( CallbackSend(method_name, *args, **kwargs), - block=False ) def send_task_result(self, *args, **kwargs): @@ -82,13 +94,16 @@ class FinalQueue(multiprocessing.queues.Queue): tr = TaskResult(*args, **kwargs) self.put( tr, - block=False ) - def send_display(self, *args, **kwargs): + def send_display(self, method, *args, **kwargs): + self.put( + DisplaySend(method, *args, **kwargs), + ) + + def send_prompt(self, **kwargs): self.put( - DisplaySend(*args, **kwargs), - block=False + PromptSend(**kwargs), ) @@ -217,7 +232,7 @@ class TaskQueueManager: callback_name = cnames[0] else: # fallback to 'old loader name' - (callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path)) + (callback_name, ext) = os.path.splitext(os.path.basename(callback_plugin._original_path)) display.vvvvv("Attempting to use '%s' callback." % (callback_name)) if callback_type == 'stdout': |