summaryrefslogtreecommitdiffstats
path: root/lib/ansible/executor/task_executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/executor/task_executor.py')
-rw-r--r--lib/ansible/executor/task_executor.py1239
1 files changed, 1239 insertions, 0 deletions
diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py
new file mode 100644
index 0000000..02ace8f
--- /dev/null
+++ b/lib/ansible/executor/task_executor.py
@@ -0,0 +1,1239 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+# (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+import pty
+import time
+import json
+import signal
+import subprocess
+import sys
+import termios
+import traceback
+
+from ansible import constants as C
+from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable, AnsibleConnectionFailure, AnsibleActionFail, AnsibleActionSkip
+from ansible.executor.task_result import TaskResult
+from ansible.executor.module_common import get_action_args_with_defaults
+from ansible.module_utils.parsing.convert_bool import boolean
+from ansible.module_utils.six import binary_type
+from ansible.module_utils._text import to_text, to_native
+from ansible.module_utils.connection import write_to_file_descriptor
+from ansible.playbook.conditional import Conditional
+from ansible.playbook.task import Task
+from ansible.plugins import get_plugin_class
+from ansible.plugins.loader import become_loader, cliconf_loader, connection_loader, httpapi_loader, netconf_loader, terminal_loader
+from ansible.template import Templar
+from ansible.utils.collection_loader import AnsibleCollectionConfig, AnsibleCollectionRef
+from ansible.utils.listify import listify_lookup_plugin_terms
+from ansible.utils.unsafe_proxy import to_unsafe_text, wrap_var
+from ansible.vars.clean import namespace_facts, clean_facts
+from ansible.utils.display import Display
+from ansible.utils.vars import combine_vars, isidentifier
+
+display = Display()
+
+
+RETURN_VARS = [x for x in C.MAGIC_VARIABLE_MAPPING.items() if 'become' not in x and '_pass' not in x]
+
+__all__ = ['TaskExecutor']
+
+
+class TaskTimeoutError(BaseException):
+ pass
+
+
+def task_timeout(signum, frame):
+ raise TaskTimeoutError
+
+
+def remove_omit(task_args, omit_token):
+ '''
+ Remove args with a value equal to the ``omit_token`` recursively
+ to align with now having suboptions in the argument_spec
+ '''
+
+ if not isinstance(task_args, dict):
+ return task_args
+
+ new_args = {}
+ for i in task_args.items():
+ if i[1] == omit_token:
+ continue
+ elif isinstance(i[1], dict):
+ new_args[i[0]] = remove_omit(i[1], omit_token)
+ elif isinstance(i[1], list):
+ new_args[i[0]] = [remove_omit(v, omit_token) for v in i[1]]
+ else:
+ new_args[i[0]] = i[1]
+
+ return new_args
+
+
+class TaskExecutor:
+
+ '''
+ This is the main worker class for the executor pipeline, which
+ handles loading an action plugin to actually dispatch the task to
+ a given host. This class roughly corresponds to the old Runner()
+ class.
+ '''
+
+ def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q):
+ self._host = host
+ self._task = task
+ self._job_vars = job_vars
+ self._play_context = play_context
+ self._new_stdin = new_stdin
+ self._loader = loader
+ self._shared_loader_obj = shared_loader_obj
+ self._connection = None
+ self._final_q = final_q
+ self._loop_eval_error = None
+
+ self._task.squash()
+
+ def run(self):
+ '''
+ The main executor entrypoint, where we determine if the specified
+ task requires looping and either runs the task with self._run_loop()
+ or self._execute(). After that, the returned results are parsed and
+ returned as a dict.
+ '''
+
+ display.debug("in run() - task %s" % self._task._uuid)
+
+ try:
+ try:
+ items = self._get_loop_items()
+ except AnsibleUndefinedVariable as e:
+ # save the error raised here for use later
+ items = None
+ self._loop_eval_error = e
+
+ if items is not None:
+ if len(items) > 0:
+ item_results = self._run_loop(items)
+
+ # create the overall result item
+ res = dict(results=item_results)
+
+ # loop through the item results and set the global changed/failed/skipped result flags based on any item.
+ res['skipped'] = True
+ for item in item_results:
+ if 'changed' in item and item['changed'] and not res.get('changed'):
+ res['changed'] = True
+ if res['skipped'] and ('skipped' not in item or ('skipped' in item and not item['skipped'])):
+ res['skipped'] = False
+ if 'failed' in item and item['failed']:
+ item_ignore = item.pop('_ansible_ignore_errors')
+ if not res.get('failed'):
+ res['failed'] = True
+ res['msg'] = 'One or more items failed'
+ self._task.ignore_errors = item_ignore
+ elif self._task.ignore_errors and not item_ignore:
+ self._task.ignore_errors = item_ignore
+
+ # ensure to accumulate these
+ for array in ['warnings', 'deprecations']:
+ if array in item and item[array]:
+ if array not in res:
+ res[array] = []
+ if not isinstance(item[array], list):
+ item[array] = [item[array]]
+ res[array] = res[array] + item[array]
+ del item[array]
+
+ if not res.get('failed', False):
+ res['msg'] = 'All items completed'
+ if res['skipped']:
+ res['msg'] = 'All items skipped'
+ else:
+ res = dict(changed=False, skipped=True, skipped_reason='No items in the list', results=[])
+ else:
+ display.debug("calling self._execute()")
+ res = self._execute()
+ display.debug("_execute() done")
+
+ # make sure changed is set in the result, if it's not present
+ if 'changed' not in res:
+ res['changed'] = False
+
+ def _clean_res(res, errors='surrogate_or_strict'):
+ if isinstance(res, binary_type):
+ return to_unsafe_text(res, errors=errors)
+ elif isinstance(res, dict):
+ for k in res:
+ try:
+ res[k] = _clean_res(res[k], errors=errors)
+ except UnicodeError:
+ if k == 'diff':
+ # If this is a diff, substitute a replacement character if the value
+ # is undecodable as utf8. (Fix #21804)
+ display.warning("We were unable to decode all characters in the module return data."
+ " Replaced some in an effort to return as much as possible")
+ res[k] = _clean_res(res[k], errors='surrogate_then_replace')
+ else:
+ raise
+ elif isinstance(res, list):
+ for idx, item in enumerate(res):
+ res[idx] = _clean_res(item, errors=errors)
+ return res
+
+ display.debug("dumping result to json")
+ res = _clean_res(res)
+ display.debug("done dumping result, returning")
+ return res
+ except AnsibleError as e:
+ return dict(failed=True, msg=wrap_var(to_text(e, nonstring='simplerepr')), _ansible_no_log=self._play_context.no_log)
+ except Exception as e:
+ return dict(failed=True, msg=wrap_var('Unexpected failure during module execution: %s' % (to_native(e, nonstring='simplerepr'))),
+ exception=to_text(traceback.format_exc()), stdout='', _ansible_no_log=self._play_context.no_log)
+ finally:
+ try:
+ self._connection.close()
+ except AttributeError:
+ pass
+ except Exception as e:
+ display.debug(u"error closing connection: %s" % to_text(e))
+
+ def _get_loop_items(self):
+ '''
+ Loads a lookup plugin to handle the with_* portion of a task (if specified),
+ and returns the items result.
+ '''
+
+ # get search path for this task to pass to lookup plugins
+ self._job_vars['ansible_search_path'] = self._task.get_search_path()
+
+ # ensure basedir is always in (dwim already searches here but we need to display it)
+ if self._loader.get_basedir() not in self._job_vars['ansible_search_path']:
+ self._job_vars['ansible_search_path'].append(self._loader.get_basedir())
+
+ templar = Templar(loader=self._loader, variables=self._job_vars)
+ items = None
+ loop_cache = self._job_vars.get('_ansible_loop_cache')
+ if loop_cache is not None:
+ # _ansible_loop_cache may be set in `get_vars` when calculating `delegate_to`
+ # to avoid reprocessing the loop
+ items = loop_cache
+ elif self._task.loop_with:
+ if self._task.loop_with in self._shared_loader_obj.lookup_loader:
+ fail = True
+ if self._task.loop_with == 'first_found':
+ # first_found loops are special. If the item is undefined then we want to fall through to the next value rather than failing.
+ fail = False
+
+ loop_terms = listify_lookup_plugin_terms(terms=self._task.loop, templar=templar, fail_on_undefined=fail, convert_bare=False)
+ if not fail:
+ loop_terms = [t for t in loop_terms if not templar.is_template(t)]
+
+ # get lookup
+ mylookup = self._shared_loader_obj.lookup_loader.get(self._task.loop_with, loader=self._loader, templar=templar)
+
+ # give lookup task 'context' for subdir (mostly needed for first_found)
+ for subdir in ['template', 'var', 'file']: # TODO: move this to constants?
+ if subdir in self._task.action:
+ break
+ setattr(mylookup, '_subdir', subdir + 's')
+
+ # run lookup
+ items = wrap_var(mylookup.run(terms=loop_terms, variables=self._job_vars, wantlist=True))
+ else:
+ raise AnsibleError("Unexpected failure in finding the lookup named '%s' in the available lookup plugins" % self._task.loop_with)
+
+ elif self._task.loop is not None:
+ items = templar.template(self._task.loop)
+ if not isinstance(items, list):
+ raise AnsibleError(
+ "Invalid data passed to 'loop', it requires a list, got this instead: %s."
+ " Hint: If you passed a list/dict of just one element,"
+ " try adding wantlist=True to your lookup invocation or use q/query instead of lookup." % items
+ )
+
+ return items
+
+ def _run_loop(self, items):
+ '''
+ Runs the task with the loop items specified and collates the result
+ into an array named 'results' which is inserted into the final result
+ along with the item for which the loop ran.
+ '''
+ task_vars = self._job_vars
+ templar = Templar(loader=self._loader, variables=task_vars)
+
+ self._task.loop_control.post_validate(templar=templar)
+
+ loop_var = self._task.loop_control.loop_var
+ index_var = self._task.loop_control.index_var
+ loop_pause = self._task.loop_control.pause
+ extended = self._task.loop_control.extended
+ extended_allitems = self._task.loop_control.extended_allitems
+ # ensure we always have a label
+ label = self._task.loop_control.label or '{{' + loop_var + '}}'
+
+ if loop_var in task_vars:
+ display.warning(u"%s: The loop variable '%s' is already in use. "
+ u"You should set the `loop_var` value in the `loop_control` option for the task"
+ u" to something else to avoid variable collisions and unexpected behavior." % (self._task, loop_var))
+
+ ran_once = False
+ no_log = False
+ items_len = len(items)
+ results = []
+ for item_index, item in enumerate(items):
+ task_vars['ansible_loop_var'] = loop_var
+
+ task_vars[loop_var] = item
+ if index_var:
+ task_vars['ansible_index_var'] = index_var
+ task_vars[index_var] = item_index
+
+ if extended:
+ task_vars['ansible_loop'] = {
+ 'index': item_index + 1,
+ 'index0': item_index,
+ 'first': item_index == 0,
+ 'last': item_index + 1 == items_len,
+ 'length': items_len,
+ 'revindex': items_len - item_index,
+ 'revindex0': items_len - item_index - 1,
+ }
+ if extended_allitems:
+ task_vars['ansible_loop']['allitems'] = items
+ try:
+ task_vars['ansible_loop']['nextitem'] = items[item_index + 1]
+ except IndexError:
+ pass
+ if item_index - 1 >= 0:
+ task_vars['ansible_loop']['previtem'] = items[item_index - 1]
+
+ # Update template vars to reflect current loop iteration
+ templar.available_variables = task_vars
+
+ # pause between loop iterations
+ if loop_pause and ran_once:
+ time.sleep(loop_pause)
+ else:
+ ran_once = True
+
+ try:
+ tmp_task = self._task.copy(exclude_parent=True, exclude_tasks=True)
+ tmp_task._parent = self._task._parent
+ tmp_play_context = self._play_context.copy()
+ except AnsibleParserError as e:
+ results.append(dict(failed=True, msg=to_text(e)))
+ continue
+
+ # now we swap the internal task and play context with their copies,
+ # execute, and swap them back so we can do the next iteration cleanly
+ (self._task, tmp_task) = (tmp_task, self._task)
+ (self._play_context, tmp_play_context) = (tmp_play_context, self._play_context)
+ res = self._execute(variables=task_vars)
+ task_fields = self._task.dump_attrs()
+ (self._task, tmp_task) = (tmp_task, self._task)
+ (self._play_context, tmp_play_context) = (tmp_play_context, self._play_context)
+
+ # update 'general no_log' based on specific no_log
+ no_log = no_log or tmp_task.no_log
+
+ # now update the result with the item info, and append the result
+ # to the list of results
+ res[loop_var] = item
+ res['ansible_loop_var'] = loop_var
+ if index_var:
+ res[index_var] = item_index
+ res['ansible_index_var'] = index_var
+ if extended:
+ res['ansible_loop'] = task_vars['ansible_loop']
+
+ res['_ansible_item_result'] = True
+ res['_ansible_ignore_errors'] = task_fields.get('ignore_errors')
+
+ # gets templated here unlike rest of loop_control fields, depends on loop_var above
+ try:
+ res['_ansible_item_label'] = templar.template(label)
+ except AnsibleUndefinedVariable as e:
+ res.update({
+ 'failed': True,
+ 'msg': 'Failed to template loop_control.label: %s' % to_text(e)
+ })
+
+ tr = TaskResult(
+ self._host.name,
+ self._task._uuid,
+ res,
+ task_fields=task_fields,
+ )
+ if tr.is_failed() or tr.is_unreachable():
+ self._final_q.send_callback('v2_runner_item_on_failed', tr)
+ elif tr.is_skipped():
+ self._final_q.send_callback('v2_runner_item_on_skipped', tr)
+ else:
+ if getattr(self._task, 'diff', False):
+ self._final_q.send_callback('v2_on_file_diff', tr)
+ if self._task.action not in C._ACTION_INVENTORY_TASKS:
+ self._final_q.send_callback('v2_runner_item_on_ok', tr)
+
+ results.append(res)
+ del task_vars[loop_var]
+
+ # clear 'connection related' plugin variables for next iteration
+ if self._connection:
+ clear_plugins = {
+ 'connection': self._connection._load_name,
+ 'shell': self._connection._shell._load_name
+ }
+ if self._connection.become:
+ clear_plugins['become'] = self._connection.become._load_name
+
+ for plugin_type, plugin_name in clear_plugins.items():
+ for var in C.config.get_plugin_vars(plugin_type, plugin_name):
+ if var in task_vars and var not in self._job_vars:
+ del task_vars[var]
+
+ self._task.no_log = no_log
+
+ return results
+
+ def _execute(self, variables=None):
+ '''
+ The primary workhorse of the executor system, this runs the task
+ on the specified host (which may be the delegated_to host) and handles
+ the retry/until and block rescue/always execution
+ '''
+
+ if variables is None:
+ variables = self._job_vars
+
+ templar = Templar(loader=self._loader, variables=variables)
+
+ context_validation_error = None
+
+ # a certain subset of variables exist.
+ tempvars = variables.copy()
+
+ try:
+ # TODO: remove play_context as this does not take delegation nor loops correctly into account,
+ # the task itself should hold the correct values for connection/shell/become/terminal plugin options to finalize.
+ # Kept for now for backwards compatibility and a few functions that are still exclusive to it.
+
+ # apply the given task's information to the connection info,
+ # which may override some fields already set by the play or
+ # the options specified on the command line
+ self._play_context = self._play_context.set_task_and_variable_override(task=self._task, variables=variables, templar=templar)
+
+ # fields set from the play/task may be based on variables, so we have to
+ # do the same kind of post validation step on it here before we use it.
+ self._play_context.post_validate(templar=templar)
+
+ # now that the play context is finalized, if the remote_addr is not set
+ # default to using the host's address field as the remote address
+ if not self._play_context.remote_addr:
+ self._play_context.remote_addr = self._host.address
+
+ # We also add "magic" variables back into the variables dict to make sure
+ self._play_context.update_vars(tempvars)
+
+ except AnsibleError as e:
+ # save the error, which we'll raise later if we don't end up
+ # skipping this task during the conditional evaluation step
+ context_validation_error = e
+
+ no_log = self._play_context.no_log
+
+ # Evaluate the conditional (if any) for this task, which we do before running
+ # the final task post-validation. We do this before the post validation due to
+ # the fact that the conditional may specify that the task be skipped due to a
+ # variable not being present which would otherwise cause validation to fail
+ try:
+ if not self._task.evaluate_conditional(templar, tempvars):
+ display.debug("when evaluation is False, skipping this task")
+ return dict(changed=False, skipped=True, skip_reason='Conditional result was False', _ansible_no_log=no_log)
+ except AnsibleError as e:
+ # loop error takes precedence
+ if self._loop_eval_error is not None:
+ # Display the error from the conditional as well to prevent
+ # losing information useful for debugging.
+ display.v(to_text(e))
+ raise self._loop_eval_error # pylint: disable=raising-bad-type
+ raise
+
+ # Not skipping, if we had loop error raised earlier we need to raise it now to halt the execution of this task
+ if self._loop_eval_error is not None:
+ raise self._loop_eval_error # pylint: disable=raising-bad-type
+
+ # if we ran into an error while setting up the PlayContext, raise it now, unless is known issue with delegation
+ # and undefined vars (correct values are in cvars later on and connection plugins, if still error, blows up there)
+ if context_validation_error is not None:
+ raiseit = True
+ if self._task.delegate_to:
+ if isinstance(context_validation_error, AnsibleUndefinedVariable):
+ raiseit = False
+ elif isinstance(context_validation_error, AnsibleParserError):
+ # parser error, might be cause by undef too
+ orig_exc = getattr(context_validation_error, 'orig_exc', None)
+ if isinstance(orig_exc, AnsibleUndefinedVariable):
+ raiseit = False
+ if raiseit:
+ raise context_validation_error # pylint: disable=raising-bad-type
+
+ # set templar to use temp variables until loop is evaluated
+ templar.available_variables = tempvars
+
+ # if this task is a TaskInclude, we just return now with a success code so the
+ # main thread can expand the task list for the given host
+ if self._task.action in C._ACTION_ALL_INCLUDE_TASKS:
+ include_args = self._task.args.copy()
+ include_file = include_args.pop('_raw_params', None)
+ if not include_file:
+ return dict(failed=True, msg="No include file was specified to the include")
+
+ include_file = templar.template(include_file)
+ return dict(include=include_file, include_args=include_args)
+
+ # if this task is a IncludeRole, we just return now with a success code so the main thread can expand the task list for the given host
+ elif self._task.action in C._ACTION_INCLUDE_ROLE:
+ include_args = self._task.args.copy()
+ return dict(include_args=include_args)
+
+ # Now we do final validation on the task, which sets all fields to their final values.
+ try:
+ self._task.post_validate(templar=templar)
+ except AnsibleError:
+ raise
+ except Exception:
+ return dict(changed=False, failed=True, _ansible_no_log=no_log, exception=to_text(traceback.format_exc()))
+ if '_variable_params' in self._task.args:
+ variable_params = self._task.args.pop('_variable_params')
+ if isinstance(variable_params, dict):
+ if C.INJECT_FACTS_AS_VARS:
+ display.warning("Using a variable for a task's 'args' is unsafe in some situations "
+ "(see https://docs.ansible.com/ansible/devel/reference_appendices/faq.html#argsplat-unsafe)")
+ variable_params.update(self._task.args)
+ self._task.args = variable_params
+ else:
+ # if we didn't get a dict, it means there's garbage remaining after k=v parsing, just give up
+ # see https://github.com/ansible/ansible/issues/79862
+ raise AnsibleError(f"invalid or malformed argument: '{variable_params}'")
+
+ # update no_log to task value, now that we have it templated
+ no_log = self._task.no_log
+
+ # free tempvars up, not used anymore, cvars and vars_copy should be mainly used after this point
+ # updating the original 'variables' at the end
+ tempvars = {}
+
+ # setup cvars copy, used for all connection related templating
+ if self._task.delegate_to:
+ # use vars from delegated host (which already include task vars) instead of original host
+ cvars = variables.get('ansible_delegated_vars', {}).get(self._task.delegate_to, {})
+ else:
+ # just use normal host vars
+ cvars = variables
+
+ templar.available_variables = cvars
+
+ # use magic var if it exists, if not, let task inheritance do it's thing.
+ if cvars.get('ansible_connection') is not None:
+ current_connection = templar.template(cvars['ansible_connection'])
+ else:
+ current_connection = self._task.connection
+
+ # get the connection and the handler for this execution
+ if (not self._connection or
+ not getattr(self._connection, 'connected', False) or
+ not self._connection.matches_name([current_connection]) or
+ # pc compare, left here for old plugins, but should be irrelevant for those
+ # using get_option, since they are cleared each iteration.
+ self._play_context.remote_addr != self._connection._play_context.remote_addr):
+ self._connection = self._get_connection(cvars, templar, current_connection)
+ else:
+ # if connection is reused, its _play_context is no longer valid and needs
+ # to be replaced with the one templated above, in case other data changed
+ self._connection._play_context = self._play_context
+ self._set_become_plugin(cvars, templar, self._connection)
+
+ plugin_vars = self._set_connection_options(cvars, templar)
+
+ # make a copy of the job vars here, as we update them here and later,
+ # but don't want to pollute original
+ vars_copy = variables.copy()
+ # update with connection info (i.e ansible_host/ansible_user)
+ self._connection.update_vars(vars_copy)
+ templar.available_variables = vars_copy
+
+ # TODO: eventually remove as pc is taken out of the resolution path
+ # feed back into pc to ensure plugins not using get_option can get correct value
+ self._connection._play_context = self._play_context.set_task_and_variable_override(task=self._task, variables=vars_copy, templar=templar)
+
+ # for persistent connections, initialize socket path and start connection manager
+ if any(((self._connection.supports_persistence and C.USE_PERSISTENT_CONNECTIONS), self._connection.force_persistence)):
+ self._play_context.timeout = self._connection.get_option('persistent_command_timeout')
+ display.vvvv('attempting to start connection', host=self._play_context.remote_addr)
+ display.vvvv('using connection plugin %s' % self._connection.transport, host=self._play_context.remote_addr)
+
+ options = self._connection.get_options()
+ socket_path = start_connection(self._play_context, options, self._task._uuid)
+ display.vvvv('local domain socket path is %s' % socket_path, host=self._play_context.remote_addr)
+ setattr(self._connection, '_socket_path', socket_path)
+
+ # TODO: eventually remove this block as this should be a 'consequence' of 'forced_local' modules
+ # special handling for python interpreter for network_os, default to ansible python unless overridden
+ if 'ansible_network_os' in cvars and 'ansible_python_interpreter' not in cvars:
+ # this also avoids 'python discovery'
+ cvars['ansible_python_interpreter'] = sys.executable
+
+ # get handler
+ self._handler, module_context = self._get_action_handler_with_module_context(connection=self._connection, templar=templar)
+
+ if module_context is not None:
+ module_defaults_fqcn = module_context.resolved_fqcn
+ else:
+ module_defaults_fqcn = self._task.resolved_action
+
+ # Apply default params for action/module, if present
+ self._task.args = get_action_args_with_defaults(
+ module_defaults_fqcn, self._task.args, self._task.module_defaults, templar,
+ action_groups=self._task._parent._play._action_groups
+ )
+
+ # And filter out any fields which were set to default(omit), and got the omit token value
+ omit_token = variables.get('omit')
+ if omit_token is not None:
+ self._task.args = remove_omit(self._task.args, omit_token)
+
+ # Read some values from the task, so that we can modify them if need be
+ if self._task.until:
+ retries = self._task.retries
+ if retries is None:
+ retries = 3
+ elif retries <= 0:
+ retries = 1
+ else:
+ retries += 1
+ else:
+ retries = 1
+
+ delay = self._task.delay
+ if delay < 0:
+ delay = 1
+
+ display.debug("starting attempt loop")
+ result = None
+ for attempt in range(1, retries + 1):
+ display.debug("running the handler")
+ try:
+ if self._task.timeout:
+ old_sig = signal.signal(signal.SIGALRM, task_timeout)
+ signal.alarm(self._task.timeout)
+ result = self._handler.run(task_vars=vars_copy)
+ except (AnsibleActionFail, AnsibleActionSkip) as e:
+ return e.result
+ except AnsibleConnectionFailure as e:
+ return dict(unreachable=True, msg=to_text(e))
+ except TaskTimeoutError as e:
+ msg = 'The %s action failed to execute in the expected time frame (%d) and was terminated' % (self._task.action, self._task.timeout)
+ return dict(failed=True, msg=msg)
+ finally:
+ if self._task.timeout:
+ signal.alarm(0)
+ old_sig = signal.signal(signal.SIGALRM, old_sig)
+ self._handler.cleanup()
+ display.debug("handler run complete")
+
+ # preserve no log
+ result["_ansible_no_log"] = no_log
+
+ if self._task.action not in C._ACTION_WITH_CLEAN_FACTS:
+ result = wrap_var(result)
+
+ # update the local copy of vars with the registered value, if specified,
+ # or any facts which may have been generated by the module execution
+ if self._task.register:
+ if not isidentifier(self._task.register):
+ raise AnsibleError("Invalid variable name in 'register' specified: '%s'" % self._task.register)
+
+ vars_copy[self._task.register] = result
+
+ if self._task.async_val > 0:
+ if self._task.poll > 0 and not result.get('skipped') and not result.get('failed'):
+ result = self._poll_async_result(result=result, templar=templar, task_vars=vars_copy)
+ if result.get('failed'):
+ self._final_q.send_callback(
+ 'v2_runner_on_async_failed',
+ TaskResult(self._host.name,
+ self._task._uuid,
+ result,
+ task_fields=self._task.dump_attrs()))
+ else:
+ self._final_q.send_callback(
+ 'v2_runner_on_async_ok',
+ TaskResult(self._host.name,
+ self._task._uuid,
+ result,
+ task_fields=self._task.dump_attrs()))
+
+ # ensure no log is preserved
+ result["_ansible_no_log"] = no_log
+
+ # helper methods for use below in evaluating changed/failed_when
+ def _evaluate_changed_when_result(result):
+ if self._task.changed_when is not None and self._task.changed_when:
+ cond = Conditional(loader=self._loader)
+ cond.when = self._task.changed_when
+ result['changed'] = cond.evaluate_conditional(templar, vars_copy)
+
+ def _evaluate_failed_when_result(result):
+ if self._task.failed_when:
+ cond = Conditional(loader=self._loader)
+ cond.when = self._task.failed_when
+ failed_when_result = cond.evaluate_conditional(templar, vars_copy)
+ result['failed_when_result'] = result['failed'] = failed_when_result
+ else:
+ failed_when_result = False
+ return failed_when_result
+
+ if 'ansible_facts' in result and self._task.action not in C._ACTION_DEBUG:
+ if self._task.action in C._ACTION_WITH_CLEAN_FACTS:
+ if self._task.delegate_to and self._task.delegate_facts:
+ if '_ansible_delegated_vars' in vars_copy:
+ vars_copy['_ansible_delegated_vars'].update(result['ansible_facts'])
+ else:
+ vars_copy['_ansible_delegated_vars'] = result['ansible_facts']
+ else:
+ vars_copy.update(result['ansible_facts'])
+ else:
+ # TODO: cleaning of facts should eventually become part of taskresults instead of vars
+ af = wrap_var(result['ansible_facts'])
+ vars_copy['ansible_facts'] = combine_vars(vars_copy.get('ansible_facts', {}), namespace_facts(af))
+ if C.INJECT_FACTS_AS_VARS:
+ vars_copy.update(clean_facts(af))
+
+ # set the failed property if it was missing.
+ if 'failed' not in result:
+ # rc is here for backwards compatibility and modules that use it instead of 'failed'
+ if 'rc' in result and result['rc'] not in [0, "0"]:
+ result['failed'] = True
+ else:
+ result['failed'] = False
+
+ # Make attempts and retries available early to allow their use in changed/failed_when
+ if self._task.until:
+ result['attempts'] = attempt
+
+ # set the changed property if it was missing.
+ if 'changed' not in result:
+ result['changed'] = False
+
+ if self._task.action not in C._ACTION_WITH_CLEAN_FACTS:
+ result = wrap_var(result)
+
+ # re-update the local copy of vars with the registered value, if specified,
+ # or any facts which may have been generated by the module execution
+ # This gives changed/failed_when access to additional recently modified
+ # attributes of result
+ if self._task.register:
+ vars_copy[self._task.register] = result
+
+ # if we didn't skip this task, use the helpers to evaluate the changed/
+ # failed_when properties
+ if 'skipped' not in result:
+ condname = 'changed'
+
+ try:
+ _evaluate_changed_when_result(result)
+ condname = 'failed'
+ _evaluate_failed_when_result(result)
+ except AnsibleError as e:
+ result['failed'] = True
+ result['%s_when_result' % condname] = to_text(e)
+
+ if retries > 1:
+ cond = Conditional(loader=self._loader)
+ cond.when = self._task.until
+ if cond.evaluate_conditional(templar, vars_copy):
+ break
+ else:
+ # no conditional check, or it failed, so sleep for the specified time
+ if attempt < retries:
+ result['_ansible_retry'] = True
+ result['retries'] = retries
+ display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
+ self._final_q.send_callback(
+ 'v2_runner_retry',
+ TaskResult(
+ self._host.name,
+ self._task._uuid,
+ result,
+ task_fields=self._task.dump_attrs()
+ )
+ )
+ time.sleep(delay)
+ self._handler = self._get_action_handler(connection=self._connection, templar=templar)
+ else:
+ if retries > 1:
+ # we ran out of attempts, so mark the result as failed
+ result['attempts'] = retries - 1
+ result['failed'] = True
+
+ if self._task.action not in C._ACTION_WITH_CLEAN_FACTS:
+ result = wrap_var(result)
+
+ # do the final update of the local variables here, for both registered
+ # values and any facts which may have been created
+ if self._task.register:
+ variables[self._task.register] = result
+
+ if 'ansible_facts' in result and self._task.action not in C._ACTION_DEBUG:
+ if self._task.action in C._ACTION_WITH_CLEAN_FACTS:
+ variables.update(result['ansible_facts'])
+ else:
+ # TODO: cleaning of facts should eventually become part of taskresults instead of vars
+ af = wrap_var(result['ansible_facts'])
+ variables['ansible_facts'] = combine_vars(variables.get('ansible_facts', {}), namespace_facts(af))
+ if C.INJECT_FACTS_AS_VARS:
+ variables.update(clean_facts(af))
+
+ # save the notification target in the result, if it was specified, as
+ # this task may be running in a loop in which case the notification
+ # may be item-specific, ie. "notify: service {{item}}"
+ if self._task.notify is not None:
+ result['_ansible_notify'] = self._task.notify
+
+ # add the delegated vars to the result, so we can reference them
+ # on the results side without having to do any further templating
+ # also now add connection vars results when delegating
+ if self._task.delegate_to:
+ result["_ansible_delegated_vars"] = {'ansible_delegated_host': self._task.delegate_to}
+ for k in plugin_vars:
+ result["_ansible_delegated_vars"][k] = cvars.get(k)
+
+ # note: here for callbacks that rely on this info to display delegation
+ for requireshed in ('ansible_host', 'ansible_port', 'ansible_user', 'ansible_connection'):
+ if requireshed not in result["_ansible_delegated_vars"] and requireshed in cvars:
+ result["_ansible_delegated_vars"][requireshed] = cvars.get(requireshed)
+
+ # and return
+ display.debug("attempt loop complete, returning result")
+ return result
+
+ def _poll_async_result(self, result, templar, task_vars=None):
+ '''
+ Polls for the specified JID to be complete
+ '''
+
+ if task_vars is None:
+ task_vars = self._job_vars
+
+ async_jid = result.get('ansible_job_id')
+ if async_jid is None:
+ return dict(failed=True, msg="No job id was returned by the async task")
+
+ # Create a new pseudo-task to run the async_status module, and run
+ # that (with a sleep for "poll" seconds between each retry) until the
+ # async time limit is exceeded.
+
+ async_task = Task.load(dict(action='async_status', args={'jid': async_jid}, environment=self._task.environment))
+
+ # FIXME: this is no longer the case, normal takes care of all, see if this can just be generalized
+ # Because this is an async task, the action handler is async. However,
+ # we need the 'normal' action handler for the status check, so get it
+ # now via the action_loader
+ async_handler = self._shared_loader_obj.action_loader.get(
+ 'ansible.legacy.async_status',
+ task=async_task,
+ connection=self._connection,
+ play_context=self._play_context,
+ loader=self._loader,
+ templar=templar,
+ shared_loader_obj=self._shared_loader_obj,
+ )
+
+ time_left = self._task.async_val
+ while time_left > 0:
+ time.sleep(self._task.poll)
+
+ try:
+ async_result = async_handler.run(task_vars=task_vars)
+ # We do not bail out of the loop in cases where the failure
+ # is associated with a parsing error. The async_runner can
+ # have issues which result in a half-written/unparseable result
+ # file on disk, which manifests to the user as a timeout happening
+ # before it's time to timeout.
+ if (int(async_result.get('finished', 0)) == 1 or
+ ('failed' in async_result and async_result.get('_ansible_parsed', False)) or
+ 'skipped' in async_result):
+ break
+ except Exception as e:
+ # Connections can raise exceptions during polling (eg, network bounce, reboot); these should be non-fatal.
+ # On an exception, call the connection's reset method if it has one
+ # (eg, drop/recreate WinRM connection; some reused connections are in a broken state)
+ display.vvvv("Exception during async poll, retrying... (%s)" % to_text(e))
+ display.debug("Async poll exception was:\n%s" % to_text(traceback.format_exc()))
+ try:
+ async_handler._connection.reset()
+ except AttributeError:
+ pass
+
+ # Little hack to raise the exception if we've exhausted the timeout period
+ time_left -= self._task.poll
+ if time_left <= 0:
+ raise
+ else:
+ time_left -= self._task.poll
+ self._final_q.send_callback(
+ 'v2_runner_on_async_poll',
+ TaskResult(
+ self._host.name,
+ async_task._uuid,
+ async_result,
+ task_fields=async_task.dump_attrs(),
+ ),
+ )
+
+ if int(async_result.get('finished', 0)) != 1:
+ if async_result.get('_ansible_parsed'):
+ return dict(failed=True, msg="async task did not complete within the requested time - %ss" % self._task.async_val, async_result=async_result)
+ else:
+ return dict(failed=True, msg="async task produced unparseable results", async_result=async_result)
+ else:
+ # If the async task finished, automatically cleanup the temporary
+ # status file left behind.
+ cleanup_task = Task.load(
+ {
+ 'async_status': {
+ 'jid': async_jid,
+ 'mode': 'cleanup',
+ },
+ 'environment': self._task.environment,
+ }
+ )
+ cleanup_handler = self._shared_loader_obj.action_loader.get(
+ 'ansible.legacy.async_status',
+ task=cleanup_task,
+ connection=self._connection,
+ play_context=self._play_context,
+ loader=self._loader,
+ templar=templar,
+ shared_loader_obj=self._shared_loader_obj,
+ )
+ cleanup_handler.run(task_vars=task_vars)
+ cleanup_handler.cleanup(force=True)
+ async_handler.cleanup(force=True)
+ return async_result
+
+ def _get_become(self, name):
+ become = become_loader.get(name)
+ if not become:
+ raise AnsibleError("Invalid become method specified, could not find matching plugin: '%s'. "
+ "Use `ansible-doc -t become -l` to list available plugins." % name)
+ return become
+
+ def _get_connection(self, cvars, templar, current_connection):
+ '''
+ Reads the connection property for the host, and returns the
+ correct connection object from the list of connection plugins
+ '''
+
+ self._play_context.connection = current_connection
+
+ # TODO: play context has logic to update the connection for 'smart'
+ # (default value, will chose between ssh and paramiko) and 'persistent'
+ # (really paramiko), eventually this should move to task object itself.
+ conn_type = self._play_context.connection
+
+ connection, plugin_load_context = self._shared_loader_obj.connection_loader.get_with_context(
+ conn_type,
+ self._play_context,
+ self._new_stdin,
+ task_uuid=self._task._uuid,
+ ansible_playbook_pid=to_text(os.getppid())
+ )
+
+ if not connection:
+ raise AnsibleError("the connection plugin '%s' was not found" % conn_type)
+
+ self._set_become_plugin(cvars, templar, connection)
+
+ # Also backwards compat call for those still using play_context
+ self._play_context.set_attributes_from_plugin(connection)
+
+ return connection
+
+ def _set_become_plugin(self, cvars, templar, connection):
+ # load become plugin if needed
+ if cvars.get('ansible_become') is not None:
+ become = boolean(templar.template(cvars['ansible_become']))
+ else:
+ become = self._task.become
+
+ if become:
+ if cvars.get('ansible_become_method'):
+ become_plugin = self._get_become(templar.template(cvars['ansible_become_method']))
+ else:
+ become_plugin = self._get_become(self._task.become_method)
+
+ else:
+ # If become is not enabled on the task it needs to be removed from the connection plugin
+ # https://github.com/ansible/ansible/issues/78425
+ become_plugin = None
+
+ try:
+ connection.set_become_plugin(become_plugin)
+ except AttributeError:
+ # Older connection plugin that does not support set_become_plugin
+ pass
+
+ if become_plugin:
+ if getattr(connection.become, 'require_tty', False) and not getattr(connection, 'has_tty', False):
+ raise AnsibleError(
+ "The '%s' connection does not provide a TTY which is required for the selected "
+ "become plugin: %s." % (connection._load_name, become_plugin.name)
+ )
+
+ # Backwards compat for connection plugins that don't support become plugins
+ # Just do this unconditionally for now, we could move it inside of the
+ # AttributeError above later
+ self._play_context.set_become_plugin(become_plugin.name)
+
+ def _set_plugin_options(self, plugin_type, variables, templar, task_keys):
+ try:
+ plugin = getattr(self._connection, '_%s' % plugin_type)
+ except AttributeError:
+ # Some plugins are assigned to private attrs, ``become`` is not
+ plugin = getattr(self._connection, plugin_type)
+
+ # network_cli's "real" connection plugin is not named connection
+ # to avoid the confusion of having connection.connection
+ if plugin_type == "ssh_type_conn":
+ plugin_type = "connection"
+ option_vars = C.config.get_plugin_vars(plugin_type, plugin._load_name)
+ options = {}
+ for k in option_vars:
+ if k in variables:
+ options[k] = templar.template(variables[k])
+ # TODO move to task method?
+ plugin.set_options(task_keys=task_keys, var_options=options)
+
+ return option_vars
+
+ def _set_connection_options(self, variables, templar):
+
+ # keep list of variable names possibly consumed
+ varnames = []
+
+ # grab list of usable vars for this plugin
+ option_vars = C.config.get_plugin_vars('connection', self._connection._load_name)
+ varnames.extend(option_vars)
+
+ # create dict of 'templated vars'
+ options = {'_extras': {}}
+ for k in option_vars:
+ if k in variables:
+ options[k] = templar.template(variables[k])
+
+ # add extras if plugin supports them
+ if getattr(self._connection, 'allow_extras', False):
+ for k in variables:
+ if k.startswith('ansible_%s_' % self._connection._load_name) and k not in options:
+ options['_extras'][k] = templar.template(variables[k])
+
+ task_keys = self._task.dump_attrs()
+
+ # The task_keys 'timeout' attr is the task's timeout, not the connection timeout.
+ # The connection timeout is threaded through the play_context for now.
+ task_keys['timeout'] = self._play_context.timeout
+
+ if self._play_context.password:
+ # The connection password is threaded through the play_context for
+ # now. This is something we ultimately want to avoid, but the first
+ # step is to get connection plugins pulling the password through the
+ # config system instead of directly accessing play_context.
+ task_keys['password'] = self._play_context.password
+
+ # Prevent task retries from overriding connection retries
+ del task_keys['retries']
+
+ # set options with 'templated vars' specific to this plugin and dependent ones
+ self._connection.set_options(task_keys=task_keys, var_options=options)
+ varnames.extend(self._set_plugin_options('shell', variables, templar, task_keys))
+
+ if self._connection.become is not None:
+ if self._play_context.become_pass:
+ # FIXME: eventually remove from task and play_context, here for backwards compat
+ # keep out of play objects to avoid accidental disclosure, only become plugin should have
+ # The become pass is already in the play_context if given on
+ # the CLI (-K). Make the plugin aware of it in this case.
+ task_keys['become_pass'] = self._play_context.become_pass
+
+ varnames.extend(self._set_plugin_options('become', variables, templar, task_keys))
+
+ # FOR BACKWARDS COMPAT:
+ for option in ('become_user', 'become_flags', 'become_exe', 'become_pass'):
+ try:
+ setattr(self._play_context, option, self._connection.become.get_option(option))
+ except KeyError:
+ pass # some plugins don't support all base flags
+ self._play_context.prompt = self._connection.become.prompt
+
+ # deals with networking sub_plugins (network_cli/httpapi/netconf)
+ sub = getattr(self._connection, '_sub_plugin', None)
+ if sub is not None and sub.get('type') != 'external':
+ plugin_type = get_plugin_class(sub.get("obj"))
+ varnames.extend(self._set_plugin_options(plugin_type, variables, templar, task_keys))
+ sub_conn = getattr(self._connection, 'ssh_type_conn', None)
+ if sub_conn is not None:
+ varnames.extend(self._set_plugin_options("ssh_type_conn", variables, templar, task_keys))
+
+ return varnames
+
+ def _get_action_handler(self, connection, templar):
+ '''
+ Returns the correct action plugin to handle the requestion task action
+ '''
+ return self._get_action_handler_with_module_context(connection, templar)[0]
+
+ def _get_action_handler_with_module_context(self, connection, templar):
+ '''
+ Returns the correct action plugin to handle the requestion task action and the module context
+ '''
+ module_collection, separator, module_name = self._task.action.rpartition(".")
+ module_prefix = module_name.split('_')[0]
+ if module_collection:
+ # For network modules, which look for one action plugin per platform, look for the
+ # action plugin in the same collection as the module by prefixing the action plugin
+ # with the same collection.
+ network_action = "{0}.{1}".format(module_collection, module_prefix)
+ else:
+ network_action = module_prefix
+
+ collections = self._task.collections
+
+ # Check if the module has specified an action handler
+ module = self._shared_loader_obj.module_loader.find_plugin_with_context(
+ self._task.action, collection_list=collections
+ )
+ if not module.resolved or not module.action_plugin:
+ module = None
+ if module is not None:
+ handler_name = module.action_plugin
+ # let action plugin override module, fallback to 'normal' action plugin otherwise
+ elif self._shared_loader_obj.action_loader.has_plugin(self._task.action, collection_list=collections):
+ handler_name = self._task.action
+ elif all((module_prefix in C.NETWORK_GROUP_MODULES, self._shared_loader_obj.action_loader.has_plugin(network_action, collection_list=collections))):
+ handler_name = network_action
+ display.vvvv("Using network group action {handler} for {action}".format(handler=handler_name,
+ action=self._task.action),
+ host=self._play_context.remote_addr)
+ else:
+ # use ansible.legacy.normal to allow (historic) local action_plugins/ override without collections search
+ handler_name = 'ansible.legacy.normal'
+ collections = None # until then, we don't want the task's collection list to be consulted; use the builtin
+
+ handler = self._shared_loader_obj.action_loader.get(
+ handler_name,
+ task=self._task,
+ connection=connection,
+ play_context=self._play_context,
+ loader=self._loader,
+ templar=templar,
+ shared_loader_obj=self._shared_loader_obj,
+ collection_list=collections
+ )
+
+ if not handler:
+ raise AnsibleError("the handler '%s' was not found" % handler_name)
+
+ return handler, module
+
+
+def start_connection(play_context, options, task_uuid):
+ '''
+ Starts the persistent connection
+ '''
+ candidate_paths = [C.ANSIBLE_CONNECTION_PATH or os.path.dirname(sys.argv[0])]
+ candidate_paths.extend(os.environ.get('PATH', '').split(os.pathsep))
+ for dirname in candidate_paths:
+ ansible_connection = os.path.join(dirname, 'ansible-connection')
+ if os.path.isfile(ansible_connection):
+ display.vvvv("Found ansible-connection at path {0}".format(ansible_connection))
+ break
+ else:
+ raise AnsibleError("Unable to find location of 'ansible-connection'. "
+ "Please set or check the value of ANSIBLE_CONNECTION_PATH")
+
+ env = os.environ.copy()
+ env.update({
+ # HACK; most of these paths may change during the controller's lifetime
+ # (eg, due to late dynamic role includes, multi-playbook execution), without a way
+ # to invalidate/update, ansible-connection won't always see the same plugins the controller
+ # can.
+ 'ANSIBLE_BECOME_PLUGINS': become_loader.print_paths(),
+ 'ANSIBLE_CLICONF_PLUGINS': cliconf_loader.print_paths(),
+ 'ANSIBLE_COLLECTIONS_PATH': to_native(os.pathsep.join(AnsibleCollectionConfig.collection_paths)),
+ 'ANSIBLE_CONNECTION_PLUGINS': connection_loader.print_paths(),
+ 'ANSIBLE_HTTPAPI_PLUGINS': httpapi_loader.print_paths(),
+ 'ANSIBLE_NETCONF_PLUGINS': netconf_loader.print_paths(),
+ 'ANSIBLE_TERMINAL_PLUGINS': terminal_loader.print_paths(),
+ })
+ verbosity = []
+ if display.verbosity:
+ verbosity.append('-%s' % ('v' * display.verbosity))
+ python = sys.executable
+ master, slave = pty.openpty()
+ p = subprocess.Popen(
+ [python, ansible_connection, *verbosity, to_text(os.getppid()), to_text(task_uuid)],
+ stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env
+ )
+ os.close(slave)
+
+ # We need to set the pty into noncanonical mode. This ensures that we
+ # can receive lines longer than 4095 characters (plus newline) without
+ # truncating.
+ old = termios.tcgetattr(master)
+ new = termios.tcgetattr(master)
+ new[3] = new[3] & ~termios.ICANON
+
+ try:
+ termios.tcsetattr(master, termios.TCSANOW, new)
+ write_to_file_descriptor(master, options)
+ write_to_file_descriptor(master, play_context.serialize())
+
+ (stdout, stderr) = p.communicate()
+ finally:
+ termios.tcsetattr(master, termios.TCSANOW, old)
+ os.close(master)
+
+ if p.returncode == 0:
+ result = json.loads(to_text(stdout, errors='surrogate_then_replace'))
+ else:
+ try:
+ result = json.loads(to_text(stderr, errors='surrogate_then_replace'))
+ except getattr(json.decoder, 'JSONDecodeError', ValueError):
+ # JSONDecodeError only available on Python 3.5+
+ result = {'error': to_text(stderr, errors='surrogate_then_replace')}
+
+ if 'messages' in result:
+ for level, message in result['messages']:
+ if level == 'log':
+ display.display(message, log_only=True)
+ elif level in ('debug', 'v', 'vv', 'vvv', 'vvvv', 'vvvvv', 'vvvvvv'):
+ getattr(display, level)(message, host=play_context.remote_addr)
+ else:
+ if hasattr(display, level):
+ getattr(display, level)(message)
+ else:
+ display.vvvv(message, host=play_context.remote_addr)
+
+ if 'error' in result:
+ if display.verbosity > 2:
+ if result.get('exception'):
+ msg = "The full traceback is:\n" + result['exception']
+ display.display(msg, color=C.COLOR_ERROR)
+ raise AnsibleError(result['error'])
+
+ return result['socket_path']