author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 00:55:42 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 00:55:42 +0000
commit     62d9962ec7d01c95bf5732169320d3857a41446e (patch)
tree       f60d8fc63ff738e5f5afec48a84cf41480ee1315 /lib/ansible/executor
parent     Releasing progress-linux version 2.14.13-1~progress7.99u1. (diff)
download   ansible-core-62d9962ec7d01c95bf5732169320d3857a41446e.tar.xz
           ansible-core-62d9962ec7d01c95bf5732169320d3857a41446e.zip
Merging upstream version 2.16.5.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/executor')
-rw-r--r--  lib/ansible/executor/action_write_locks.py          |   6
-rw-r--r--  lib/ansible/executor/interpreter_discovery.py       |   2
-rw-r--r--  lib/ansible/executor/module_common.py               |  85
-rw-r--r--  lib/ansible/executor/play_iterator.py               |  71
-rw-r--r--  lib/ansible/executor/playbook_executor.py           |  10
-rw-r--r--  lib/ansible/executor/powershell/async_wrapper.ps1   |  10
-rw-r--r--  lib/ansible/executor/powershell/module_manifest.py  |   2
-rw-r--r--  lib/ansible/executor/powershell/module_wrapper.ps1  |   5
-rw-r--r--  lib/ansible/executor/process/worker.py              |  54
-rw-r--r--  lib/ansible/executor/task_executor.py               | 126
-rw-r--r--  lib/ansible/executor/task_queue_manager.py          |  35
11 files changed, 236 insertions(+), 170 deletions(-)
diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py
index fd82744..d2acae9 100644
--- a/lib/ansible/executor/action_write_locks.py
+++ b/lib/ansible/executor/action_write_locks.py
@@ -15,9 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
+from __future__ import annotations
import multiprocessing.synchronize
@@ -29,7 +27,7 @@ if 'action_write_locks' not in globals():
# Do not initialize this more than once because it seems to bash
# the existing one. multiprocessing must be reloading the module
# when it forks?
- action_write_locks = dict() # type: dict[str | None, multiprocessing.synchronize.Lock]
+ action_write_locks: dict[str | None, multiprocessing.synchronize.Lock] = dict()
# Below is a Lock for use when we weren't expecting a named module. It gets used when an action
# plugin invokes a module whose name does not match with the action's name. Slightly less
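The hunk above swaps a PEP 484 type comment for a real variable annotation, enabled by `from __future__ import annotations` (PEP 563), under which annotations are stored as strings and never evaluated at runtime. A minimal standalone sketch of the difference (not the actual module):

from __future__ import annotations

import multiprocessing.synchronize

# Before: the type lives in a comment, invisible to the interpreter
# action_write_locks = dict()  # type: dict[str | None, multiprocessing.synchronize.Lock]

# After: a real annotation; with the future import the `str | None` union
# syntax is never evaluated, so this also runs on Python < 3.10
action_write_locks: dict[str | None, multiprocessing.synchronize.Lock] = dict()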
diff --git a/lib/ansible/executor/interpreter_discovery.py b/lib/ansible/executor/interpreter_discovery.py
index bfd8504..c95cf2e 100644
--- a/lib/ansible/executor/interpreter_discovery.py
+++ b/lib/ansible/executor/interpreter_discovery.py
@@ -10,7 +10,7 @@ import pkgutil
import re
from ansible import constants as C
-from ansible.module_utils._text import to_native, to_text
+from ansible.module_utils.common.text.converters import to_native, to_text
from ansible.module_utils.distro import LinuxDistribution
from ansible.utils.display import Display
from ansible.utils.plugin_docs import get_versioned_doclink
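This import move from the deprecated `ansible.module_utils._text` shim to `ansible.module_utils.common.text.converters` recurs in most files below; the functions themselves are unchanged. A quick hedged sketch of the converters' documented behavior:

from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text

raw = b'caf\xc3\xa9'
assert to_text(raw) == u'café'    # bytes -> text, utf-8 by default
assert to_bytes(u'café') == raw   # text -> bytes
assert to_native(raw) == 'café'   # native str; identical to to_text on Python 3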
diff --git a/lib/ansible/executor/module_common.py b/lib/ansible/executor/module_common.py
index 4d06acb..3517543 100644
--- a/lib/ansible/executor/module_common.py
+++ b/lib/ansible/executor/module_common.py
@@ -26,6 +26,7 @@ import datetime
import json
import os
import shlex
+import time
import zipfile
import re
import pkgutil
@@ -166,7 +167,7 @@ def _ansiballz_main():
else:
PY3 = True
- ZIPDATA = """%(zipdata)s"""
+ ZIPDATA = %(zipdata)r
# Note: temp_path isn't needed once we switch to zipimport
def invoke_module(modlib_path, temp_path, json_params):
@@ -177,13 +178,13 @@ def _ansiballz_main():
z = zipfile.ZipFile(modlib_path, mode='a')
# py3: modlib_path will be text, py2: it's bytes. Need bytes at the end
- sitecustomize = u'import sys\\nsys.path.insert(0,"%%s")\\n' %% modlib_path
+ sitecustomize = u'import sys\\nsys.path.insert(0,"%%s")\\n' %% modlib_path
sitecustomize = sitecustomize.encode('utf-8')
# Use a ZipInfo to work around zipfile limitation on hosts with
# clocks set to a pre-1980 year (for instance, Raspberry Pi)
zinfo = zipfile.ZipInfo()
zinfo.filename = 'sitecustomize.py'
- zinfo.date_time = ( %(year)i, %(month)i, %(day)i, %(hour)i, %(minute)i, %(second)i)
+ zinfo.date_time = %(date_time)s
z.writestr(zinfo, sitecustomize)
z.close()
@@ -196,7 +197,7 @@ def _ansiballz_main():
basic._ANSIBLE_ARGS = json_params
%(coverage)s
# Run the module! By importing it as '__main__', it thinks it is executing as a script
- runpy.run_module(mod_name='%(module_fqn)s', init_globals=dict(_module_fqn='%(module_fqn)s', _modlib_path=modlib_path),
+ runpy.run_module(mod_name=%(module_fqn)r, init_globals=dict(_module_fqn=%(module_fqn)r, _modlib_path=modlib_path),
run_name='__main__', alter_sys=True)
# Ansible modules must exit themselves
@@ -287,7 +288,7 @@ def _ansiballz_main():
basic._ANSIBLE_ARGS = json_params
# Run the module! By importing it as '__main__', it thinks it is executing as a script
- runpy.run_module(mod_name='%(module_fqn)s', init_globals=None, run_name='__main__', alter_sys=True)
+ runpy.run_module(mod_name=%(module_fqn)r, init_globals=None, run_name='__main__', alter_sys=True)
# Ansible modules must exit themselves
print('{"msg": "New-style module did not handle its own exit", "failed": true}')
@@ -312,9 +313,9 @@ def _ansiballz_main():
# store this in remote_tmpdir (use system tempdir instead)
# Only need to use [ansible_module]_payload_ in the temp_path until we move to zipimport
# (this helps ansible-test produce coverage stats)
- temp_path = tempfile.mkdtemp(prefix='ansible_%(ansible_module)s_payload_')
+ temp_path = tempfile.mkdtemp(prefix='ansible_' + %(ansible_module)r + '_payload_')
- zipped_mod = os.path.join(temp_path, 'ansible_%(ansible_module)s_payload.zip')
+ zipped_mod = os.path.join(temp_path, 'ansible_' + %(ansible_module)r + '_payload.zip')
with open(zipped_mod, 'wb') as modlib:
modlib.write(base64.b64decode(ZIPDATA))
@@ -337,7 +338,7 @@ if __name__ == '__main__':
'''
ANSIBALLZ_COVERAGE_TEMPLATE = '''
- os.environ['COVERAGE_FILE'] = '%(coverage_output)s=python-%%s=coverage' %% '.'.join(str(v) for v in sys.version_info[:2])
+ os.environ['COVERAGE_FILE'] = %(coverage_output)r + '=python-%%s=coverage' %% '.'.join(str(v) for v in sys.version_info[:2])
import atexit
@@ -347,7 +348,7 @@ ANSIBALLZ_COVERAGE_TEMPLATE = '''
print('{"msg": "Could not import `coverage` module.", "failed": true}')
sys.exit(1)
- cov = coverage.Coverage(config_file='%(coverage_config)s')
+ cov = coverage.Coverage(config_file=%(coverage_config)r)
def atexit_coverage():
cov.stop()
@@ -870,7 +871,17 @@ class CollectionModuleUtilLocator(ModuleUtilLocatorBase):
return name_parts[5:] # eg, foo.bar for ansible_collections.ns.coll.plugins.module_utils.foo.bar
-def recursive_finder(name, module_fqn, module_data, zf):
+def _make_zinfo(filename, date_time, zf=None):
+ zinfo = zipfile.ZipInfo(
+ filename=filename,
+ date_time=date_time
+ )
+ if zf:
+ zinfo.compress_type = zf.compression
+ return zinfo
+
+
+def recursive_finder(name, module_fqn, module_data, zf, date_time=None):
"""
Using ModuleDepFinder, make sure we have all of the module_utils files that
the module and its module_utils files needs. (no longer actually recursive)
@@ -880,6 +891,8 @@ def recursive_finder(name, module_fqn, module_data, zf):
:arg zf: An open :python:class:`zipfile.ZipFile` object that holds the Ansible module payload
which we're assembling
"""
+ if date_time is None:
+ date_time = time.gmtime()[:6]
# py_module_cache maps python module names to a tuple of the code in the module
# and the pathname to the module.
@@ -976,7 +989,10 @@ def recursive_finder(name, module_fqn, module_data, zf):
for py_module_name in py_module_cache:
py_module_file_name = py_module_cache[py_module_name][1]
- zf.writestr(py_module_file_name, py_module_cache[py_module_name][0])
+ zf.writestr(
+ _make_zinfo(py_module_file_name, date_time, zf=zf),
+ py_module_cache[py_module_name][0]
+ )
mu_file = to_text(py_module_file_name, errors='surrogate_or_strict')
display.vvvvv("Including module_utils file %s" % mu_file)
@@ -1020,13 +1036,16 @@ def _get_ansible_module_fqn(module_path):
return remote_module_fqn
-def _add_module_to_zip(zf, remote_module_fqn, b_module_data):
+def _add_module_to_zip(zf, date_time, remote_module_fqn, b_module_data):
"""Add a module from ansible or from an ansible collection into the module zip"""
module_path_parts = remote_module_fqn.split('.')
# Write the module
module_path = '/'.join(module_path_parts) + '.py'
- zf.writestr(module_path, b_module_data)
+ zf.writestr(
+ _make_zinfo(module_path, date_time, zf=zf),
+ b_module_data
+ )
# Write the __init__.py's necessary to get there
if module_path_parts[0] == 'ansible':
@@ -1045,7 +1064,10 @@ def _add_module_to_zip(zf, remote_module_fqn, b_module_data):
continue
# Note: We don't want to include more than one ansible module in a payload at this time
# so no need to fill the __init__.py with namespace code
- zf.writestr(package_path, b'')
+ zf.writestr(
+ _make_zinfo(package_path, date_time, zf=zf),
+ b''
+ )
def _find_module_utils(module_name, b_module_data, module_path, module_args, task_vars, templar, module_compression, async_timeout, become,
@@ -1110,6 +1132,10 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas
remote_module_fqn = 'ansible.modules.%s' % module_name
if module_substyle == 'python':
+ date_time = time.gmtime()[:6]
+ if date_time[0] < 1980:
+ date_string = datetime.datetime(*date_time, tzinfo=datetime.timezone.utc).strftime('%c')
+ raise AnsibleError(f'Cannot create zipfile due to pre-1980 configured date: {date_string}')
params = dict(ANSIBLE_MODULE_ARGS=module_args,)
try:
python_repred_params = repr(json.dumps(params, cls=AnsibleJSONEncoder, vault_to_text=True))
@@ -1155,10 +1181,10 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas
zf = zipfile.ZipFile(zipoutput, mode='w', compression=compression_method)
# walk the module imports, looking for module_utils to send- they'll be added to the zipfile
- recursive_finder(module_name, remote_module_fqn, b_module_data, zf)
+ recursive_finder(module_name, remote_module_fqn, b_module_data, zf, date_time)
display.debug('ANSIBALLZ: Writing module into payload')
- _add_module_to_zip(zf, remote_module_fqn, b_module_data)
+ _add_module_to_zip(zf, date_time, remote_module_fqn, b_module_data)
zf.close()
zipdata = base64.b64encode(zipoutput.getvalue())
@@ -1241,7 +1267,6 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas
else:
coverage = ''
- now = datetime.datetime.utcnow()
output.write(to_bytes(ACTIVE_ANSIBALLZ_TEMPLATE % dict(
zipdata=zipdata,
ansible_module=module_name,
@@ -1249,12 +1274,7 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas
params=python_repred_params,
shebang=shebang,
coding=ENCODING_STRING,
- year=now.year,
- month=now.month,
- day=now.day,
- hour=now.hour,
- minute=now.minute,
- second=now.second,
+ date_time=date_time,
coverage=coverage,
rlimit=rlimit,
)))
@@ -1377,20 +1397,7 @@ def modify_module(module_name, module_path, module_args, templar, task_vars=None
return (b_module_data, module_style, shebang)
-def get_action_args_with_defaults(action, args, defaults, templar, redirected_names=None, action_groups=None):
- if redirected_names:
- resolved_action_name = redirected_names[-1]
- else:
- resolved_action_name = action
-
- if redirected_names is not None:
- msg = (
- "Finding module_defaults for the action %s. "
- "The caller passed a list of redirected action names, which is deprecated. "
- "The task's resolved action should be provided as the first argument instead."
- )
- display.deprecated(msg % resolved_action_name, version='2.16')
-
+def get_action_args_with_defaults(action, args, defaults, templar, action_groups=None):
# Get the list of groups that contain this action
if action_groups is None:
msg = (
@@ -1401,7 +1408,7 @@ def get_action_args_with_defaults(action, args, defaults, templar, redirected_na
display.warning(msg=msg)
group_names = []
else:
- group_names = action_groups.get(resolved_action_name, [])
+ group_names = action_groups.get(action, [])
tmp_args = {}
module_defaults = {}
@@ -1420,7 +1427,7 @@ def get_action_args_with_defaults(action, args, defaults, templar, redirected_na
tmp_args.update((module_defaults.get('group/%s' % group_name) or {}).copy())
# handle specific action defaults
- tmp_args.update(module_defaults.get(resolved_action_name, {}).copy())
+ tmp_args.update(module_defaults.get(action, {}).copy())
# direct args override all
tmp_args.update(args)
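The common thread in these module_common.py hunks: one `date_time` tuple is captured up front with `time.gmtime()[:6]`, rejected if it predates zipfile's 1980 DOS-timestamp floor, and then stamped onto every member of the AnsiballZ payload through the `_make_zinfo` helper, so all entries carry the same timestamp. A standalone sketch of that pattern (not the actual AnsiballZ code):

import io
import time
import zipfile


def make_zinfo(filename, date_time, zf=None):
    # explicit timestamp instead of "now"; inherit the archive's compression
    zinfo = zipfile.ZipInfo(filename=filename, date_time=date_time)
    if zf:
        zinfo.compress_type = zf.compression
    return zinfo


date_time = time.gmtime()[:6]  # (year, month, day, hour, minute, second)
if date_time[0] < 1980:
    # zipfile stores DOS timestamps, which cannot represent pre-1980 dates
    raise ValueError('cannot create zipfile with a pre-1980 configured date')

buf = io.BytesIO()
with zipfile.ZipFile(buf, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
    zf.writestr(make_zinfo('pkg/__init__.py', date_time, zf=zf), b'')
    zf.writestr(make_zinfo('pkg/mod.py', date_time, zf=zf), b'print("hi")\n')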
diff --git a/lib/ansible/executor/play_iterator.py b/lib/ansible/executor/play_iterator.py
index 2449782..cb82b9f 100644
--- a/lib/ansible/executor/play_iterator.py
+++ b/lib/ansible/executor/play_iterator.py
@@ -52,7 +52,7 @@ class FailedStates(IntFlag):
TASKS = 2
RESCUE = 4
ALWAYS = 8
- HANDLERS = 16
+ HANDLERS = 16 # NOTE not in use anymore
class HostState:
@@ -60,6 +60,8 @@ class HostState:
self._blocks = blocks[:]
self.handlers = []
+ self.handler_notifications = []
+
self.cur_block = 0
self.cur_regular_task = 0
self.cur_rescue_task = 0
@@ -120,6 +122,7 @@ class HostState:
def copy(self):
new_state = HostState(self._blocks)
new_state.handlers = self.handlers[:]
+ new_state.handler_notifications = self.handler_notifications[:]
new_state.cur_block = self.cur_block
new_state.cur_regular_task = self.cur_regular_task
new_state.cur_rescue_task = self.cur_rescue_task
@@ -238,13 +241,6 @@ class PlayIterator:
return self._host_states[host.name].copy()
- def cache_block_tasks(self, block):
- display.deprecated(
- 'PlayIterator.cache_block_tasks is now noop due to the changes '
- 'in the way tasks are cached and is deprecated.',
- version=2.16
- )
-
def get_next_task_for_host(self, host, peek=False):
display.debug("getting the next task for host %s" % host.name)
@@ -435,22 +431,18 @@ class PlayIterator:
state.update_handlers = False
state.cur_handlers_task = 0
- if state.fail_state & FailedStates.HANDLERS == FailedStates.HANDLERS:
- state.update_handlers = True
- state.run_state = IteratingStates.COMPLETE
- else:
- while True:
- try:
- task = state.handlers[state.cur_handlers_task]
- except IndexError:
- task = None
- state.run_state = state.pre_flushing_run_state
- state.update_handlers = True
+ while True:
+ try:
+ task = state.handlers[state.cur_handlers_task]
+ except IndexError:
+ task = None
+ state.run_state = state.pre_flushing_run_state
+ state.update_handlers = True
+ break
+ else:
+ state.cur_handlers_task += 1
+ if task.is_host_notified(host):
break
- else:
- state.cur_handlers_task += 1
- if task.is_host_notified(host):
- break
elif state.run_state == IteratingStates.COMPLETE:
return (state, None)
@@ -491,20 +483,16 @@ class PlayIterator:
else:
state.fail_state |= FailedStates.ALWAYS
state.run_state = IteratingStates.COMPLETE
- elif state.run_state == IteratingStates.HANDLERS:
- state.fail_state |= FailedStates.HANDLERS
- state.update_handlers = True
- if state._blocks[state.cur_block].rescue:
- state.run_state = IteratingStates.RESCUE
- elif state._blocks[state.cur_block].always:
- state.run_state = IteratingStates.ALWAYS
- else:
- state.run_state = IteratingStates.COMPLETE
return state
def mark_host_failed(self, host):
s = self.get_host_state(host)
display.debug("marking host %s failed, current state: %s" % (host, s))
+ if s.run_state == IteratingStates.HANDLERS:
+ # we are failing `meta: flush_handlers`, so just reset the state to whatever
+ # it was before and let `_set_failed_state` figure out the next state
+ s.run_state = s.pre_flushing_run_state
+ s.update_handlers = True
s = self._set_failed_state(s)
display.debug("^ failed state is now: %s" % s)
self.set_state_for_host(host.name, s)
@@ -520,8 +508,6 @@ class PlayIterator:
return True
elif state.run_state == IteratingStates.ALWAYS and self._check_failed_state(state.always_child_state):
return True
- elif state.run_state == IteratingStates.HANDLERS and state.fail_state & FailedStates.HANDLERS == FailedStates.HANDLERS:
- return True
elif state.fail_state != FailedStates.NONE:
if state.run_state == IteratingStates.RESCUE and state.fail_state & FailedStates.RESCUE == 0:
return False
@@ -581,14 +567,6 @@ class PlayIterator:
return self.is_any_block_rescuing(state.always_child_state)
return False
- def get_original_task(self, host, task):
- display.deprecated(
- 'PlayIterator.get_original_task is now noop due to the changes '
- 'in the way tasks are cached and is deprecated.',
- version=2.16
- )
- return (None, None)
-
def _insert_tasks_into_state(self, state, task_list):
# if we've failed at all, or if the task list is empty, just return the current state
if (state.fail_state != FailedStates.NONE and state.run_state == IteratingStates.TASKS) or not task_list:
@@ -650,3 +628,12 @@ class PlayIterator:
if not isinstance(fail_state, FailedStates):
raise AnsibleAssertionError('Expected fail_state to be a FailedStates but was %s' % (type(fail_state)))
self._host_states[hostname].fail_state = fail_state
+
+ def add_notification(self, hostname: str, notification: str) -> None:
+ # preserve order
+ host_state = self._host_states[hostname]
+ if notification not in host_state.handler_notifications:
+ host_state.handler_notifications.append(notification)
+
+ def clear_notification(self, hostname: str, notification: str) -> None:
+ self._host_states[hostname].handler_notifications.remove(notification)
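The new `add_notification`/`clear_notification` pair maintains an ordered, de-duplicated list of handler notifications per host. A minimal sketch of the same semantics on a plain class (hypothetical name, not the PlayIterator API):

class NotificationState:
    def __init__(self):
        self.handler_notifications = []

    def add_notification(self, notification: str) -> None:
        # preserve first-notified order, ignore duplicates
        if notification not in self.handler_notifications:
            self.handler_notifications.append(notification)

    def clear_notification(self, notification: str) -> None:
        # list.remove raises ValueError if the handler was never notified
        self.handler_notifications.remove(notification)


state = NotificationState()
state.add_notification('restart nginx')
state.add_notification('restart nginx')  # no-op: already queued
assert state.handler_notifications == ['restart nginx']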
diff --git a/lib/ansible/executor/playbook_executor.py b/lib/ansible/executor/playbook_executor.py
index e8b2a3d..52ad0c0 100644
--- a/lib/ansible/executor/playbook_executor.py
+++ b/lib/ansible/executor/playbook_executor.py
@@ -24,7 +24,7 @@ import os
from ansible import constants as C
from ansible import context
from ansible.executor.task_queue_manager import TaskQueueManager, AnsibleEndPlay
-from ansible.module_utils._text import to_text
+from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.plugins.loader import become_loader, connection_loader, shell_loader
from ansible.playbook import Playbook
@@ -99,11 +99,11 @@ class PlaybookExecutor:
playbook_collection = resource[2]
else:
playbook_path = playbook
- # not fqcn, but might still be colleciotn playbook
+ # not fqcn, but might still be collection playbook
playbook_collection = _get_collection_name_from_path(playbook)
if playbook_collection:
- display.warning("running playbook inside collection {0}".format(playbook_collection))
+ display.v("running playbook inside collection {0}".format(playbook_collection))
AnsibleCollectionConfig.default_collection = playbook_collection
else:
AnsibleCollectionConfig.default_collection = None
@@ -148,7 +148,7 @@ class PlaybookExecutor:
encrypt = var.get("encrypt", None)
salt_size = var.get("salt_size", None)
salt = var.get("salt", None)
- unsafe = var.get("unsafe", None)
+ unsafe = boolean(var.get("unsafe", False))
if vname not in self._variable_manager.extra_vars:
if self._tqm:
@@ -238,7 +238,7 @@ class PlaybookExecutor:
else:
basedir = '~/'
- (retry_name, _) = os.path.splitext(os.path.basename(playbook_path))
+ (retry_name, ext) = os.path.splitext(os.path.basename(playbook_path))
filename = os.path.join(basedir, "%s.retry" % retry_name)
if self._generate_retry_inventory(filename, retries):
display.display("\tto retry, use: --limit @%s\n" % filename)
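Among the fixes above, the `unsafe` prompt option is now coerced through `boolean()` rather than passed along raw. A hedged sketch of that helper's documented behavior (truthy/falsy string spellings; strict mode raises on anything else):

from ansible.module_utils.parsing.convert_bool import boolean

assert boolean('yes') is True
assert boolean('0') is False
assert boolean(None, strict=False) is False  # lenient mode maps unknowns to False
# boolean('maybe')  # default strict=True would raise TypeError here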
diff --git a/lib/ansible/executor/powershell/async_wrapper.ps1 b/lib/ansible/executor/powershell/async_wrapper.ps1
index 0cd640f..dd5a9be 100644
--- a/lib/ansible/executor/powershell/async_wrapper.ps1
+++ b/lib/ansible/executor/powershell/async_wrapper.ps1
@@ -135,11 +135,11 @@ try {
# populate initial results before we send the async data to avoid result race
$result = @{
- started = 1;
- finished = 0;
- results_file = $results_path;
- ansible_job_id = $local_jid;
- _ansible_suppress_tmpdir_delete = $true;
+ started = 1
+ finished = 0
+ results_file = $results_path
+ ansible_job_id = $local_jid
+ _ansible_suppress_tmpdir_delete = $true
ansible_async_watchdog_pid = $watchdog_pid
}
diff --git a/lib/ansible/executor/powershell/module_manifest.py b/lib/ansible/executor/powershell/module_manifest.py
index 87e2ce0..0720d23 100644
--- a/lib/ansible/executor/powershell/module_manifest.py
+++ b/lib/ansible/executor/powershell/module_manifest.py
@@ -16,7 +16,7 @@ from ansible.module_utils.compat.version import LooseVersion
from ansible import constants as C
from ansible.errors import AnsibleError
-from ansible.module_utils._text import to_bytes, to_native, to_text
+from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.compat.importlib import import_module
from ansible.plugins.loader import ps_module_utils_loader
from ansible.utils.collection_loader import resource_from_fqcr
diff --git a/lib/ansible/executor/powershell/module_wrapper.ps1 b/lib/ansible/executor/powershell/module_wrapper.ps1
index 20a9677..1cfaf3c 100644
--- a/lib/ansible/executor/powershell/module_wrapper.ps1
+++ b/lib/ansible/executor/powershell/module_wrapper.ps1
@@ -207,7 +207,10 @@ if ($null -ne $rc) {
# with the trap handler that's now in place, this should only write to the output if
# $ErrorActionPreference != "Stop", that's ok because this is sent to the stderr output
# for a user to manually debug if something went horribly wrong
-if ($ps.HadErrors -or ($PSVersionTable.PSVersion.Major -lt 4 -and $ps.Streams.Error.Count -gt 0)) {
+if (
+ $ps.Streams.Error.Count -and
+ ($ps.HadErrors -or $PSVersionTable.PSVersion.Major -lt 4)
+) {
Write-AnsibleLog "WARN - module had errors, outputting error info $ModuleName" "module_wrapper"
# if the rc wasn't explicitly set, we return an exit code of 1
if ($null -eq $rc) {
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py
index 5113b83..c043137 100644
--- a/lib/ansible/executor/process/worker.py
+++ b/lib/ansible/executor/process/worker.py
@@ -24,10 +24,11 @@ import sys
import traceback
from jinja2.exceptions import TemplateNotFound
+from multiprocessing.queues import Queue
-from ansible.errors import AnsibleConnectionFailure
+from ansible.errors import AnsibleConnectionFailure, AnsibleError
from ansible.executor.task_executor import TaskExecutor
-from ansible.module_utils._text import to_text
+from ansible.module_utils.common.text.converters import to_text
from ansible.utils.display import Display
from ansible.utils.multiprocessing import context as multiprocessing_context
@@ -35,6 +36,17 @@ __all__ = ['WorkerProcess']
display = Display()
+current_worker = None
+
+
+class WorkerQueue(Queue):
+ """Queue that raises AnsibleError items on get()."""
+ def get(self, *args, **kwargs):
+ result = super(WorkerQueue, self).get(*args, **kwargs)
+ if isinstance(result, AnsibleError):
+ raise result
+ return result
+
class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defined]
'''
@@ -43,7 +55,7 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
for reading later.
'''
- def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj):
+ def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj, worker_id):
super(WorkerProcess, self).__init__()
# takes a task queue manager as the sole param:
@@ -60,6 +72,9 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
# clear var to ensure we only delete files for this child
self._loader._tempfiles = set()
+ self.worker_queue = WorkerQueue(ctx=multiprocessing_context)
+ self.worker_id = worker_id
+
def _save_stdin(self):
self._new_stdin = None
try:
@@ -155,6 +170,9 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
# Set the queue on Display so calls to Display.display are proxied over the queue
display.set_queue(self._final_q)
+ global current_worker
+ current_worker = self
+
try:
# execute the task and build a TaskResult from the result
display.debug("running TaskExecutor() for %s/%s" % (self._host, self._task))
@@ -166,7 +184,8 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
self._new_stdin,
self._loader,
self._shared_loader_obj,
- self._final_q
+ self._final_q,
+ self._variable_manager,
).run()
display.debug("done running TaskExecutor() for %s/%s [%s]" % (self._host, self._task, self._task._uuid))
@@ -175,12 +194,27 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
# put the result on the result queue
display.debug("sending task result for task %s" % self._task._uuid)
- self._final_q.send_task_result(
- self._host.name,
- self._task._uuid,
- executor_result,
- task_fields=self._task.dump_attrs(),
- )
+ try:
+ self._final_q.send_task_result(
+ self._host.name,
+ self._task._uuid,
+ executor_result,
+ task_fields=self._task.dump_attrs(),
+ )
+ except Exception as e:
+ display.debug(f'failed to send task result ({e}), sending surrogate result')
+ self._final_q.send_task_result(
+ self._host.name,
+ self._task._uuid,
+ # Overriding the task result, to represent the failure
+ {
+ 'failed': True,
+ 'msg': f'{e}',
+ 'exception': traceback.format_exc(),
+ },
+ # The failure pickling may have been caused by the task attrs, omit for safety
+ {},
+ )
display.debug("done sending task result for task %s" % self._task._uuid)
except AnsibleConnectionFailure:
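Two defensive patterns land in worker.py: a queue whose `get()` re-raises any exception object it receives, and a pickling-safe "surrogate" task result when the real one cannot be sent. A standalone sketch of the first pattern on plain multiprocessing (assumed names, not the Ansible classes):

import multiprocessing
from multiprocessing.queues import Queue


class ErrorRaisingQueue(Queue):
    """Queue whose get() raises any exception instance placed on it."""

    def get(self, *args, **kwargs):
        result = super().get(*args, **kwargs)
        if isinstance(result, Exception):
            raise result
        return result


if __name__ == '__main__':
    q = ErrorRaisingQueue(ctx=multiprocessing.get_context())
    q.put('ok')
    q.put(RuntimeError('worker failed'))
    assert q.get() == 'ok'
    try:
        q.get()
    except RuntimeError as exc:
        print(f'propagated from queue: {exc}')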
diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py
index 02ace8f..0e7394f 100644
--- a/lib/ansible/executor/task_executor.py
+++ b/lib/ansible/executor/task_executor.py
@@ -20,14 +20,14 @@ from ansible.executor.task_result import TaskResult
from ansible.executor.module_common import get_action_args_with_defaults
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.module_utils.six import binary_type
-from ansible.module_utils._text import to_text, to_native
+from ansible.module_utils.common.text.converters import to_text, to_native
from ansible.module_utils.connection import write_to_file_descriptor
from ansible.playbook.conditional import Conditional
from ansible.playbook.task import Task
from ansible.plugins import get_plugin_class
from ansible.plugins.loader import become_loader, cliconf_loader, connection_loader, httpapi_loader, netconf_loader, terminal_loader
from ansible.template import Templar
-from ansible.utils.collection_loader import AnsibleCollectionConfig, AnsibleCollectionRef
+from ansible.utils.collection_loader import AnsibleCollectionConfig
from ansible.utils.listify import listify_lookup_plugin_terms
from ansible.utils.unsafe_proxy import to_unsafe_text, wrap_var
from ansible.vars.clean import namespace_facts, clean_facts
@@ -82,7 +82,7 @@ class TaskExecutor:
class.
'''
- def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q):
+ def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q, variable_manager):
self._host = host
self._task = task
self._job_vars = job_vars
@@ -92,6 +92,7 @@ class TaskExecutor:
self._shared_loader_obj = shared_loader_obj
self._connection = None
self._final_q = final_q
+ self._variable_manager = variable_manager
self._loop_eval_error = None
self._task.squash()
@@ -136,6 +137,12 @@ class TaskExecutor:
self._task.ignore_errors = item_ignore
elif self._task.ignore_errors and not item_ignore:
self._task.ignore_errors = item_ignore
+ if 'unreachable' in item and item['unreachable']:
+ item_ignore_unreachable = item.pop('_ansible_ignore_unreachable')
+ if not res.get('unreachable'):
+ self._task.ignore_unreachable = item_ignore_unreachable
+ elif self._task.ignore_unreachable and not item_ignore_unreachable:
+ self._task.ignore_unreachable = item_ignore_unreachable
# ensure to accumulate these
for array in ['warnings', 'deprecations']:
@@ -215,21 +222,13 @@ class TaskExecutor:
templar = Templar(loader=self._loader, variables=self._job_vars)
items = None
- loop_cache = self._job_vars.get('_ansible_loop_cache')
- if loop_cache is not None:
- # _ansible_loop_cache may be set in `get_vars` when calculating `delegate_to`
- # to avoid reprocessing the loop
- items = loop_cache
- elif self._task.loop_with:
+ if self._task.loop_with:
if self._task.loop_with in self._shared_loader_obj.lookup_loader:
- fail = True
- if self._task.loop_with == 'first_found':
- # first_found loops are special. If the item is undefined then we want to fall through to the next value rather than failing.
- fail = False
+ # TODO: hardcoded so it fails for non first_found lookups, but this should be generalized for those that don't do their own templating
+ # lookup prop/attribute?
+ fail = bool(self._task.loop_with != 'first_found')
loop_terms = listify_lookup_plugin_terms(terms=self._task.loop, templar=templar, fail_on_undefined=fail, convert_bare=False)
- if not fail:
- loop_terms = [t for t in loop_terms if not templar.is_template(t)]
# get lookup
mylookup = self._shared_loader_obj.lookup_loader.get(self._task.loop_with, loader=self._loader, templar=templar)
@@ -281,6 +280,7 @@ class TaskExecutor:
u" to something else to avoid variable collisions and unexpected behavior." % (self._task, loop_var))
ran_once = False
+ task_fields = None
no_log = False
items_len = len(items)
results = []
@@ -352,6 +352,7 @@ class TaskExecutor:
res['_ansible_item_result'] = True
res['_ansible_ignore_errors'] = task_fields.get('ignore_errors')
+ res['_ansible_ignore_unreachable'] = task_fields.get('ignore_unreachable')
# gets templated here unlike rest of loop_control fields, depends on loop_var above
try:
@@ -396,9 +397,25 @@ class TaskExecutor:
del task_vars[var]
self._task.no_log = no_log
+ # NOTE: run_once cannot contain loop vars because it's templated earlier also
+ # This is saving the post-validated field from the last loop so the strategy can use the templated value post task execution
+ self._task.run_once = task_fields.get('run_once')
+ self._task.action = task_fields.get('action')
return results
+ def _calculate_delegate_to(self, templar, variables):
+ """This method is responsible for effectively pre-validating Task.delegate_to and will
+ happen before Task.post_validate is executed
+ """
+ delegated_vars, delegated_host_name = self._variable_manager.get_delegated_vars_and_hostname(templar, self._task, variables)
+ # At the point this is executed it is safe to mutate self._task,
+ # since `self._task` is either a copy referred to by `tmp_task` in `_run_loop`
+ # or just a singular non-looped task
+ if delegated_host_name:
+ self._task.delegate_to = delegated_host_name
+ variables.update(delegated_vars)
+
def _execute(self, variables=None):
'''
The primary workhorse of the executor system, this runs the task
@@ -411,6 +428,8 @@ class TaskExecutor:
templar = Templar(loader=self._loader, variables=variables)
+ self._calculate_delegate_to(templar, variables)
+
context_validation_error = None
# a certain subset of variables exist.
@@ -450,9 +469,11 @@ class TaskExecutor:
# the fact that the conditional may specify that the task be skipped due to a
# variable not being present which would otherwise cause validation to fail
try:
- if not self._task.evaluate_conditional(templar, tempvars):
+ conditional_result, false_condition = self._task.evaluate_conditional_with_result(templar, tempvars)
+ if not conditional_result:
display.debug("when evaluation is False, skipping this task")
- return dict(changed=False, skipped=True, skip_reason='Conditional result was False', _ansible_no_log=no_log)
+ return dict(changed=False, skipped=True, skip_reason='Conditional result was False',
+ false_condition=false_condition, _ansible_no_log=no_log)
except AnsibleError as e:
# loop error takes precedence
if self._loop_eval_error is not None:
@@ -486,7 +507,7 @@ class TaskExecutor:
# if this task is a TaskInclude, we just return now with a success code so the
# main thread can expand the task list for the given host
- if self._task.action in C._ACTION_ALL_INCLUDE_TASKS:
+ if self._task.action in C._ACTION_INCLUDE_TASKS:
include_args = self._task.args.copy()
include_file = include_args.pop('_raw_params', None)
if not include_file:
@@ -570,25 +591,14 @@ class TaskExecutor:
# feed back into pc to ensure plugins not using get_option can get correct value
self._connection._play_context = self._play_context.set_task_and_variable_override(task=self._task, variables=vars_copy, templar=templar)
- # for persistent connections, initialize socket path and start connection manager
- if any(((self._connection.supports_persistence and C.USE_PERSISTENT_CONNECTIONS), self._connection.force_persistence)):
- self._play_context.timeout = self._connection.get_option('persistent_command_timeout')
- display.vvvv('attempting to start connection', host=self._play_context.remote_addr)
- display.vvvv('using connection plugin %s' % self._connection.transport, host=self._play_context.remote_addr)
-
- options = self._connection.get_options()
- socket_path = start_connection(self._play_context, options, self._task._uuid)
- display.vvvv('local domain socket path is %s' % socket_path, host=self._play_context.remote_addr)
- setattr(self._connection, '_socket_path', socket_path)
-
- # TODO: eventually remove this block as this should be a 'consequence' of 'forced_local' modules
+ # TODO: eventually remove this block as this should be a 'consequence' of 'forced_local' modules, right now rely on remote_is_local connection
# special handling for python interpreter for network_os, default to ansible python unless overridden
- if 'ansible_network_os' in cvars and 'ansible_python_interpreter' not in cvars:
+ if 'ansible_python_interpreter' not in cvars and 'ansible_network_os' in cvars and getattr(self._connection, '_remote_is_local', False):
# this also avoids 'python discovery'
cvars['ansible_python_interpreter'] = sys.executable
# get handler
- self._handler, module_context = self._get_action_handler_with_module_context(connection=self._connection, templar=templar)
+ self._handler, module_context = self._get_action_handler_with_module_context(templar=templar)
if module_context is not None:
module_defaults_fqcn = module_context.resolved_fqcn
@@ -606,17 +616,11 @@ class TaskExecutor:
if omit_token is not None:
self._task.args = remove_omit(self._task.args, omit_token)
- # Read some values from the task, so that we can modify them if need be
- if self._task.until:
- retries = self._task.retries
- if retries is None:
- retries = 3
- elif retries <= 0:
- retries = 1
- else:
- retries += 1
- else:
- retries = 1
+ retries = 1 # includes the default actual run + retries set by user/default
+ if self._task.retries is not None:
+ retries += max(0, self._task.retries)
+ elif self._task.until:
+ retries += 3 # the default is not set in FA because we need to differentiate "unset" value
delay = self._task.delay
if delay < 0:
@@ -722,7 +726,7 @@ class TaskExecutor:
result['failed'] = False
# Make attempts and retries available early to allow their use in changed/failed_when
- if self._task.until:
+ if retries > 1:
result['attempts'] = attempt
# set the changed property if it was missing.
@@ -754,7 +758,7 @@ class TaskExecutor:
if retries > 1:
cond = Conditional(loader=self._loader)
- cond.when = self._task.until
+ cond.when = self._task.until or [not result['failed']]
if cond.evaluate_conditional(templar, vars_copy):
break
else:
@@ -773,7 +777,7 @@ class TaskExecutor:
)
)
time.sleep(delay)
- self._handler = self._get_action_handler(connection=self._connection, templar=templar)
+ self._handler = self._get_action_handler(templar=templar)
else:
if retries > 1:
# we ran out of attempts, so mark the result as failed
@@ -1091,13 +1095,13 @@ class TaskExecutor:
return varnames
- def _get_action_handler(self, connection, templar):
+ def _get_action_handler(self, templar):
'''
Returns the correct action plugin to handle the requested task action
'''
- return self._get_action_handler_with_module_context(connection, templar)[0]
+ return self._get_action_handler_with_module_context(templar)[0]
- def _get_action_handler_with_module_context(self, connection, templar):
+ def _get_action_handler_with_module_context(self, templar):
'''
Returns the correct action plugin to handle the requested task action and the module context
'''
@@ -1134,10 +1138,29 @@ class TaskExecutor:
handler_name = 'ansible.legacy.normal'
collections = None # until then, we don't want the task's collection list to be consulted; use the builtin
+ # networking/persistent connections handling
+ if any(((self._connection.supports_persistence and C.USE_PERSISTENT_CONNECTIONS), self._connection.force_persistence)):
+
+ # check handler in case we don't need to do all the work to set up a persistent connection
+ handler_class = self._shared_loader_obj.action_loader.get(handler_name, class_only=True)
+ if getattr(handler_class, '_requires_connection', True):
+ # for persistent connections, initialize socket path and start connection manager
+ self._play_context.timeout = self._connection.get_option('persistent_command_timeout')
+ display.vvvv('attempting to start connection', host=self._play_context.remote_addr)
+ display.vvvv('using connection plugin %s' % self._connection.transport, host=self._play_context.remote_addr)
+
+ options = self._connection.get_options()
+ socket_path = start_connection(self._play_context, options, self._task._uuid)
+ display.vvvv('local domain socket path is %s' % socket_path, host=self._play_context.remote_addr)
+ setattr(self._connection, '_socket_path', socket_path)
+ else:
+ # TODO: set self._connection to dummy/noop connection, using local for now
+ self._connection = self._get_connection({}, templar, 'local')
+
handler = self._shared_loader_obj.action_loader.get(
handler_name,
task=self._task,
- connection=connection,
+ connection=self._connection,
play_context=self._play_context,
loader=self._loader,
templar=templar,
@@ -1213,8 +1236,7 @@ def start_connection(play_context, options, task_uuid):
else:
try:
result = json.loads(to_text(stderr, errors='surrogate_then_replace'))
- except getattr(json.decoder, 'JSONDecodeError', ValueError):
- # JSONDecodeError only available on Python 3.5+
+ except json.decoder.JSONDecodeError:
result = {'error': to_text(stderr, errors='surrogate_then_replace')}
if 'messages' in result:
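The retry hunks change the attempt budget: every task gets one ordinary run, explicit `retries` now count even without `until`, and the historical 3 extra attempts apply only when `until` is set with no `retries`. A sketch of the same arithmetic (hypothetical helper, mirroring the lines above):

def attempt_budget(retries, until):
    attempts = 1  # the ordinary, non-retry execution
    if retries is not None:
        attempts += max(0, retries)  # explicit retries win; negatives clamp to 0
    elif until:
        attempts += 3                # legacy default when only `until` is given
    return attempts


assert attempt_budget(None, until=None) == 1      # plain task runs once
assert attempt_budget(5, until=None) == 6         # retries now honored without until
assert attempt_budget(None, until=['done']) == 4  # old behavior preserved
assert attempt_budget(-2, until=['done']) == 1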
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index dcfc38a..3bbf3d5 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -24,6 +24,7 @@ import sys
import tempfile
import threading
import time
+import typing as t
import multiprocessing.queues
from ansible import constants as C
@@ -33,7 +34,7 @@ from ansible.executor.play_iterator import PlayIterator
from ansible.executor.stats import AggregateStats
from ansible.executor.task_result import TaskResult
from ansible.module_utils.six import string_types
-from ansible.module_utils._text import to_text, to_native
+from ansible.module_utils.common.text.converters import to_text, to_native
from ansible.playbook.play_context import PlayContext
from ansible.playbook.task import Task
from ansible.plugins.loader import callback_loader, strategy_loader, module_loader
@@ -45,6 +46,7 @@ from ansible.utils.display import Display
from ansible.utils.lock import lock_decorator
from ansible.utils.multiprocessing import context as multiprocessing_context
+from dataclasses import dataclass
__all__ = ['TaskQueueManager']
@@ -59,20 +61,30 @@ class CallbackSend:
class DisplaySend:
- def __init__(self, *args, **kwargs):
+ def __init__(self, method, *args, **kwargs):
+ self.method = method
self.args = args
self.kwargs = kwargs
-class FinalQueue(multiprocessing.queues.Queue):
+@dataclass
+class PromptSend:
+ worker_id: int
+ prompt: str
+ private: bool = True
+ seconds: int = None
+ interrupt_input: t.Iterable[bytes] = None
+ complete_input: t.Iterable[bytes] = None
+
+
+class FinalQueue(multiprocessing.queues.SimpleQueue):
def __init__(self, *args, **kwargs):
kwargs['ctx'] = multiprocessing_context
- super(FinalQueue, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
def send_callback(self, method_name, *args, **kwargs):
self.put(
CallbackSend(method_name, *args, **kwargs),
- block=False
)
def send_task_result(self, *args, **kwargs):
@@ -82,13 +94,16 @@ class FinalQueue(multiprocessing.queues.Queue):
tr = TaskResult(*args, **kwargs)
self.put(
tr,
- block=False
)
- def send_display(self, *args, **kwargs):
+ def send_display(self, method, *args, **kwargs):
+ self.put(
+ DisplaySend(method, *args, **kwargs),
+ )
+
+ def send_prompt(self, **kwargs):
self.put(
- DisplaySend(*args, **kwargs),
- block=False
+ PromptSend(**kwargs),
)
@@ -217,7 +232,7 @@ class TaskQueueManager:
callback_name = cnames[0]
else:
# fallback to 'old loader name'
- (callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path))
+ (callback_name, ext) = os.path.splitext(os.path.basename(callback_plugin._original_path))
display.vvvvv("Attempting to use '%s' callback." % (callback_name))
if callback_type == 'stdout':
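FinalQueue now derives from `SimpleQueue`, whose `put()` writes to the pipe synchronously under a lock (no feeder thread, hence the dropped `block=False` arguments), and a `PromptSend` dataclass joins the message types. A minimal sketch of that message-passing shape on plain multiprocessing (field names copied from the dataclass above; everything else assumed):

import multiprocessing
import multiprocessing.queues
import typing as t
from dataclasses import dataclass


@dataclass
class PromptSend:
    worker_id: int
    prompt: str
    private: bool = True
    seconds: t.Optional[int] = None
    interrupt_input: t.Optional[t.Iterable[bytes]] = None
    complete_input: t.Optional[t.Iterable[bytes]] = None


class FinalQueue(multiprocessing.queues.SimpleQueue):
    def __init__(self, *args, **kwargs):
        kwargs['ctx'] = multiprocessing.get_context()
        super().__init__(*args, **kwargs)

    def send_prompt(self, **kwargs):
        # SimpleQueue.put is synchronous; no feeder thread, no block=False
        self.put(PromptSend(**kwargs))


if __name__ == '__main__':
    q = FinalQueue()
    q.send_prompt(worker_id=0, prompt='Vault password: ')
    msg = q.get()
    print(msg.worker_id, msg.prompt)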