diff options
Diffstat (limited to 'lib/ansible/plugins/strategy/__init__.py')
-rw-r--r-- | lib/ansible/plugins/strategy/__init__.py | 1202 |
1 files changed, 1202 insertions, 0 deletions
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py new file mode 100644 index 0000000..5cc05ee --- /dev/null +++ b/lib/ansible/plugins/strategy/__init__.py @@ -0,0 +1,1202 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import cmd +import functools +import os +import pprint +import queue +import sys +import threading +import time + +from collections import deque +from multiprocessing import Lock + +from jinja2.exceptions import UndefinedError + +from ansible import constants as C +from ansible import context +from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleUndefinedVariable, AnsibleParserError +from ansible.executor import action_write_locks +from ansible.executor.play_iterator import IteratingStates +from ansible.executor.process.worker import WorkerProcess +from ansible.executor.task_result import TaskResult +from ansible.executor.task_queue_manager import CallbackSend, DisplaySend +from ansible.module_utils.six import string_types +from ansible.module_utils._text import to_text +from ansible.module_utils.connection import Connection, ConnectionError +from ansible.playbook.conditional import Conditional +from ansible.playbook.handler import Handler +from ansible.playbook.helpers import load_list_of_blocks +from ansible.playbook.task import Task +from ansible.playbook.task_include import TaskInclude +from ansible.plugins import loader as plugin_loader +from ansible.template import Templar +from ansible.utils.display import Display +from ansible.utils.fqcn import add_internal_fqcns +from ansible.utils.unsafe_proxy import wrap_var +from ansible.utils.vars import combine_vars, isidentifier +from ansible.vars.clean import strip_internal_keys, module_response_deepcopy + +display = Display() + +__all__ = ['StrategyBase'] + +# This list can be an exact match, or start of string bound +# does not accept regex +ALWAYS_DELEGATE_FACT_PREFIXES = frozenset(( + 'discovered_interpreter_', +)) + + +class StrategySentinel: + pass + + +_sentinel = StrategySentinel() + + +def post_process_whens(result, task, templar, task_vars): + cond = None + if task.changed_when: + with templar.set_temporary_context(available_variables=task_vars): + cond = Conditional(loader=templar._loader) + cond.when = task.changed_when + result['changed'] = cond.evaluate_conditional(templar, templar.available_variables) + + if task.failed_when: + with templar.set_temporary_context(available_variables=task_vars): + if cond is None: + cond = Conditional(loader=templar._loader) + cond.when = task.failed_when + failed_when_result = cond.evaluate_conditional(templar, templar.available_variables) + result['failed_when_result'] = result['failed'] = failed_when_result + + +def _get_item_vars(result, task): + item_vars = {} + if task.loop or task.loop_with: + loop_var = result.get('ansible_loop_var', 'item') + index_var = result.get('ansible_index_var') + if loop_var in result: + item_vars[loop_var] = result[loop_var] + if index_var and index_var in result: + item_vars[index_var] = result[index_var] + if '_ansible_item_label' in result: + item_vars['_ansible_item_label'] = result['_ansible_item_label'] + if 'ansible_loop' in result: + item_vars['ansible_loop'] = result['ansible_loop'] + return item_vars + + +def results_thread_main(strategy): + while True: + try: + result = strategy._final_q.get() + if isinstance(result, StrategySentinel): + break + elif isinstance(result, DisplaySend): + display.display(*result.args, **result.kwargs) + elif isinstance(result, CallbackSend): + for arg in result.args: + if isinstance(arg, TaskResult): + strategy.normalize_task_result(arg) + break + strategy._tqm.send_callback(result.method_name, *result.args, **result.kwargs) + elif isinstance(result, TaskResult): + strategy.normalize_task_result(result) + with strategy._results_lock: + strategy._results.append(result) + else: + display.warning('Received an invalid object (%s) in the result queue: %r' % (type(result), result)) + except (IOError, EOFError): + break + except queue.Empty: + pass + + +def debug_closure(func): + """Closure to wrap ``StrategyBase._process_pending_results`` and invoke the task debugger""" + @functools.wraps(func) + def inner(self, iterator, one_pass=False, max_passes=None): + status_to_stats_map = ( + ('is_failed', 'failures'), + ('is_unreachable', 'dark'), + ('is_changed', 'changed'), + ('is_skipped', 'skipped'), + ) + + # We don't know the host yet, copy the previous states, for lookup after we process new results + prev_host_states = iterator.host_states.copy() + + results = func(self, iterator, one_pass=one_pass, max_passes=max_passes) + _processed_results = [] + + for result in results: + task = result._task + host = result._host + _queued_task_args = self._queued_task_cache.pop((host.name, task._uuid), None) + task_vars = _queued_task_args['task_vars'] + play_context = _queued_task_args['play_context'] + # Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state + try: + prev_host_state = prev_host_states[host.name] + except KeyError: + prev_host_state = iterator.get_host_state(host) + + while result.needs_debugger(globally_enabled=self.debugger_active): + next_action = NextAction() + dbg = Debugger(task, host, task_vars, play_context, result, next_action) + dbg.cmdloop() + + if next_action.result == NextAction.REDO: + # rollback host state + self._tqm.clear_failed_hosts() + if task.run_once and iterator._play.strategy in add_internal_fqcns(('linear',)) and result.is_failed(): + for host_name, state in prev_host_states.items(): + if host_name == host.name: + continue + iterator.set_state_for_host(host_name, state) + iterator._play._removed_hosts.remove(host_name) + iterator.set_state_for_host(host.name, prev_host_state) + for method, what in status_to_stats_map: + if getattr(result, method)(): + self._tqm._stats.decrement(what, host.name) + self._tqm._stats.decrement('ok', host.name) + + # redo + self._queue_task(host, task, task_vars, play_context) + + _processed_results.extend(debug_closure(func)(self, iterator, one_pass)) + break + elif next_action.result == NextAction.CONTINUE: + _processed_results.append(result) + break + elif next_action.result == NextAction.EXIT: + # Matches KeyboardInterrupt from bin/ansible + sys.exit(99) + else: + _processed_results.append(result) + + return _processed_results + return inner + + +class StrategyBase: + + ''' + This is the base class for strategy plugins, which contains some common + code useful to all strategies like running handlers, cleanup actions, etc. + ''' + + # by default, strategies should support throttling but we allow individual + # strategies to disable this and either forego supporting it or managing + # the throttling internally (as `free` does) + ALLOW_BASE_THROTTLING = True + + def __init__(self, tqm): + self._tqm = tqm + self._inventory = tqm.get_inventory() + self._workers = tqm._workers + self._variable_manager = tqm.get_variable_manager() + self._loader = tqm.get_loader() + self._final_q = tqm._final_q + self._step = context.CLIARGS.get('step', False) + self._diff = context.CLIARGS.get('diff', False) + + # the task cache is a dictionary of tuples of (host.name, task._uuid) + # used to find the original task object of in-flight tasks and to store + # the task args/vars and play context info used to queue the task. + self._queued_task_cache = {} + + # Backwards compat: self._display isn't really needed, just import the global display and use that. + self._display = display + + # internal counters + self._pending_results = 0 + self._cur_worker = 0 + + # this dictionary is used to keep track of hosts that have + # outstanding tasks still in queue + self._blocked_hosts = dict() + + self._results = deque() + self._results_lock = threading.Condition(threading.Lock()) + + # create the result processing thread for reading results in the background + self._results_thread = threading.Thread(target=results_thread_main, args=(self,)) + self._results_thread.daemon = True + self._results_thread.start() + + # holds the list of active (persistent) connections to be shutdown at + # play completion + self._active_connections = dict() + + # Caches for get_host calls, to avoid calling excessively + # These values should be set at the top of the ``run`` method of each + # strategy plugin. Use ``_set_hosts_cache`` to set these values + self._hosts_cache = [] + self._hosts_cache_all = [] + + self.debugger_active = C.ENABLE_TASK_DEBUGGER + + def _set_hosts_cache(self, play, refresh=True): + """Responsible for setting _hosts_cache and _hosts_cache_all + + See comment in ``__init__`` for the purpose of these caches + """ + if not refresh and all((self._hosts_cache, self._hosts_cache_all)): + return + + if not play.finalized and Templar(None).is_template(play.hosts): + _pattern = 'all' + else: + _pattern = play.hosts or 'all' + self._hosts_cache_all = [h.name for h in self._inventory.get_hosts(pattern=_pattern, ignore_restrictions=True)] + self._hosts_cache = [h.name for h in self._inventory.get_hosts(play.hosts, order=play.order)] + + def cleanup(self): + # close active persistent connections + for sock in self._active_connections.values(): + try: + conn = Connection(sock) + conn.reset() + except ConnectionError as e: + # most likely socket is already closed + display.debug("got an error while closing persistent connection: %s" % e) + self._final_q.put(_sentinel) + self._results_thread.join() + + def run(self, iterator, play_context, result=0): + # execute one more pass through the iterator without peeking, to + # make sure that all of the hosts are advanced to their final task. + # This should be safe, as everything should be IteratingStates.COMPLETE by + # this point, though the strategy may not advance the hosts itself. + + for host in self._hosts_cache: + if host not in self._tqm._unreachable_hosts: + try: + iterator.get_next_task_for_host(self._inventory.hosts[host]) + except KeyError: + iterator.get_next_task_for_host(self._inventory.get_host(host)) + + # return the appropriate code, depending on the status hosts after the run + if not isinstance(result, bool) and result != self._tqm.RUN_OK: + return result + elif len(self._tqm._unreachable_hosts.keys()) > 0: + return self._tqm.RUN_UNREACHABLE_HOSTS + elif len(iterator.get_failed_hosts()) > 0: + return self._tqm.RUN_FAILED_HOSTS + else: + return self._tqm.RUN_OK + + def get_hosts_remaining(self, play): + self._set_hosts_cache(play, refresh=False) + ignore = set(self._tqm._failed_hosts).union(self._tqm._unreachable_hosts) + return [host for host in self._hosts_cache if host not in ignore] + + def get_failed_hosts(self, play): + self._set_hosts_cache(play, refresh=False) + return [host for host in self._hosts_cache if host in self._tqm._failed_hosts] + + def add_tqm_variables(self, vars, play): + ''' + Base class method to add extra variables/information to the list of task + vars sent through the executor engine regarding the task queue manager state. + ''' + vars['ansible_current_hosts'] = self.get_hosts_remaining(play) + vars['ansible_failed_hosts'] = self.get_failed_hosts(play) + + def _queue_task(self, host, task, task_vars, play_context): + ''' handles queueing the task up to be sent to a worker ''' + + display.debug("entering _queue_task() for %s/%s" % (host.name, task.action)) + + # Add a write lock for tasks. + # Maybe this should be added somewhere further up the call stack but + # this is the earliest in the code where we have task (1) extracted + # into its own variable and (2) there's only a single code path + # leading to the module being run. This is called by two + # functions: linear.py::run(), and + # free.py::run() so we'd have to add to both to do it there. + # The next common higher level is __init__.py::run() and that has + # tasks inside of play_iterator so we'd have to extract them to do it + # there. + + if task.action not in action_write_locks.action_write_locks: + display.debug('Creating lock for %s' % task.action) + action_write_locks.action_write_locks[task.action] = Lock() + + # create a templar and template things we need later for the queuing process + templar = Templar(loader=self._loader, variables=task_vars) + + try: + throttle = int(templar.template(task.throttle)) + except Exception as e: + raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e) + + # and then queue the new task + try: + # Determine the "rewind point" of the worker list. This means we start + # iterating over the list of workers until the end of the list is found. + # Normally, that is simply the length of the workers list (as determined + # by the forks or serial setting), however a task/block/play may "throttle" + # that limit down. + rewind_point = len(self._workers) + if throttle > 0 and self.ALLOW_BASE_THROTTLING: + if task.run_once: + display.debug("Ignoring 'throttle' as 'run_once' is also set for '%s'" % task.get_name()) + else: + if throttle <= rewind_point: + display.debug("task: %s, throttle: %d" % (task.get_name(), throttle)) + rewind_point = throttle + + queued = False + starting_worker = self._cur_worker + while True: + if self._cur_worker >= rewind_point: + self._cur_worker = 0 + + worker_prc = self._workers[self._cur_worker] + if worker_prc is None or not worker_prc.is_alive(): + self._queued_task_cache[(host.name, task._uuid)] = { + 'host': host, + 'task': task, + 'task_vars': task_vars, + 'play_context': play_context + } + + worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, plugin_loader) + self._workers[self._cur_worker] = worker_prc + self._tqm.send_callback('v2_runner_on_start', host, task) + worker_prc.start() + display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers))) + queued = True + + self._cur_worker += 1 + + if self._cur_worker >= rewind_point: + self._cur_worker = 0 + + if queued: + break + elif self._cur_worker == starting_worker: + time.sleep(0.0001) + + self._pending_results += 1 + except (EOFError, IOError, AssertionError) as e: + # most likely an abort + display.debug("got an error while queuing: %s" % e) + return + display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action)) + + def get_task_hosts(self, iterator, task_host, task): + if task.run_once: + host_list = [host for host in self._hosts_cache if host not in self._tqm._unreachable_hosts] + else: + host_list = [task_host.name] + return host_list + + def get_delegated_hosts(self, result, task): + host_name = result.get('_ansible_delegated_vars', {}).get('ansible_delegated_host', None) + return [host_name or task.delegate_to] + + def _set_always_delegated_facts(self, result, task): + """Sets host facts for ``delegate_to`` hosts for facts that should + always be delegated + + This operation mutates ``result`` to remove the always delegated facts + + See ``ALWAYS_DELEGATE_FACT_PREFIXES`` + """ + if task.delegate_to is None: + return + + facts = result['ansible_facts'] + always_keys = set() + _add = always_keys.add + for fact_key in facts: + for always_key in ALWAYS_DELEGATE_FACT_PREFIXES: + if fact_key.startswith(always_key): + _add(fact_key) + if always_keys: + _pop = facts.pop + always_facts = { + 'ansible_facts': dict((k, _pop(k)) for k in list(facts) if k in always_keys) + } + host_list = self.get_delegated_hosts(result, task) + _set_host_facts = self._variable_manager.set_host_facts + for target_host in host_list: + _set_host_facts(target_host, always_facts) + + def normalize_task_result(self, task_result): + """Normalize a TaskResult to reference actual Host and Task objects + when only given the ``Host.name``, or the ``Task._uuid`` + + Only the ``Host.name`` and ``Task._uuid`` are commonly sent back from + the ``TaskExecutor`` or ``WorkerProcess`` due to performance concerns + + Mutates the original object + """ + + if isinstance(task_result._host, string_types): + # If the value is a string, it is ``Host.name`` + task_result._host = self._inventory.get_host(to_text(task_result._host)) + + if isinstance(task_result._task, string_types): + # If the value is a string, it is ``Task._uuid`` + queue_cache_entry = (task_result._host.name, task_result._task) + try: + found_task = self._queued_task_cache[queue_cache_entry]['task'] + except KeyError: + # This should only happen due to an implicit task created by the + # TaskExecutor, restrict this behavior to the explicit use case + # of an implicit async_status task + if task_result._task_fields.get('action') != 'async_status': + raise + original_task = Task() + else: + original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) + original_task._parent = found_task._parent + original_task.from_attrs(task_result._task_fields) + task_result._task = original_task + + return task_result + + @debug_closure + def _process_pending_results(self, iterator, one_pass=False, max_passes=None): + ''' + Reads results off the final queue and takes appropriate action + based on the result (executing callbacks, updating state, etc.). + ''' + + ret_results = [] + handler_templar = Templar(self._loader) + + def search_handler_blocks_by_name(handler_name, handler_blocks): + # iterate in reversed order since last handler loaded with the same name wins + for handler_block in reversed(handler_blocks): + for handler_task in handler_block.block: + if handler_task.name: + try: + if not handler_task.cached_name: + if handler_templar.is_template(handler_task.name): + handler_templar.available_variables = self._variable_manager.get_vars(play=iterator._play, + task=handler_task, + _hosts=self._hosts_cache, + _hosts_all=self._hosts_cache_all) + handler_task.name = handler_templar.template(handler_task.name) + handler_task.cached_name = True + + # first we check with the full result of get_name(), which may + # include the role name (if the handler is from a role). If that + # is not found, we resort to the simple name field, which doesn't + # have anything extra added to it. + candidates = ( + handler_task.name, + handler_task.get_name(include_role_fqcn=False), + handler_task.get_name(include_role_fqcn=True), + ) + + if handler_name in candidates: + return handler_task + except (UndefinedError, AnsibleUndefinedVariable) as e: + # We skip this handler due to the fact that it may be using + # a variable in the name that was conditionally included via + # set_fact or some other method, and we don't want to error + # out unnecessarily + if not handler_task.listen: + display.warning( + "Handler '%s' is unusable because it has no listen topics and " + "the name could not be templated (host-specific variables are " + "not supported in handler names). The error: %s" % (handler_task.name, to_text(e)) + ) + continue + + cur_pass = 0 + while True: + try: + self._results_lock.acquire() + task_result = self._results.popleft() + except IndexError: + break + finally: + self._results_lock.release() + + original_host = task_result._host + original_task = task_result._task + + # all host status messages contain 2 entries: (msg, task_result) + role_ran = False + if task_result.is_failed(): + role_ran = True + ignore_errors = original_task.ignore_errors + if not ignore_errors: + # save the current state before failing it for later inspection + state_when_failed = iterator.get_state_for_host(original_host.name) + display.debug("marking %s as failed" % original_host.name) + if original_task.run_once: + # if we're using run_once, we have to fail every host here + for h in self._inventory.get_hosts(iterator._play.hosts): + if h.name not in self._tqm._unreachable_hosts: + iterator.mark_host_failed(h) + else: + iterator.mark_host_failed(original_host) + + state, _ = iterator.get_next_task_for_host(original_host, peek=True) + + if iterator.is_failed(original_host) and state and state.run_state == IteratingStates.COMPLETE: + self._tqm._failed_hosts[original_host.name] = True + + # if we're iterating on the rescue portion of a block then + # we save the failed task in a special var for use + # within the rescue/always + if iterator.is_any_block_rescuing(state_when_failed): + self._tqm._stats.increment('rescued', original_host.name) + iterator._play._removed_hosts.remove(original_host.name) + self._variable_manager.set_nonpersistent_facts( + original_host.name, + dict( + ansible_failed_task=wrap_var(original_task.serialize()), + ansible_failed_result=task_result._result, + ), + ) + else: + self._tqm._stats.increment('failures', original_host.name) + else: + self._tqm._stats.increment('ok', original_host.name) + self._tqm._stats.increment('ignored', original_host.name) + if 'changed' in task_result._result and task_result._result['changed']: + self._tqm._stats.increment('changed', original_host.name) + self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=ignore_errors) + elif task_result.is_unreachable(): + ignore_unreachable = original_task.ignore_unreachable + if not ignore_unreachable: + self._tqm._unreachable_hosts[original_host.name] = True + iterator._play._removed_hosts.append(original_host.name) + self._tqm._stats.increment('dark', original_host.name) + else: + self._tqm._stats.increment('ok', original_host.name) + self._tqm._stats.increment('ignored', original_host.name) + self._tqm.send_callback('v2_runner_on_unreachable', task_result) + elif task_result.is_skipped(): + self._tqm._stats.increment('skipped', original_host.name) + self._tqm.send_callback('v2_runner_on_skipped', task_result) + else: + role_ran = True + + if original_task.loop: + # this task had a loop, and has more than one result, so + # loop over all of them instead of a single result + result_items = task_result._result.get('results', []) + else: + result_items = [task_result._result] + + for result_item in result_items: + if '_ansible_notify' in result_item: + if task_result.is_changed(): + # The shared dictionary for notified handlers is a proxy, which + # does not detect when sub-objects within the proxy are modified. + # So, per the docs, we reassign the list so the proxy picks up and + # notifies all other threads + for handler_name in result_item['_ansible_notify']: + found = False + # Find the handler using the above helper. First we look up the + # dependency chain of the current task (if it's from a role), otherwise + # we just look through the list of handlers in the current play/all + # roles and use the first one that matches the notify name + target_handler = search_handler_blocks_by_name(handler_name, iterator._play.handlers) + if target_handler is not None: + found = True + if target_handler.notify_host(original_host): + self._tqm.send_callback('v2_playbook_on_notify', target_handler, original_host) + + for listening_handler_block in iterator._play.handlers: + for listening_handler in listening_handler_block.block: + listeners = getattr(listening_handler, 'listen', []) or [] + if not listeners: + continue + + listeners = listening_handler.get_validated_value( + 'listen', listening_handler.fattributes.get('listen'), listeners, handler_templar + ) + if handler_name not in listeners: + continue + else: + found = True + + if listening_handler.notify_host(original_host): + self._tqm.send_callback('v2_playbook_on_notify', listening_handler, original_host) + + # and if none were found, then we raise an error + if not found: + msg = ("The requested handler '%s' was not found in either the main handlers list nor in the listening " + "handlers list" % handler_name) + if C.ERROR_ON_MISSING_HANDLER: + raise AnsibleError(msg) + else: + display.warning(msg) + + if 'add_host' in result_item: + # this task added a new host (add_host module) + new_host_info = result_item.get('add_host', dict()) + self._inventory.add_dynamic_host(new_host_info, result_item) + # ensure host is available for subsequent plays + if result_item.get('changed') and new_host_info['host_name'] not in self._hosts_cache_all: + self._hosts_cache_all.append(new_host_info['host_name']) + + elif 'add_group' in result_item: + # this task added a new group (group_by module) + self._inventory.add_dynamic_group(original_host, result_item) + + if 'add_host' in result_item or 'add_group' in result_item: + item_vars = _get_item_vars(result_item, original_task) + found_task_vars = self._queued_task_cache.get((original_host.name, task_result._task._uuid))['task_vars'] + if item_vars: + all_task_vars = combine_vars(found_task_vars, item_vars) + else: + all_task_vars = found_task_vars + all_task_vars[original_task.register] = wrap_var(result_item) + post_process_whens(result_item, original_task, handler_templar, all_task_vars) + if original_task.loop or original_task.loop_with: + new_item_result = TaskResult( + task_result._host, + task_result._task, + result_item, + task_result._task_fields, + ) + self._tqm.send_callback('v2_runner_item_on_ok', new_item_result) + if result_item.get('changed', False): + task_result._result['changed'] = True + if result_item.get('failed', False): + task_result._result['failed'] = True + + if 'ansible_facts' in result_item and original_task.action not in C._ACTION_DEBUG: + # if delegated fact and we are delegating facts, we need to change target host for them + if original_task.delegate_to is not None and original_task.delegate_facts: + host_list = self.get_delegated_hosts(result_item, original_task) + else: + # Set facts that should always be on the delegated hosts + self._set_always_delegated_facts(result_item, original_task) + + host_list = self.get_task_hosts(iterator, original_host, original_task) + + if original_task.action in C._ACTION_INCLUDE_VARS: + for (var_name, var_value) in result_item['ansible_facts'].items(): + # find the host we're actually referring too here, which may + # be a host that is not really in inventory at all + for target_host in host_list: + self._variable_manager.set_host_variable(target_host, var_name, var_value) + else: + cacheable = result_item.pop('_ansible_facts_cacheable', False) + for target_host in host_list: + # so set_fact is a misnomer but 'cacheable = true' was meant to create an 'actual fact' + # to avoid issues with precedence and confusion with set_fact normal operation, + # we set BOTH fact and nonpersistent_facts (aka hostvar) + # when fact is retrieved from cache in subsequent operations it will have the lower precedence, + # but for playbook setting it the 'higher' precedence is kept + is_set_fact = original_task.action in C._ACTION_SET_FACT + if not is_set_fact or cacheable: + self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy()) + if is_set_fact: + self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy()) + + if 'ansible_stats' in result_item and 'data' in result_item['ansible_stats'] and result_item['ansible_stats']['data']: + + if 'per_host' not in result_item['ansible_stats'] or result_item['ansible_stats']['per_host']: + host_list = self.get_task_hosts(iterator, original_host, original_task) + else: + host_list = [None] + + data = result_item['ansible_stats']['data'] + aggregate = 'aggregate' in result_item['ansible_stats'] and result_item['ansible_stats']['aggregate'] + for myhost in host_list: + for k in data.keys(): + if aggregate: + self._tqm._stats.update_custom_stats(k, data[k], myhost) + else: + self._tqm._stats.set_custom_stats(k, data[k], myhost) + + if 'diff' in task_result._result: + if self._diff or getattr(original_task, 'diff', False): + self._tqm.send_callback('v2_on_file_diff', task_result) + + if not isinstance(original_task, TaskInclude): + self._tqm._stats.increment('ok', original_host.name) + if 'changed' in task_result._result and task_result._result['changed']: + self._tqm._stats.increment('changed', original_host.name) + + # finally, send the ok for this task + self._tqm.send_callback('v2_runner_on_ok', task_result) + + # register final results + if original_task.register: + + if not isidentifier(original_task.register): + raise AnsibleError("Invalid variable name in 'register' specified: '%s'" % original_task.register) + + host_list = self.get_task_hosts(iterator, original_host, original_task) + + clean_copy = strip_internal_keys(module_response_deepcopy(task_result._result)) + if 'invocation' in clean_copy: + del clean_copy['invocation'] + + for target_host in host_list: + self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy}) + + self._pending_results -= 1 + if original_host.name in self._blocked_hosts: + del self._blocked_hosts[original_host.name] + + # If this is a role task, mark the parent role as being run (if + # the task was ok or failed, but not skipped or unreachable) + if original_task._role is not None and role_ran: # TODO: and original_task.action not in C._ACTION_INCLUDE_ROLE:? + # lookup the role in the ROLE_CACHE to make sure we're dealing + # with the correct object and mark it as executed + for (entry, role_obj) in iterator._play.ROLE_CACHE[original_task._role.get_name()].items(): + if role_obj._uuid == original_task._role._uuid: + role_obj._had_task_run[original_host.name] = True + + ret_results.append(task_result) + + if isinstance(original_task, Handler): + for handler in (h for b in iterator._play.handlers for h in b.block if h._uuid == original_task._uuid): + handler.remove_host(original_host) + + if one_pass or max_passes is not None and (cur_pass + 1) >= max_passes: + break + + cur_pass += 1 + + return ret_results + + def _wait_on_pending_results(self, iterator): + ''' + Wait for the shared counter to drop to zero, using a short sleep + between checks to ensure we don't spin lock + ''' + + ret_results = [] + + display.debug("waiting for pending results...") + while self._pending_results > 0 and not self._tqm._terminated: + + if self._tqm.has_dead_workers(): + raise AnsibleError("A worker was found in a dead state") + + results = self._process_pending_results(iterator) + ret_results.extend(results) + if self._pending_results > 0: + time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) + + display.debug("no more pending results, returning what we have") + + return ret_results + + def _copy_included_file(self, included_file): + ''' + A proven safe and performant way to create a copy of an included file + ''' + ti_copy = included_file._task.copy(exclude_parent=True) + ti_copy._parent = included_file._task._parent + + temp_vars = ti_copy.vars | included_file._vars + + ti_copy.vars = temp_vars + + return ti_copy + + def _load_included_file(self, included_file, iterator, is_handler=False): + ''' + Loads an included YAML file of tasks, applying the optional set of variables. + + Raises AnsibleError exception in case of a failure during including a file, + in such case the caller is responsible for marking the host(s) as failed + using PlayIterator.mark_host_failed(). + ''' + display.debug("loading included file: %s" % included_file._filename) + try: + data = self._loader.load_from_file(included_file._filename) + if data is None: + return [] + elif not isinstance(data, list): + raise AnsibleError("included task files must contain a list of tasks") + + ti_copy = self._copy_included_file(included_file) + + block_list = load_list_of_blocks( + data, + play=iterator._play, + parent_block=ti_copy.build_parent_block(), + role=included_file._task._role, + use_handlers=is_handler, + loader=self._loader, + variable_manager=self._variable_manager, + ) + + # since we skip incrementing the stats when the task result is + # first processed, we do so now for each host in the list + for host in included_file._hosts: + self._tqm._stats.increment('ok', host.name) + except AnsibleParserError: + raise + except AnsibleError as e: + if isinstance(e, AnsibleFileNotFound): + reason = "Could not find or access '%s' on the Ansible Controller." % to_text(e.file_name) + else: + reason = to_text(e) + + for r in included_file._results: + r._result['failed'] = True + + for host in included_file._hosts: + tr = TaskResult(host=host, task=included_file._task, return_data=dict(failed=True, reason=reason)) + self._tqm._stats.increment('failures', host.name) + self._tqm.send_callback('v2_runner_on_failed', tr) + raise AnsibleError(reason) from e + + # finally, send the callback and return the list of blocks loaded + self._tqm.send_callback('v2_playbook_on_include', included_file) + display.debug("done processing included file") + return block_list + + def _take_step(self, task, host=None): + + ret = False + msg = u'Perform task: %s ' % task + if host: + msg += u'on %s ' % host + msg += u'(N)o/(y)es/(c)ontinue: ' + resp = display.prompt(msg) + + if resp.lower() in ['y', 'yes']: + display.debug("User ran task") + ret = True + elif resp.lower() in ['c', 'continue']: + display.debug("User ran task and canceled step mode") + self._step = False + ret = True + else: + display.debug("User skipped task") + + display.banner(msg) + + return ret + + def _cond_not_supported_warn(self, task_name): + display.warning("%s task does not support when conditional" % task_name) + + def _execute_meta(self, task, play_context, iterator, target_host): + + # meta tasks store their args in the _raw_params field of args, + # since they do not use k=v pairs, so get that + meta_action = task.args.get('_raw_params') + + def _evaluate_conditional(h): + all_vars = self._variable_manager.get_vars(play=iterator._play, host=h, task=task, + _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all) + templar = Templar(loader=self._loader, variables=all_vars) + return task.evaluate_conditional(templar, all_vars) + + skipped = False + msg = meta_action + skip_reason = '%s conditional evaluated to False' % meta_action + if isinstance(task, Handler): + self._tqm.send_callback('v2_playbook_on_handler_task_start', task) + else: + self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False) + + # These don't support "when" conditionals + if meta_action in ('noop', 'refresh_inventory', 'reset_connection') and task.when: + self._cond_not_supported_warn(meta_action) + + if meta_action == 'noop': + msg = "noop" + elif meta_action == 'flush_handlers': + if _evaluate_conditional(target_host): + host_state = iterator.get_state_for_host(target_host.name) + if host_state.run_state == IteratingStates.HANDLERS: + raise AnsibleError('flush_handlers cannot be used as a handler') + if target_host.name not in self._tqm._unreachable_hosts: + host_state.pre_flushing_run_state = host_state.run_state + host_state.run_state = IteratingStates.HANDLERS + msg = "triggered running handlers for %s" % target_host.name + else: + skipped = True + skip_reason += ', not running handlers for %s' % target_host.name + elif meta_action == 'refresh_inventory': + self._inventory.refresh_inventory() + self._set_hosts_cache(iterator._play) + msg = "inventory successfully refreshed" + elif meta_action == 'clear_facts': + if _evaluate_conditional(target_host): + for host in self._inventory.get_hosts(iterator._play.hosts): + hostname = host.get_name() + self._variable_manager.clear_facts(hostname) + msg = "facts cleared" + else: + skipped = True + skip_reason += ', not clearing facts and fact cache for %s' % target_host.name + elif meta_action == 'clear_host_errors': + if _evaluate_conditional(target_host): + for host in self._inventory.get_hosts(iterator._play.hosts): + self._tqm._failed_hosts.pop(host.name, False) + self._tqm._unreachable_hosts.pop(host.name, False) + iterator.clear_host_errors(host) + msg = "cleared host errors" + else: + skipped = True + skip_reason += ', not clearing host error state for %s' % target_host.name + elif meta_action == 'end_batch': + if _evaluate_conditional(target_host): + for host in self._inventory.get_hosts(iterator._play.hosts): + if host.name not in self._tqm._unreachable_hosts: + iterator.set_run_state_for_host(host.name, IteratingStates.COMPLETE) + msg = "ending batch" + else: + skipped = True + skip_reason += ', continuing current batch' + elif meta_action == 'end_play': + if _evaluate_conditional(target_host): + for host in self._inventory.get_hosts(iterator._play.hosts): + if host.name not in self._tqm._unreachable_hosts: + iterator.set_run_state_for_host(host.name, IteratingStates.COMPLETE) + # end_play is used in PlaybookExecutor/TQM to indicate that + # the whole play is supposed to be ended as opposed to just a batch + iterator.end_play = True + msg = "ending play" + else: + skipped = True + skip_reason += ', continuing play' + elif meta_action == 'end_host': + if _evaluate_conditional(target_host): + iterator.set_run_state_for_host(target_host.name, IteratingStates.COMPLETE) + iterator._play._removed_hosts.append(target_host.name) + msg = "ending play for %s" % target_host.name + else: + skipped = True + skip_reason += ", continuing execution for %s" % target_host.name + # TODO: Nix msg here? Left for historical reasons, but skip_reason exists now. + msg = "end_host conditional evaluated to false, continuing execution for %s" % target_host.name + elif meta_action == 'role_complete': + # Allow users to use this in a play as reported in https://github.com/ansible/ansible/issues/22286? + # How would this work with allow_duplicates?? + if task.implicit: + if target_host.name in task._role._had_task_run: + task._role._completed[target_host.name] = True + msg = 'role_complete for %s' % target_host.name + elif meta_action == 'reset_connection': + all_vars = self._variable_manager.get_vars(play=iterator._play, host=target_host, task=task, + _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all) + templar = Templar(loader=self._loader, variables=all_vars) + + # apply the given task's information to the connection info, + # which may override some fields already set by the play or + # the options specified on the command line + play_context = play_context.set_task_and_variable_override(task=task, variables=all_vars, templar=templar) + + # fields set from the play/task may be based on variables, so we have to + # do the same kind of post validation step on it here before we use it. + play_context.post_validate(templar=templar) + + # now that the play context is finalized, if the remote_addr is not set + # default to using the host's address field as the remote address + if not play_context.remote_addr: + play_context.remote_addr = target_host.address + + # We also add "magic" variables back into the variables dict to make sure + # a certain subset of variables exist. This 'mostly' works here cause meta + # disregards the loop, but should not really use play_context at all + play_context.update_vars(all_vars) + + if target_host in self._active_connections: + connection = Connection(self._active_connections[target_host]) + del self._active_connections[target_host] + else: + connection = plugin_loader.connection_loader.get(play_context.connection, play_context, os.devnull) + connection.set_options(task_keys=task.dump_attrs(), var_options=all_vars) + play_context.set_attributes_from_plugin(connection) + + if connection: + try: + connection.reset() + msg = 'reset connection' + except ConnectionError as e: + # most likely socket is already closed + display.debug("got an error while closing persistent connection: %s" % e) + else: + msg = 'no connection, nothing to reset' + else: + raise AnsibleError("invalid meta action requested: %s" % meta_action, obj=task._ds) + + result = {'msg': msg} + if skipped: + result['skipped'] = True + result['skip_reason'] = skip_reason + else: + result['changed'] = False + + if not task.implicit: + header = skip_reason if skipped else msg + display.vv(f"META: {header}") + + if isinstance(task, Handler): + task.remove_host(target_host) + + res = TaskResult(target_host, task, result) + if skipped: + self._tqm.send_callback('v2_runner_on_skipped', res) + return [res] + + def get_hosts_left(self, iterator): + ''' returns list of available hosts for this iterator by filtering out unreachables ''' + + hosts_left = [] + for host in self._hosts_cache: + if host not in self._tqm._unreachable_hosts: + try: + hosts_left.append(self._inventory.hosts[host]) + except KeyError: + hosts_left.append(self._inventory.get_host(host)) + return hosts_left + + def update_active_connections(self, results): + ''' updates the current active persistent connections ''' + for r in results: + if 'args' in r._task_fields: + socket_path = r._task_fields['args'].get('_ansible_socket') + if socket_path: + if r._host not in self._active_connections: + self._active_connections[r._host] = socket_path + + +class NextAction(object): + """ The next action after an interpreter's exit. """ + REDO = 1 + CONTINUE = 2 + EXIT = 3 + + def __init__(self, result=EXIT): + self.result = result + + +class Debugger(cmd.Cmd): + prompt_continuous = '> ' # multiple lines + + def __init__(self, task, host, task_vars, play_context, result, next_action): + # cmd.Cmd is old-style class + cmd.Cmd.__init__(self) + + self.prompt = '[%s] %s (debug)> ' % (host, task) + self.intro = None + self.scope = {} + self.scope['task'] = task + self.scope['task_vars'] = task_vars + self.scope['host'] = host + self.scope['play_context'] = play_context + self.scope['result'] = result + self.next_action = next_action + + def cmdloop(self): + try: + cmd.Cmd.cmdloop(self) + except KeyboardInterrupt: + pass + + do_h = cmd.Cmd.do_help + + def do_EOF(self, args): + """Quit""" + return self.do_quit(args) + + def do_quit(self, args): + """Quit""" + display.display('User interrupted execution') + self.next_action.result = NextAction.EXIT + return True + + do_q = do_quit + + def do_continue(self, args): + """Continue to next result""" + self.next_action.result = NextAction.CONTINUE + return True + + do_c = do_continue + + def do_redo(self, args): + """Schedule task for re-execution. The re-execution may not be the next result""" + self.next_action.result = NextAction.REDO + return True + + do_r = do_redo + + def do_update_task(self, args): + """Recreate the task from ``task._ds``, and template with updated ``task_vars``""" + templar = Templar(None, variables=self.scope['task_vars']) + task = self.scope['task'] + task = task.load_data(task._ds) + task.post_validate(templar) + self.scope['task'] = task + + do_u = do_update_task + + def evaluate(self, args): + try: + return eval(args, globals(), self.scope) + except Exception: + t, v = sys.exc_info()[:2] + if isinstance(t, str): + exc_type_name = t + else: + exc_type_name = t.__name__ + display.display('***%s:%s' % (exc_type_name, repr(v))) + raise + + def do_pprint(self, args): + """Pretty Print""" + try: + result = self.evaluate(args) + display.display(pprint.pformat(result)) + except Exception: + pass + + do_p = do_pprint + + def execute(self, args): + try: + code = compile(args + '\n', '<stdin>', 'single') + exec(code, globals(), self.scope) + except Exception: + t, v = sys.exc_info()[:2] + if isinstance(t, str): + exc_type_name = t + else: + exc_type_name = t.__name__ + display.display('***%s:%s' % (exc_type_name, repr(v))) + raise + + def default(self, line): + try: + self.execute(line) + except Exception: + pass |