diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /lib/ansible/plugins/strategy | |
parent | Initial commit. (diff) | |
download | ansible-core-upstream/2.14.3.tar.xz ansible-core-upstream/2.14.3.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/plugins/strategy')
-rw-r--r-- | lib/ansible/plugins/strategy/__init__.py | 1202 | ||||
-rw-r--r-- | lib/ansible/plugins/strategy/debug.py | 37 | ||||
-rw-r--r-- | lib/ansible/plugins/strategy/free.py | 303 | ||||
-rw-r--r-- | lib/ansible/plugins/strategy/host_pinned.py | 45 | ||||
-rw-r--r-- | lib/ansible/plugins/strategy/linear.py | 406 |
5 files changed, 1993 insertions, 0 deletions
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py new file mode 100644 index 0000000..5cc05ee --- /dev/null +++ b/lib/ansible/plugins/strategy/__init__.py @@ -0,0 +1,1202 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import cmd +import functools +import os +import pprint +import queue +import sys +import threading +import time + +from collections import deque +from multiprocessing import Lock + +from jinja2.exceptions import UndefinedError + +from ansible import constants as C +from ansible import context +from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleUndefinedVariable, AnsibleParserError +from ansible.executor import action_write_locks +from ansible.executor.play_iterator import IteratingStates +from ansible.executor.process.worker import WorkerProcess +from ansible.executor.task_result import TaskResult +from ansible.executor.task_queue_manager import CallbackSend, DisplaySend +from ansible.module_utils.six import string_types +from ansible.module_utils._text import to_text +from ansible.module_utils.connection import Connection, ConnectionError +from ansible.playbook.conditional import Conditional +from ansible.playbook.handler import Handler +from ansible.playbook.helpers import load_list_of_blocks +from ansible.playbook.task import Task +from ansible.playbook.task_include import TaskInclude +from ansible.plugins import loader as plugin_loader +from ansible.template import Templar +from ansible.utils.display import Display +from ansible.utils.fqcn import add_internal_fqcns +from ansible.utils.unsafe_proxy import wrap_var +from ansible.utils.vars import combine_vars, isidentifier +from ansible.vars.clean import strip_internal_keys, module_response_deepcopy + +display = Display() + +__all__ = ['StrategyBase'] + +# This list can be an exact match, or start of string bound +# does not accept regex +ALWAYS_DELEGATE_FACT_PREFIXES = frozenset(( + 'discovered_interpreter_', +)) + + +class StrategySentinel: + pass + + +_sentinel = StrategySentinel() + + +def post_process_whens(result, task, templar, task_vars): + cond = None + if task.changed_when: + with templar.set_temporary_context(available_variables=task_vars): + cond = Conditional(loader=templar._loader) + cond.when = task.changed_when + result['changed'] = cond.evaluate_conditional(templar, templar.available_variables) + + if task.failed_when: + with templar.set_temporary_context(available_variables=task_vars): + if cond is None: + cond = Conditional(loader=templar._loader) + cond.when = task.failed_when + failed_when_result = cond.evaluate_conditional(templar, templar.available_variables) + result['failed_when_result'] = result['failed'] = failed_when_result + + +def _get_item_vars(result, task): + item_vars = {} + if task.loop or task.loop_with: + loop_var = result.get('ansible_loop_var', 'item') + index_var = result.get('ansible_index_var') + if loop_var in result: + item_vars[loop_var] = result[loop_var] + if index_var and index_var in result: + item_vars[index_var] = result[index_var] + if '_ansible_item_label' in result: + item_vars['_ansible_item_label'] = result['_ansible_item_label'] + if 'ansible_loop' in result: + item_vars['ansible_loop'] = result['ansible_loop'] + return item_vars + + +def results_thread_main(strategy): + while True: + try: + result = strategy._final_q.get() + if isinstance(result, StrategySentinel): + break + elif isinstance(result, DisplaySend): + display.display(*result.args, **result.kwargs) + elif isinstance(result, CallbackSend): + for arg in result.args: + if isinstance(arg, TaskResult): + strategy.normalize_task_result(arg) + break + strategy._tqm.send_callback(result.method_name, *result.args, **result.kwargs) + elif isinstance(result, TaskResult): + strategy.normalize_task_result(result) + with strategy._results_lock: + strategy._results.append(result) + else: + display.warning('Received an invalid object (%s) in the result queue: %r' % (type(result), result)) + except (IOError, EOFError): + break + except queue.Empty: + pass + + +def debug_closure(func): + """Closure to wrap ``StrategyBase._process_pending_results`` and invoke the task debugger""" + @functools.wraps(func) + def inner(self, iterator, one_pass=False, max_passes=None): + status_to_stats_map = ( + ('is_failed', 'failures'), + ('is_unreachable', 'dark'), + ('is_changed', 'changed'), + ('is_skipped', 'skipped'), + ) + + # We don't know the host yet, copy the previous states, for lookup after we process new results + prev_host_states = iterator.host_states.copy() + + results = func(self, iterator, one_pass=one_pass, max_passes=max_passes) + _processed_results = [] + + for result in results: + task = result._task + host = result._host + _queued_task_args = self._queued_task_cache.pop((host.name, task._uuid), None) + task_vars = _queued_task_args['task_vars'] + play_context = _queued_task_args['play_context'] + # Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state + try: + prev_host_state = prev_host_states[host.name] + except KeyError: + prev_host_state = iterator.get_host_state(host) + + while result.needs_debugger(globally_enabled=self.debugger_active): + next_action = NextAction() + dbg = Debugger(task, host, task_vars, play_context, result, next_action) + dbg.cmdloop() + + if next_action.result == NextAction.REDO: + # rollback host state + self._tqm.clear_failed_hosts() + if task.run_once and iterator._play.strategy in add_internal_fqcns(('linear',)) and result.is_failed(): + for host_name, state in prev_host_states.items(): + if host_name == host.name: + continue + iterator.set_state_for_host(host_name, state) + iterator._play._removed_hosts.remove(host_name) + iterator.set_state_for_host(host.name, prev_host_state) + for method, what in status_to_stats_map: + if getattr(result, method)(): + self._tqm._stats.decrement(what, host.name) + self._tqm._stats.decrement('ok', host.name) + + # redo + self._queue_task(host, task, task_vars, play_context) + + _processed_results.extend(debug_closure(func)(self, iterator, one_pass)) + break + elif next_action.result == NextAction.CONTINUE: + _processed_results.append(result) + break + elif next_action.result == NextAction.EXIT: + # Matches KeyboardInterrupt from bin/ansible + sys.exit(99) + else: + _processed_results.append(result) + + return _processed_results + return inner + + +class StrategyBase: + + ''' + This is the base class for strategy plugins, which contains some common + code useful to all strategies like running handlers, cleanup actions, etc. + ''' + + # by default, strategies should support throttling but we allow individual + # strategies to disable this and either forego supporting it or managing + # the throttling internally (as `free` does) + ALLOW_BASE_THROTTLING = True + + def __init__(self, tqm): + self._tqm = tqm + self._inventory = tqm.get_inventory() + self._workers = tqm._workers + self._variable_manager = tqm.get_variable_manager() + self._loader = tqm.get_loader() + self._final_q = tqm._final_q + self._step = context.CLIARGS.get('step', False) + self._diff = context.CLIARGS.get('diff', False) + + # the task cache is a dictionary of tuples of (host.name, task._uuid) + # used to find the original task object of in-flight tasks and to store + # the task args/vars and play context info used to queue the task. + self._queued_task_cache = {} + + # Backwards compat: self._display isn't really needed, just import the global display and use that. + self._display = display + + # internal counters + self._pending_results = 0 + self._cur_worker = 0 + + # this dictionary is used to keep track of hosts that have + # outstanding tasks still in queue + self._blocked_hosts = dict() + + self._results = deque() + self._results_lock = threading.Condition(threading.Lock()) + + # create the result processing thread for reading results in the background + self._results_thread = threading.Thread(target=results_thread_main, args=(self,)) + self._results_thread.daemon = True + self._results_thread.start() + + # holds the list of active (persistent) connections to be shutdown at + # play completion + self._active_connections = dict() + + # Caches for get_host calls, to avoid calling excessively + # These values should be set at the top of the ``run`` method of each + # strategy plugin. Use ``_set_hosts_cache`` to set these values + self._hosts_cache = [] + self._hosts_cache_all = [] + + self.debugger_active = C.ENABLE_TASK_DEBUGGER + + def _set_hosts_cache(self, play, refresh=True): + """Responsible for setting _hosts_cache and _hosts_cache_all + + See comment in ``__init__`` for the purpose of these caches + """ + if not refresh and all((self._hosts_cache, self._hosts_cache_all)): + return + + if not play.finalized and Templar(None).is_template(play.hosts): + _pattern = 'all' + else: + _pattern = play.hosts or 'all' + self._hosts_cache_all = [h.name for h in self._inventory.get_hosts(pattern=_pattern, ignore_restrictions=True)] + self._hosts_cache = [h.name for h in self._inventory.get_hosts(play.hosts, order=play.order)] + + def cleanup(self): + # close active persistent connections + for sock in self._active_connections.values(): + try: + conn = Connection(sock) + conn.reset() + except ConnectionError as e: + # most likely socket is already closed + display.debug("got an error while closing persistent connection: %s" % e) + self._final_q.put(_sentinel) + self._results_thread.join() + + def run(self, iterator, play_context, result=0): + # execute one more pass through the iterator without peeking, to + # make sure that all of the hosts are advanced to their final task. + # This should be safe, as everything should be IteratingStates.COMPLETE by + # this point, though the strategy may not advance the hosts itself. + + for host in self._hosts_cache: + if host not in self._tqm._unreachable_hosts: + try: + iterator.get_next_task_for_host(self._inventory.hosts[host]) + except KeyError: + iterator.get_next_task_for_host(self._inventory.get_host(host)) + + # return the appropriate code, depending on the status hosts after the run + if not isinstance(result, bool) and result != self._tqm.RUN_OK: + return result + elif len(self._tqm._unreachable_hosts.keys()) > 0: + return self._tqm.RUN_UNREACHABLE_HOSTS + elif len(iterator.get_failed_hosts()) > 0: + return self._tqm.RUN_FAILED_HOSTS + else: + return self._tqm.RUN_OK + + def get_hosts_remaining(self, play): + self._set_hosts_cache(play, refresh=False) + ignore = set(self._tqm._failed_hosts).union(self._tqm._unreachable_hosts) + return [host for host in self._hosts_cache if host not in ignore] + + def get_failed_hosts(self, play): + self._set_hosts_cache(play, refresh=False) + return [host for host in self._hosts_cache if host in self._tqm._failed_hosts] + + def add_tqm_variables(self, vars, play): + ''' + Base class method to add extra variables/information to the list of task + vars sent through the executor engine regarding the task queue manager state. + ''' + vars['ansible_current_hosts'] = self.get_hosts_remaining(play) + vars['ansible_failed_hosts'] = self.get_failed_hosts(play) + + def _queue_task(self, host, task, task_vars, play_context): + ''' handles queueing the task up to be sent to a worker ''' + + display.debug("entering _queue_task() for %s/%s" % (host.name, task.action)) + + # Add a write lock for tasks. + # Maybe this should be added somewhere further up the call stack but + # this is the earliest in the code where we have task (1) extracted + # into its own variable and (2) there's only a single code path + # leading to the module being run. This is called by two + # functions: linear.py::run(), and + # free.py::run() so we'd have to add to both to do it there. + # The next common higher level is __init__.py::run() and that has + # tasks inside of play_iterator so we'd have to extract them to do it + # there. + + if task.action not in action_write_locks.action_write_locks: + display.debug('Creating lock for %s' % task.action) + action_write_locks.action_write_locks[task.action] = Lock() + + # create a templar and template things we need later for the queuing process + templar = Templar(loader=self._loader, variables=task_vars) + + try: + throttle = int(templar.template(task.throttle)) + except Exception as e: + raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e) + + # and then queue the new task + try: + # Determine the "rewind point" of the worker list. This means we start + # iterating over the list of workers until the end of the list is found. + # Normally, that is simply the length of the workers list (as determined + # by the forks or serial setting), however a task/block/play may "throttle" + # that limit down. + rewind_point = len(self._workers) + if throttle > 0 and self.ALLOW_BASE_THROTTLING: + if task.run_once: + display.debug("Ignoring 'throttle' as 'run_once' is also set for '%s'" % task.get_name()) + else: + if throttle <= rewind_point: + display.debug("task: %s, throttle: %d" % (task.get_name(), throttle)) + rewind_point = throttle + + queued = False + starting_worker = self._cur_worker + while True: + if self._cur_worker >= rewind_point: + self._cur_worker = 0 + + worker_prc = self._workers[self._cur_worker] + if worker_prc is None or not worker_prc.is_alive(): + self._queued_task_cache[(host.name, task._uuid)] = { + 'host': host, + 'task': task, + 'task_vars': task_vars, + 'play_context': play_context + } + + worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, plugin_loader) + self._workers[self._cur_worker] = worker_prc + self._tqm.send_callback('v2_runner_on_start', host, task) + worker_prc.start() + display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers))) + queued = True + + self._cur_worker += 1 + + if self._cur_worker >= rewind_point: + self._cur_worker = 0 + + if queued: + break + elif self._cur_worker == starting_worker: + time.sleep(0.0001) + + self._pending_results += 1 + except (EOFError, IOError, AssertionError) as e: + # most likely an abort + display.debug("got an error while queuing: %s" % e) + return + display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action)) + + def get_task_hosts(self, iterator, task_host, task): + if task.run_once: + host_list = [host for host in self._hosts_cache if host not in self._tqm._unreachable_hosts] + else: + host_list = [task_host.name] + return host_list + + def get_delegated_hosts(self, result, task): + host_name = result.get('_ansible_delegated_vars', {}).get('ansible_delegated_host', None) + return [host_name or task.delegate_to] + + def _set_always_delegated_facts(self, result, task): + """Sets host facts for ``delegate_to`` hosts for facts that should + always be delegated + + This operation mutates ``result`` to remove the always delegated facts + + See ``ALWAYS_DELEGATE_FACT_PREFIXES`` + """ + if task.delegate_to is None: + return + + facts = result['ansible_facts'] + always_keys = set() + _add = always_keys.add + for fact_key in facts: + for always_key in ALWAYS_DELEGATE_FACT_PREFIXES: + if fact_key.startswith(always_key): + _add(fact_key) + if always_keys: + _pop = facts.pop + always_facts = { + 'ansible_facts': dict((k, _pop(k)) for k in list(facts) if k in always_keys) + } + host_list = self.get_delegated_hosts(result, task) + _set_host_facts = self._variable_manager.set_host_facts + for target_host in host_list: + _set_host_facts(target_host, always_facts) + + def normalize_task_result(self, task_result): + """Normalize a TaskResult to reference actual Host and Task objects + when only given the ``Host.name``, or the ``Task._uuid`` + + Only the ``Host.name`` and ``Task._uuid`` are commonly sent back from + the ``TaskExecutor`` or ``WorkerProcess`` due to performance concerns + + Mutates the original object + """ + + if isinstance(task_result._host, string_types): + # If the value is a string, it is ``Host.name`` + task_result._host = self._inventory.get_host(to_text(task_result._host)) + + if isinstance(task_result._task, string_types): + # If the value is a string, it is ``Task._uuid`` + queue_cache_entry = (task_result._host.name, task_result._task) + try: + found_task = self._queued_task_cache[queue_cache_entry]['task'] + except KeyError: + # This should only happen due to an implicit task created by the + # TaskExecutor, restrict this behavior to the explicit use case + # of an implicit async_status task + if task_result._task_fields.get('action') != 'async_status': + raise + original_task = Task() + else: + original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) + original_task._parent = found_task._parent + original_task.from_attrs(task_result._task_fields) + task_result._task = original_task + + return task_result + + @debug_closure + def _process_pending_results(self, iterator, one_pass=False, max_passes=None): + ''' + Reads results off the final queue and takes appropriate action + based on the result (executing callbacks, updating state, etc.). + ''' + + ret_results = [] + handler_templar = Templar(self._loader) + + def search_handler_blocks_by_name(handler_name, handler_blocks): + # iterate in reversed order since last handler loaded with the same name wins + for handler_block in reversed(handler_blocks): + for handler_task in handler_block.block: + if handler_task.name: + try: + if not handler_task.cached_name: + if handler_templar.is_template(handler_task.name): + handler_templar.available_variables = self._variable_manager.get_vars(play=iterator._play, + task=handler_task, + _hosts=self._hosts_cache, + _hosts_all=self._hosts_cache_all) + handler_task.name = handler_templar.template(handler_task.name) + handler_task.cached_name = True + + # first we check with the full result of get_name(), which may + # include the role name (if the handler is from a role). If that + # is not found, we resort to the simple name field, which doesn't + # have anything extra added to it. + candidates = ( + handler_task.name, + handler_task.get_name(include_role_fqcn=False), + handler_task.get_name(include_role_fqcn=True), + ) + + if handler_name in candidates: + return handler_task + except (UndefinedError, AnsibleUndefinedVariable) as e: + # We skip this handler due to the fact that it may be using + # a variable in the name that was conditionally included via + # set_fact or some other method, and we don't want to error + # out unnecessarily + if not handler_task.listen: + display.warning( + "Handler '%s' is unusable because it has no listen topics and " + "the name could not be templated (host-specific variables are " + "not supported in handler names). The error: %s" % (handler_task.name, to_text(e)) + ) + continue + + cur_pass = 0 + while True: + try: + self._results_lock.acquire() + task_result = self._results.popleft() + except IndexError: + break + finally: + self._results_lock.release() + + original_host = task_result._host + original_task = task_result._task + + # all host status messages contain 2 entries: (msg, task_result) + role_ran = False + if task_result.is_failed(): + role_ran = True + ignore_errors = original_task.ignore_errors + if not ignore_errors: + # save the current state before failing it for later inspection + state_when_failed = iterator.get_state_for_host(original_host.name) + display.debug("marking %s as failed" % original_host.name) + if original_task.run_once: + # if we're using run_once, we have to fail every host here + for h in self._inventory.get_hosts(iterator._play.hosts): + if h.name not in self._tqm._unreachable_hosts: + iterator.mark_host_failed(h) + else: + iterator.mark_host_failed(original_host) + + state, _ = iterator.get_next_task_for_host(original_host, peek=True) + + if iterator.is_failed(original_host) and state and state.run_state == IteratingStates.COMPLETE: + self._tqm._failed_hosts[original_host.name] = True + + # if we're iterating on the rescue portion of a block then + # we save the failed task in a special var for use + # within the rescue/always + if iterator.is_any_block_rescuing(state_when_failed): + self._tqm._stats.increment('rescued', original_host.name) + iterator._play._removed_hosts.remove(original_host.name) + self._variable_manager.set_nonpersistent_facts( + original_host.name, + dict( + ansible_failed_task=wrap_var(original_task.serialize()), + ansible_failed_result=task_result._result, + ), + ) + else: + self._tqm._stats.increment('failures', original_host.name) + else: + self._tqm._stats.increment('ok', original_host.name) + self._tqm._stats.increment('ignored', original_host.name) + if 'changed' in task_result._result and task_result._result['changed']: + self._tqm._stats.increment('changed', original_host.name) + self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=ignore_errors) + elif task_result.is_unreachable(): + ignore_unreachable = original_task.ignore_unreachable + if not ignore_unreachable: + self._tqm._unreachable_hosts[original_host.name] = True + iterator._play._removed_hosts.append(original_host.name) + self._tqm._stats.increment('dark', original_host.name) + else: + self._tqm._stats.increment('ok', original_host.name) + self._tqm._stats.increment('ignored', original_host.name) + self._tqm.send_callback('v2_runner_on_unreachable', task_result) + elif task_result.is_skipped(): + self._tqm._stats.increment('skipped', original_host.name) + self._tqm.send_callback('v2_runner_on_skipped', task_result) + else: + role_ran = True + + if original_task.loop: + # this task had a loop, and has more than one result, so + # loop over all of them instead of a single result + result_items = task_result._result.get('results', []) + else: + result_items = [task_result._result] + + for result_item in result_items: + if '_ansible_notify' in result_item: + if task_result.is_changed(): + # The shared dictionary for notified handlers is a proxy, which + # does not detect when sub-objects within the proxy are modified. + # So, per the docs, we reassign the list so the proxy picks up and + # notifies all other threads + for handler_name in result_item['_ansible_notify']: + found = False + # Find the handler using the above helper. First we look up the + # dependency chain of the current task (if it's from a role), otherwise + # we just look through the list of handlers in the current play/all + # roles and use the first one that matches the notify name + target_handler = search_handler_blocks_by_name(handler_name, iterator._play.handlers) + if target_handler is not None: + found = True + if target_handler.notify_host(original_host): + self._tqm.send_callback('v2_playbook_on_notify', target_handler, original_host) + + for listening_handler_block in iterator._play.handlers: + for listening_handler in listening_handler_block.block: + listeners = getattr(listening_handler, 'listen', []) or [] + if not listeners: + continue + + listeners = listening_handler.get_validated_value( + 'listen', listening_handler.fattributes.get('listen'), listeners, handler_templar + ) + if handler_name not in listeners: + continue + else: + found = True + + if listening_handler.notify_host(original_host): + self._tqm.send_callback('v2_playbook_on_notify', listening_handler, original_host) + + # and if none were found, then we raise an error + if not found: + msg = ("The requested handler '%s' was not found in either the main handlers list nor in the listening " + "handlers list" % handler_name) + if C.ERROR_ON_MISSING_HANDLER: + raise AnsibleError(msg) + else: + display.warning(msg) + + if 'add_host' in result_item: + # this task added a new host (add_host module) + new_host_info = result_item.get('add_host', dict()) + self._inventory.add_dynamic_host(new_host_info, result_item) + # ensure host is available for subsequent plays + if result_item.get('changed') and new_host_info['host_name'] not in self._hosts_cache_all: + self._hosts_cache_all.append(new_host_info['host_name']) + + elif 'add_group' in result_item: + # this task added a new group (group_by module) + self._inventory.add_dynamic_group(original_host, result_item) + + if 'add_host' in result_item or 'add_group' in result_item: + item_vars = _get_item_vars(result_item, original_task) + found_task_vars = self._queued_task_cache.get((original_host.name, task_result._task._uuid))['task_vars'] + if item_vars: + all_task_vars = combine_vars(found_task_vars, item_vars) + else: + all_task_vars = found_task_vars + all_task_vars[original_task.register] = wrap_var(result_item) + post_process_whens(result_item, original_task, handler_templar, all_task_vars) + if original_task.loop or original_task.loop_with: + new_item_result = TaskResult( + task_result._host, + task_result._task, + result_item, + task_result._task_fields, + ) + self._tqm.send_callback('v2_runner_item_on_ok', new_item_result) + if result_item.get('changed', False): + task_result._result['changed'] = True + if result_item.get('failed', False): + task_result._result['failed'] = True + + if 'ansible_facts' in result_item and original_task.action not in C._ACTION_DEBUG: + # if delegated fact and we are delegating facts, we need to change target host for them + if original_task.delegate_to is not None and original_task.delegate_facts: + host_list = self.get_delegated_hosts(result_item, original_task) + else: + # Set facts that should always be on the delegated hosts + self._set_always_delegated_facts(result_item, original_task) + + host_list = self.get_task_hosts(iterator, original_host, original_task) + + if original_task.action in C._ACTION_INCLUDE_VARS: + for (var_name, var_value) in result_item['ansible_facts'].items(): + # find the host we're actually referring too here, which may + # be a host that is not really in inventory at all + for target_host in host_list: + self._variable_manager.set_host_variable(target_host, var_name, var_value) + else: + cacheable = result_item.pop('_ansible_facts_cacheable', False) + for target_host in host_list: + # so set_fact is a misnomer but 'cacheable = true' was meant to create an 'actual fact' + # to avoid issues with precedence and confusion with set_fact normal operation, + # we set BOTH fact and nonpersistent_facts (aka hostvar) + # when fact is retrieved from cache in subsequent operations it will have the lower precedence, + # but for playbook setting it the 'higher' precedence is kept + is_set_fact = original_task.action in C._ACTION_SET_FACT + if not is_set_fact or cacheable: + self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy()) + if is_set_fact: + self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy()) + + if 'ansible_stats' in result_item and 'data' in result_item['ansible_stats'] and result_item['ansible_stats']['data']: + + if 'per_host' not in result_item['ansible_stats'] or result_item['ansible_stats']['per_host']: + host_list = self.get_task_hosts(iterator, original_host, original_task) + else: + host_list = [None] + + data = result_item['ansible_stats']['data'] + aggregate = 'aggregate' in result_item['ansible_stats'] and result_item['ansible_stats']['aggregate'] + for myhost in host_list: + for k in data.keys(): + if aggregate: + self._tqm._stats.update_custom_stats(k, data[k], myhost) + else: + self._tqm._stats.set_custom_stats(k, data[k], myhost) + + if 'diff' in task_result._result: + if self._diff or getattr(original_task, 'diff', False): + self._tqm.send_callback('v2_on_file_diff', task_result) + + if not isinstance(original_task, TaskInclude): + self._tqm._stats.increment('ok', original_host.name) + if 'changed' in task_result._result and task_result._result['changed']: + self._tqm._stats.increment('changed', original_host.name) + + # finally, send the ok for this task + self._tqm.send_callback('v2_runner_on_ok', task_result) + + # register final results + if original_task.register: + + if not isidentifier(original_task.register): + raise AnsibleError("Invalid variable name in 'register' specified: '%s'" % original_task.register) + + host_list = self.get_task_hosts(iterator, original_host, original_task) + + clean_copy = strip_internal_keys(module_response_deepcopy(task_result._result)) + if 'invocation' in clean_copy: + del clean_copy['invocation'] + + for target_host in host_list: + self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy}) + + self._pending_results -= 1 + if original_host.name in self._blocked_hosts: + del self._blocked_hosts[original_host.name] + + # If this is a role task, mark the parent role as being run (if + # the task was ok or failed, but not skipped or unreachable) + if original_task._role is not None and role_ran: # TODO: and original_task.action not in C._ACTION_INCLUDE_ROLE:? + # lookup the role in the ROLE_CACHE to make sure we're dealing + # with the correct object and mark it as executed + for (entry, role_obj) in iterator._play.ROLE_CACHE[original_task._role.get_name()].items(): + if role_obj._uuid == original_task._role._uuid: + role_obj._had_task_run[original_host.name] = True + + ret_results.append(task_result) + + if isinstance(original_task, Handler): + for handler in (h for b in iterator._play.handlers for h in b.block if h._uuid == original_task._uuid): + handler.remove_host(original_host) + + if one_pass or max_passes is not None and (cur_pass + 1) >= max_passes: + break + + cur_pass += 1 + + return ret_results + + def _wait_on_pending_results(self, iterator): + ''' + Wait for the shared counter to drop to zero, using a short sleep + between checks to ensure we don't spin lock + ''' + + ret_results = [] + + display.debug("waiting for pending results...") + while self._pending_results > 0 and not self._tqm._terminated: + + if self._tqm.has_dead_workers(): + raise AnsibleError("A worker was found in a dead state") + + results = self._process_pending_results(iterator) + ret_results.extend(results) + if self._pending_results > 0: + time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) + + display.debug("no more pending results, returning what we have") + + return ret_results + + def _copy_included_file(self, included_file): + ''' + A proven safe and performant way to create a copy of an included file + ''' + ti_copy = included_file._task.copy(exclude_parent=True) + ti_copy._parent = included_file._task._parent + + temp_vars = ti_copy.vars | included_file._vars + + ti_copy.vars = temp_vars + + return ti_copy + + def _load_included_file(self, included_file, iterator, is_handler=False): + ''' + Loads an included YAML file of tasks, applying the optional set of variables. + + Raises AnsibleError exception in case of a failure during including a file, + in such case the caller is responsible for marking the host(s) as failed + using PlayIterator.mark_host_failed(). + ''' + display.debug("loading included file: %s" % included_file._filename) + try: + data = self._loader.load_from_file(included_file._filename) + if data is None: + return [] + elif not isinstance(data, list): + raise AnsibleError("included task files must contain a list of tasks") + + ti_copy = self._copy_included_file(included_file) + + block_list = load_list_of_blocks( + data, + play=iterator._play, + parent_block=ti_copy.build_parent_block(), + role=included_file._task._role, + use_handlers=is_handler, + loader=self._loader, + variable_manager=self._variable_manager, + ) + + # since we skip incrementing the stats when the task result is + # first processed, we do so now for each host in the list + for host in included_file._hosts: + self._tqm._stats.increment('ok', host.name) + except AnsibleParserError: + raise + except AnsibleError as e: + if isinstance(e, AnsibleFileNotFound): + reason = "Could not find or access '%s' on the Ansible Controller." % to_text(e.file_name) + else: + reason = to_text(e) + + for r in included_file._results: + r._result['failed'] = True + + for host in included_file._hosts: + tr = TaskResult(host=host, task=included_file._task, return_data=dict(failed=True, reason=reason)) + self._tqm._stats.increment('failures', host.name) + self._tqm.send_callback('v2_runner_on_failed', tr) + raise AnsibleError(reason) from e + + # finally, send the callback and return the list of blocks loaded + self._tqm.send_callback('v2_playbook_on_include', included_file) + display.debug("done processing included file") + return block_list + + def _take_step(self, task, host=None): + + ret = False + msg = u'Perform task: %s ' % task + if host: + msg += u'on %s ' % host + msg += u'(N)o/(y)es/(c)ontinue: ' + resp = display.prompt(msg) + + if resp.lower() in ['y', 'yes']: + display.debug("User ran task") + ret = True + elif resp.lower() in ['c', 'continue']: + display.debug("User ran task and canceled step mode") + self._step = False + ret = True + else: + display.debug("User skipped task") + + display.banner(msg) + + return ret + + def _cond_not_supported_warn(self, task_name): + display.warning("%s task does not support when conditional" % task_name) + + def _execute_meta(self, task, play_context, iterator, target_host): + + # meta tasks store their args in the _raw_params field of args, + # since they do not use k=v pairs, so get that + meta_action = task.args.get('_raw_params') + + def _evaluate_conditional(h): + all_vars = self._variable_manager.get_vars(play=iterator._play, host=h, task=task, + _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all) + templar = Templar(loader=self._loader, variables=all_vars) + return task.evaluate_conditional(templar, all_vars) + + skipped = False + msg = meta_action + skip_reason = '%s conditional evaluated to False' % meta_action + if isinstance(task, Handler): + self._tqm.send_callback('v2_playbook_on_handler_task_start', task) + else: + self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False) + + # These don't support "when" conditionals + if meta_action in ('noop', 'refresh_inventory', 'reset_connection') and task.when: + self._cond_not_supported_warn(meta_action) + + if meta_action == 'noop': + msg = "noop" + elif meta_action == 'flush_handlers': + if _evaluate_conditional(target_host): + host_state = iterator.get_state_for_host(target_host.name) + if host_state.run_state == IteratingStates.HANDLERS: + raise AnsibleError('flush_handlers cannot be used as a handler') + if target_host.name not in self._tqm._unreachable_hosts: + host_state.pre_flushing_run_state = host_state.run_state + host_state.run_state = IteratingStates.HANDLERS + msg = "triggered running handlers for %s" % target_host.name + else: + skipped = True + skip_reason += ', not running handlers for %s' % target_host.name + elif meta_action == 'refresh_inventory': + self._inventory.refresh_inventory() + self._set_hosts_cache(iterator._play) + msg = "inventory successfully refreshed" + elif meta_action == 'clear_facts': + if _evaluate_conditional(target_host): + for host in self._inventory.get_hosts(iterator._play.hosts): + hostname = host.get_name() + self._variable_manager.clear_facts(hostname) + msg = "facts cleared" + else: + skipped = True + skip_reason += ', not clearing facts and fact cache for %s' % target_host.name + elif meta_action == 'clear_host_errors': + if _evaluate_conditional(target_host): + for host in self._inventory.get_hosts(iterator._play.hosts): + self._tqm._failed_hosts.pop(host.name, False) + self._tqm._unreachable_hosts.pop(host.name, False) + iterator.clear_host_errors(host) + msg = "cleared host errors" + else: + skipped = True + skip_reason += ', not clearing host error state for %s' % target_host.name + elif meta_action == 'end_batch': + if _evaluate_conditional(target_host): + for host in self._inventory.get_hosts(iterator._play.hosts): + if host.name not in self._tqm._unreachable_hosts: + iterator.set_run_state_for_host(host.name, IteratingStates.COMPLETE) + msg = "ending batch" + else: + skipped = True + skip_reason += ', continuing current batch' + elif meta_action == 'end_play': + if _evaluate_conditional(target_host): + for host in self._inventory.get_hosts(iterator._play.hosts): + if host.name not in self._tqm._unreachable_hosts: + iterator.set_run_state_for_host(host.name, IteratingStates.COMPLETE) + # end_play is used in PlaybookExecutor/TQM to indicate that + # the whole play is supposed to be ended as opposed to just a batch + iterator.end_play = True + msg = "ending play" + else: + skipped = True + skip_reason += ', continuing play' + elif meta_action == 'end_host': + if _evaluate_conditional(target_host): + iterator.set_run_state_for_host(target_host.name, IteratingStates.COMPLETE) + iterator._play._removed_hosts.append(target_host.name) + msg = "ending play for %s" % target_host.name + else: + skipped = True + skip_reason += ", continuing execution for %s" % target_host.name + # TODO: Nix msg here? Left for historical reasons, but skip_reason exists now. + msg = "end_host conditional evaluated to false, continuing execution for %s" % target_host.name + elif meta_action == 'role_complete': + # Allow users to use this in a play as reported in https://github.com/ansible/ansible/issues/22286? + # How would this work with allow_duplicates?? + if task.implicit: + if target_host.name in task._role._had_task_run: + task._role._completed[target_host.name] = True + msg = 'role_complete for %s' % target_host.name + elif meta_action == 'reset_connection': + all_vars = self._variable_manager.get_vars(play=iterator._play, host=target_host, task=task, + _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all) + templar = Templar(loader=self._loader, variables=all_vars) + + # apply the given task's information to the connection info, + # which may override some fields already set by the play or + # the options specified on the command line + play_context = play_context.set_task_and_variable_override(task=task, variables=all_vars, templar=templar) + + # fields set from the play/task may be based on variables, so we have to + # do the same kind of post validation step on it here before we use it. + play_context.post_validate(templar=templar) + + # now that the play context is finalized, if the remote_addr is not set + # default to using the host's address field as the remote address + if not play_context.remote_addr: + play_context.remote_addr = target_host.address + + # We also add "magic" variables back into the variables dict to make sure + # a certain subset of variables exist. This 'mostly' works here cause meta + # disregards the loop, but should not really use play_context at all + play_context.update_vars(all_vars) + + if target_host in self._active_connections: + connection = Connection(self._active_connections[target_host]) + del self._active_connections[target_host] + else: + connection = plugin_loader.connection_loader.get(play_context.connection, play_context, os.devnull) + connection.set_options(task_keys=task.dump_attrs(), var_options=all_vars) + play_context.set_attributes_from_plugin(connection) + + if connection: + try: + connection.reset() + msg = 'reset connection' + except ConnectionError as e: + # most likely socket is already closed + display.debug("got an error while closing persistent connection: %s" % e) + else: + msg = 'no connection, nothing to reset' + else: + raise AnsibleError("invalid meta action requested: %s" % meta_action, obj=task._ds) + + result = {'msg': msg} + if skipped: + result['skipped'] = True + result['skip_reason'] = skip_reason + else: + result['changed'] = False + + if not task.implicit: + header = skip_reason if skipped else msg + display.vv(f"META: {header}") + + if isinstance(task, Handler): + task.remove_host(target_host) + + res = TaskResult(target_host, task, result) + if skipped: + self._tqm.send_callback('v2_runner_on_skipped', res) + return [res] + + def get_hosts_left(self, iterator): + ''' returns list of available hosts for this iterator by filtering out unreachables ''' + + hosts_left = [] + for host in self._hosts_cache: + if host not in self._tqm._unreachable_hosts: + try: + hosts_left.append(self._inventory.hosts[host]) + except KeyError: + hosts_left.append(self._inventory.get_host(host)) + return hosts_left + + def update_active_connections(self, results): + ''' updates the current active persistent connections ''' + for r in results: + if 'args' in r._task_fields: + socket_path = r._task_fields['args'].get('_ansible_socket') + if socket_path: + if r._host not in self._active_connections: + self._active_connections[r._host] = socket_path + + +class NextAction(object): + """ The next action after an interpreter's exit. """ + REDO = 1 + CONTINUE = 2 + EXIT = 3 + + def __init__(self, result=EXIT): + self.result = result + + +class Debugger(cmd.Cmd): + prompt_continuous = '> ' # multiple lines + + def __init__(self, task, host, task_vars, play_context, result, next_action): + # cmd.Cmd is old-style class + cmd.Cmd.__init__(self) + + self.prompt = '[%s] %s (debug)> ' % (host, task) + self.intro = None + self.scope = {} + self.scope['task'] = task + self.scope['task_vars'] = task_vars + self.scope['host'] = host + self.scope['play_context'] = play_context + self.scope['result'] = result + self.next_action = next_action + + def cmdloop(self): + try: + cmd.Cmd.cmdloop(self) + except KeyboardInterrupt: + pass + + do_h = cmd.Cmd.do_help + + def do_EOF(self, args): + """Quit""" + return self.do_quit(args) + + def do_quit(self, args): + """Quit""" + display.display('User interrupted execution') + self.next_action.result = NextAction.EXIT + return True + + do_q = do_quit + + def do_continue(self, args): + """Continue to next result""" + self.next_action.result = NextAction.CONTINUE + return True + + do_c = do_continue + + def do_redo(self, args): + """Schedule task for re-execution. The re-execution may not be the next result""" + self.next_action.result = NextAction.REDO + return True + + do_r = do_redo + + def do_update_task(self, args): + """Recreate the task from ``task._ds``, and template with updated ``task_vars``""" + templar = Templar(None, variables=self.scope['task_vars']) + task = self.scope['task'] + task = task.load_data(task._ds) + task.post_validate(templar) + self.scope['task'] = task + + do_u = do_update_task + + def evaluate(self, args): + try: + return eval(args, globals(), self.scope) + except Exception: + t, v = sys.exc_info()[:2] + if isinstance(t, str): + exc_type_name = t + else: + exc_type_name = t.__name__ + display.display('***%s:%s' % (exc_type_name, repr(v))) + raise + + def do_pprint(self, args): + """Pretty Print""" + try: + result = self.evaluate(args) + display.display(pprint.pformat(result)) + except Exception: + pass + + do_p = do_pprint + + def execute(self, args): + try: + code = compile(args + '\n', '<stdin>', 'single') + exec(code, globals(), self.scope) + except Exception: + t, v = sys.exc_info()[:2] + if isinstance(t, str): + exc_type_name = t + else: + exc_type_name = t.__name__ + display.display('***%s:%s' % (exc_type_name, repr(v))) + raise + + def default(self, line): + try: + self.execute(line) + except Exception: + pass diff --git a/lib/ansible/plugins/strategy/debug.py b/lib/ansible/plugins/strategy/debug.py new file mode 100644 index 0000000..f808bcf --- /dev/null +++ b/lib/ansible/plugins/strategy/debug.py @@ -0,0 +1,37 @@ +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' + name: debug + short_description: Executes tasks in interactive debug session. + description: + - Task execution is 'linear' but controlled by an interactive debug session. + version_added: "2.1" + author: Kishin Yagami (!UNKNOWN) +''' + +import cmd +import pprint +import sys + +from ansible.plugins.strategy.linear import StrategyModule as LinearStrategyModule + + +class StrategyModule(LinearStrategyModule): + def __init__(self, tqm): + super(StrategyModule, self).__init__(tqm) + self.debugger_active = True diff --git a/lib/ansible/plugins/strategy/free.py b/lib/ansible/plugins/strategy/free.py new file mode 100644 index 0000000..6f45114 --- /dev/null +++ b/lib/ansible/plugins/strategy/free.py @@ -0,0 +1,303 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' + name: free + short_description: Executes tasks without waiting for all hosts + description: + - Task execution is as fast as possible per batch as defined by C(serial) (default all). + Ansible will not wait for other hosts to finish the current task before queuing more tasks for other hosts. + All hosts are still attempted for the current task, but it prevents blocking new tasks for hosts that have already finished. + - With the free strategy, unlike the default linear strategy, a host that is slow or stuck on a specific task + won't hold up the rest of the hosts and tasks. + version_added: "2.0" + author: Ansible Core Team +''' + +import time + +from ansible import constants as C +from ansible.errors import AnsibleError, AnsibleParserError +from ansible.playbook.handler import Handler +from ansible.playbook.included_file import IncludedFile +from ansible.plugins.loader import action_loader +from ansible.plugins.strategy import StrategyBase +from ansible.template import Templar +from ansible.module_utils._text import to_text +from ansible.utils.display import Display + +display = Display() + + +class StrategyModule(StrategyBase): + + # This strategy manages throttling on its own, so we don't want it done in queue_task + ALLOW_BASE_THROTTLING = False + + def __init__(self, tqm): + super(StrategyModule, self).__init__(tqm) + self._host_pinned = False + + def run(self, iterator, play_context): + ''' + The "free" strategy is a bit more complex, in that it allows tasks to + be sent to hosts as quickly as they can be processed. This means that + some hosts may finish very quickly if run tasks result in little or no + work being done versus other systems. + + The algorithm used here also tries to be more "fair" when iterating + through hosts by remembering the last host in the list to be given a task + and starting the search from there as opposed to the top of the hosts + list again, which would end up favoring hosts near the beginning of the + list. + ''' + + # the last host to be given a task + last_host = 0 + + result = self._tqm.RUN_OK + + # start with all workers being counted as being free + workers_free = len(self._workers) + + self._set_hosts_cache(iterator._play) + + if iterator._play.max_fail_percentage is not None: + display.warning("Using max_fail_percentage with the free strategy is not supported, as tasks are executed independently on each host") + + work_to_do = True + while work_to_do and not self._tqm._terminated: + + hosts_left = self.get_hosts_left(iterator) + + if len(hosts_left) == 0: + self._tqm.send_callback('v2_playbook_on_no_hosts_remaining') + result = False + break + + work_to_do = False # assume we have no more work to do + starting_host = last_host # save current position so we know when we've looped back around and need to break + + # try and find an unblocked host with a task to run + host_results = [] + while True: + host = hosts_left[last_host] + display.debug("next free host: %s" % host) + host_name = host.get_name() + + # peek at the next task for the host, to see if there's + # anything to do do for this host + (state, task) = iterator.get_next_task_for_host(host, peek=True) + display.debug("free host state: %s" % state, host=host_name) + display.debug("free host task: %s" % task, host=host_name) + + # check if there is work to do, either there is a task or the host is still blocked which could + # mean that it is processing an include task and after its result is processed there might be + # more tasks to run + if (task or self._blocked_hosts.get(host_name, False)) and not self._tqm._unreachable_hosts.get(host_name, False): + display.debug("this host has work to do", host=host_name) + # set the flag so the outer loop knows we've still found + # some work which needs to be done + work_to_do = True + + if not self._tqm._unreachable_hosts.get(host_name, False) and task: + # check to see if this host is blocked (still executing a previous task) + if not self._blocked_hosts.get(host_name, False): + display.debug("getting variables", host=host_name) + task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task, + _hosts=self._hosts_cache, + _hosts_all=self._hosts_cache_all) + self.add_tqm_variables(task_vars, play=iterator._play) + templar = Templar(loader=self._loader, variables=task_vars) + display.debug("done getting variables", host=host_name) + + try: + throttle = int(templar.template(task.throttle)) + except Exception as e: + raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e) + + if throttle > 0: + same_tasks = 0 + for worker in self._workers: + if worker and worker.is_alive() and worker._task._uuid == task._uuid: + same_tasks += 1 + + display.debug("task: %s, same_tasks: %d" % (task.get_name(), same_tasks)) + if same_tasks >= throttle: + break + + # advance the host, mark the host blocked, and queue it + self._blocked_hosts[host_name] = True + iterator.set_state_for_host(host.name, state) + + try: + action = action_loader.get(task.action, class_only=True, collection_list=task.collections) + except KeyError: + # we don't care here, because the action may simply not have a + # corresponding action plugin + action = None + + try: + task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty') + display.debug("done templating", host=host_name) + except Exception: + # just ignore any errors during task name templating, + # we don't care if it just shows the raw name + display.debug("templating failed for some reason", host=host_name) + + run_once = templar.template(task.run_once) or action and getattr(action, 'BYPASS_HOST_LOOP', False) + if run_once: + if action and getattr(action, 'BYPASS_HOST_LOOP', False): + raise AnsibleError("The '%s' module bypasses the host loop, which is currently not supported in the free strategy " + "and would instead execute for every host in the inventory list." % task.action, obj=task._ds) + else: + display.warning("Using run_once with the free strategy is not currently supported. This task will still be " + "executed for every host in the inventory list.") + + # check to see if this task should be skipped, due to it being a member of a + # role which has already run (and whether that role allows duplicate execution) + if not isinstance(task, Handler) and task._role and task._role.has_run(host): + # If there is no metadata, the default behavior is to not allow duplicates, + # if there is metadata, check to see if the allow_duplicates flag was set to true + if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates: + display.debug("'%s' skipped because role has already run" % task, host=host_name) + del self._blocked_hosts[host_name] + continue + + if task.action in C._ACTION_META: + self._execute_meta(task, play_context, iterator, target_host=host) + self._blocked_hosts[host_name] = False + else: + # handle step if needed, skip meta actions as they are used internally + if not self._step or self._take_step(task, host_name): + if task.any_errors_fatal: + display.warning("Using any_errors_fatal with the free strategy is not supported, " + "as tasks are executed independently on each host") + if isinstance(task, Handler): + self._tqm.send_callback('v2_playbook_on_handler_task_start', task) + else: + self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False) + self._queue_task(host, task, task_vars, play_context) + # each task is counted as a worker being busy + workers_free -= 1 + del task_vars + else: + display.debug("%s is blocked, skipping for now" % host_name) + + # all workers have tasks to do (and the current host isn't done with the play). + # loop back to starting host and break out + if self._host_pinned and workers_free == 0 and work_to_do: + last_host = starting_host + break + + # move on to the next host and make sure we + # haven't gone past the end of our hosts list + last_host += 1 + if last_host > len(hosts_left) - 1: + last_host = 0 + + # if we've looped around back to the start, break out + if last_host == starting_host: + break + + results = self._process_pending_results(iterator) + host_results.extend(results) + + # each result is counted as a worker being free again + workers_free += len(results) + + self.update_active_connections(results) + + included_files = IncludedFile.process_include_results( + host_results, + iterator=iterator, + loader=self._loader, + variable_manager=self._variable_manager + ) + + if len(included_files) > 0: + all_blocks = dict((host, []) for host in hosts_left) + failed_includes_hosts = set() + for included_file in included_files: + display.debug("collecting new blocks for %s" % included_file) + is_handler = False + try: + if included_file._is_role: + new_ir = self._copy_included_file(included_file) + + new_blocks, handler_blocks = new_ir.get_block_list( + play=iterator._play, + variable_manager=self._variable_manager, + loader=self._loader, + ) + else: + is_handler = isinstance(included_file._task, Handler) + new_blocks = self._load_included_file(included_file, iterator=iterator, is_handler=is_handler) + + # let PlayIterator know about any new handlers included via include_role or + # import_role within include_role/include_taks + iterator.handlers = [h for b in iterator._play.handlers for h in b.block] + except AnsibleParserError: + raise + except AnsibleError as e: + if included_file._is_role: + # include_role does not have on_include callback so display the error + display.error(to_text(e), wrap_text=False) + for r in included_file._results: + r._result['failed'] = True + failed_includes_hosts.add(r._host) + continue + + for new_block in new_blocks: + if is_handler: + for task in new_block.block: + task.notified_hosts = included_file._hosts[:] + final_block = new_block + else: + task_vars = self._variable_manager.get_vars( + play=iterator._play, + task=new_block.get_first_parent_include(), + _hosts=self._hosts_cache, + _hosts_all=self._hosts_cache_all, + ) + final_block = new_block.filter_tagged_tasks(task_vars) + for host in hosts_left: + if host in included_file._hosts: + all_blocks[host].append(final_block) + display.debug("done collecting new blocks for %s" % included_file) + + for host in failed_includes_hosts: + self._tqm._failed_hosts[host.name] = True + iterator.mark_host_failed(host) + + display.debug("adding all collected blocks from %d included file(s) to iterator" % len(included_files)) + for host in hosts_left: + iterator.add_tasks(host, all_blocks[host]) + display.debug("done adding collected blocks to iterator") + + # pause briefly so we don't spin lock + time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) + + # collect all the final results + results = self._wait_on_pending_results(iterator) + + # run the base class run() method, which executes the cleanup function + # and runs any outstanding handlers which have been triggered + return super(StrategyModule, self).run(iterator, play_context, result) diff --git a/lib/ansible/plugins/strategy/host_pinned.py b/lib/ansible/plugins/strategy/host_pinned.py new file mode 100644 index 0000000..70f22eb --- /dev/null +++ b/lib/ansible/plugins/strategy/host_pinned.py @@ -0,0 +1,45 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' + name: host_pinned + short_description: Executes tasks on each host without interruption + description: + - Task execution is as fast as possible per host in batch as defined by C(serial) (default all). + Ansible will not start a play for a host unless the play can be finished without interruption by tasks for another host, + i.e. the number of hosts with an active play does not exceed the number of forks. + Ansible will not wait for other hosts to finish the current task before queuing the next task for a host that has finished. + Once a host is done with the play, it opens it's slot to a new host that was waiting to start. + Other than that, it behaves just like the "free" strategy. + version_added: "2.7" + author: Ansible Core Team +''' + +from ansible.plugins.strategy.free import StrategyModule as FreeStrategyModule +from ansible.utils.display import Display + +display = Display() + + +class StrategyModule(FreeStrategyModule): + + def __init__(self, tqm): + super(StrategyModule, self).__init__(tqm) + self._host_pinned = True diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py new file mode 100644 index 0000000..a3c91c2 --- /dev/null +++ b/lib/ansible/plugins/strategy/linear.py @@ -0,0 +1,406 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' + name: linear + short_description: Executes tasks in a linear fashion + description: + - Task execution is in lockstep per host batch as defined by C(serial) (default all). + Up to the fork limit of hosts will execute each task at the same time and then + the next series of hosts until the batch is done, before going on to the next task. + version_added: "2.0" + notes: + - This was the default Ansible behaviour before 'strategy plugins' were introduced in 2.0. + author: Ansible Core Team +''' + +from ansible import constants as C +from ansible.errors import AnsibleError, AnsibleAssertionError, AnsibleParserError +from ansible.executor.play_iterator import IteratingStates, FailedStates +from ansible.module_utils._text import to_text +from ansible.playbook.handler import Handler +from ansible.playbook.included_file import IncludedFile +from ansible.playbook.task import Task +from ansible.plugins.loader import action_loader +from ansible.plugins.strategy import StrategyBase +from ansible.template import Templar +from ansible.utils.display import Display + +display = Display() + + +class StrategyModule(StrategyBase): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # used for the lockstep to indicate to run handlers + self._in_handlers = False + + def _get_next_task_lockstep(self, hosts, iterator): + ''' + Returns a list of (host, task) tuples, where the task may + be a noop task to keep the iterator in lock step across + all hosts. + ''' + noop_task = Task() + noop_task.action = 'meta' + noop_task.args['_raw_params'] = 'noop' + noop_task.implicit = True + noop_task.set_loader(iterator._play._loader) + + state_task_per_host = {} + for host in hosts: + state, task = iterator.get_next_task_for_host(host, peek=True) + if task is not None: + state_task_per_host[host] = state, task + + if not state_task_per_host: + return [(h, None) for h in hosts] + + if self._in_handlers and not any(filter( + lambda rs: rs == IteratingStates.HANDLERS, + (s.run_state for s, _ in state_task_per_host.values())) + ): + self._in_handlers = False + + if self._in_handlers: + lowest_cur_handler = min( + s.cur_handlers_task for s, t in state_task_per_host.values() + if s.run_state == IteratingStates.HANDLERS + ) + else: + task_uuids = [t._uuid for s, t in state_task_per_host.values()] + _loop_cnt = 0 + while _loop_cnt <= 1: + try: + cur_task = iterator.all_tasks[iterator.cur_task] + except IndexError: + # pick up any tasks left after clear_host_errors + iterator.cur_task = 0 + _loop_cnt += 1 + else: + iterator.cur_task += 1 + if cur_task._uuid in task_uuids: + break + else: + # prevent infinite loop + raise AnsibleAssertionError( + 'BUG: There seems to be a mismatch between tasks in PlayIterator and HostStates.' + ) + + host_tasks = [] + for host, (state, task) in state_task_per_host.items(): + if ((self._in_handlers and lowest_cur_handler == state.cur_handlers_task) or + (not self._in_handlers and cur_task._uuid == task._uuid)): + iterator.set_state_for_host(host.name, state) + host_tasks.append((host, task)) + else: + host_tasks.append((host, noop_task)) + + # once hosts synchronize on 'flush_handlers' lockstep enters + # '_in_handlers' phase where handlers are run instead of tasks + # until at least one host is in IteratingStates.HANDLERS + if (not self._in_handlers and cur_task.action in C._ACTION_META and + cur_task.args.get('_raw_params') == 'flush_handlers'): + self._in_handlers = True + + return host_tasks + + def run(self, iterator, play_context): + ''' + The linear strategy is simple - get the next task and queue + it for all hosts, then wait for the queue to drain before + moving on to the next task + ''' + + # iterate over each task, while there is one left to run + result = self._tqm.RUN_OK + work_to_do = True + + self._set_hosts_cache(iterator._play) + + while work_to_do and not self._tqm._terminated: + + try: + display.debug("getting the remaining hosts for this loop") + hosts_left = self.get_hosts_left(iterator) + display.debug("done getting the remaining hosts for this loop") + + # queue up this task for each host in the inventory + callback_sent = False + work_to_do = False + + host_tasks = self._get_next_task_lockstep(hosts_left, iterator) + + # skip control + skip_rest = False + choose_step = True + + # flag set if task is set to any_errors_fatal + any_errors_fatal = False + + results = [] + for (host, task) in host_tasks: + if not task: + continue + + if self._tqm._terminated: + break + + run_once = False + work_to_do = True + + # check to see if this task should be skipped, due to it being a member of a + # role which has already run (and whether that role allows duplicate execution) + if not isinstance(task, Handler) and task._role and task._role.has_run(host): + # If there is no metadata, the default behavior is to not allow duplicates, + # if there is metadata, check to see if the allow_duplicates flag was set to true + if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates: + display.debug("'%s' skipped because role has already run" % task) + continue + + display.debug("getting variables") + task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task, + _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all) + self.add_tqm_variables(task_vars, play=iterator._play) + templar = Templar(loader=self._loader, variables=task_vars) + display.debug("done getting variables") + + # test to see if the task across all hosts points to an action plugin which + # sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we + # will only send this task to the first host in the list. + + task_action = templar.template(task.action) + + try: + action = action_loader.get(task_action, class_only=True, collection_list=task.collections) + except KeyError: + # we don't care here, because the action may simply not have a + # corresponding action plugin + action = None + + if task_action in C._ACTION_META: + # for the linear strategy, we run meta tasks just once and for + # all hosts currently being iterated over rather than one host + results.extend(self._execute_meta(task, play_context, iterator, host)) + if task.args.get('_raw_params', None) not in ('noop', 'reset_connection', 'end_host', 'role_complete', 'flush_handlers'): + run_once = True + if (task.any_errors_fatal or run_once) and not task.ignore_errors: + any_errors_fatal = True + else: + # handle step if needed, skip meta actions as they are used internally + if self._step and choose_step: + if self._take_step(task): + choose_step = False + else: + skip_rest = True + break + + run_once = templar.template(task.run_once) or action and getattr(action, 'BYPASS_HOST_LOOP', False) + + if (task.any_errors_fatal or run_once) and not task.ignore_errors: + any_errors_fatal = True + + if not callback_sent: + display.debug("sending task start callback, copying the task so we can template it temporarily") + saved_name = task.name + display.debug("done copying, going to template now") + try: + task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty') + display.debug("done templating") + except Exception: + # just ignore any errors during task name templating, + # we don't care if it just shows the raw name + display.debug("templating failed for some reason") + display.debug("here goes the callback...") + if isinstance(task, Handler): + self._tqm.send_callback('v2_playbook_on_handler_task_start', task) + else: + self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False) + task.name = saved_name + callback_sent = True + display.debug("sending task start callback") + + self._blocked_hosts[host.get_name()] = True + self._queue_task(host, task, task_vars, play_context) + del task_vars + + # if we're bypassing the host loop, break out now + if run_once: + break + + results.extend(self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1)))) + + # go to next host/task group + if skip_rest: + continue + + display.debug("done queuing things up, now waiting for results queue to drain") + if self._pending_results > 0: + results.extend(self._wait_on_pending_results(iterator)) + + self.update_active_connections(results) + + included_files = IncludedFile.process_include_results( + results, + iterator=iterator, + loader=self._loader, + variable_manager=self._variable_manager + ) + + if len(included_files) > 0: + display.debug("we have included files to process") + + display.debug("generating all_blocks data") + all_blocks = dict((host, []) for host in hosts_left) + display.debug("done generating all_blocks data") + included_tasks = [] + failed_includes_hosts = set() + for included_file in included_files: + display.debug("processing included file: %s" % included_file._filename) + is_handler = False + try: + if included_file._is_role: + new_ir = self._copy_included_file(included_file) + + new_blocks, handler_blocks = new_ir.get_block_list( + play=iterator._play, + variable_manager=self._variable_manager, + loader=self._loader, + ) + else: + is_handler = isinstance(included_file._task, Handler) + new_blocks = self._load_included_file(included_file, iterator=iterator, is_handler=is_handler) + + # let PlayIterator know about any new handlers included via include_role or + # import_role within include_role/include_taks + iterator.handlers = [h for b in iterator._play.handlers for h in b.block] + + display.debug("iterating over new_blocks loaded from include file") + for new_block in new_blocks: + if is_handler: + for task in new_block.block: + task.notified_hosts = included_file._hosts[:] + final_block = new_block + else: + task_vars = self._variable_manager.get_vars( + play=iterator._play, + task=new_block.get_first_parent_include(), + _hosts=self._hosts_cache, + _hosts_all=self._hosts_cache_all, + ) + display.debug("filtering new block on tags") + final_block = new_block.filter_tagged_tasks(task_vars) + display.debug("done filtering new block on tags") + + included_tasks.extend(final_block.get_tasks()) + + for host in hosts_left: + if host in included_file._hosts: + all_blocks[host].append(final_block) + + display.debug("done iterating over new_blocks loaded from include file") + except AnsibleParserError: + raise + except AnsibleError as e: + if included_file._is_role: + # include_role does not have on_include callback so display the error + display.error(to_text(e), wrap_text=False) + for r in included_file._results: + r._result['failed'] = True + failed_includes_hosts.add(r._host) + continue + + for host in failed_includes_hosts: + self._tqm._failed_hosts[host.name] = True + iterator.mark_host_failed(host) + + # finally go through all of the hosts and append the + # accumulated blocks to their list of tasks + display.debug("extending task lists for all hosts with included blocks") + + for host in hosts_left: + iterator.add_tasks(host, all_blocks[host]) + + iterator.all_tasks[iterator.cur_task:iterator.cur_task] = included_tasks + + display.debug("done extending task lists") + display.debug("done processing included files") + + display.debug("results queue empty") + + display.debug("checking for any_errors_fatal") + failed_hosts = [] + unreachable_hosts = [] + for res in results: + # execute_meta() does not set 'failed' in the TaskResult + # so we skip checking it with the meta tasks and look just at the iterator + if (res.is_failed() or res._task.action in C._ACTION_META) and iterator.is_failed(res._host): + failed_hosts.append(res._host.name) + elif res.is_unreachable(): + unreachable_hosts.append(res._host.name) + + # if any_errors_fatal and we had an error, mark all hosts as failed + if any_errors_fatal and (len(failed_hosts) > 0 or len(unreachable_hosts) > 0): + dont_fail_states = frozenset([IteratingStates.RESCUE, IteratingStates.ALWAYS]) + for host in hosts_left: + (s, _) = iterator.get_next_task_for_host(host, peek=True) + # the state may actually be in a child state, use the get_active_state() + # method in the iterator to figure out the true active state + s = iterator.get_active_state(s) + if s.run_state not in dont_fail_states or \ + s.run_state == IteratingStates.RESCUE and s.fail_state & FailedStates.RESCUE != 0: + self._tqm._failed_hosts[host.name] = True + result |= self._tqm.RUN_FAILED_BREAK_PLAY + display.debug("done checking for any_errors_fatal") + + display.debug("checking for max_fail_percentage") + if iterator._play.max_fail_percentage is not None and len(results) > 0: + percentage = iterator._play.max_fail_percentage / 100.0 + + if (len(self._tqm._failed_hosts) / iterator.batch_size) > percentage: + for host in hosts_left: + # don't double-mark hosts, or the iterator will potentially + # fail them out of the rescue/always states + if host.name not in failed_hosts: + self._tqm._failed_hosts[host.name] = True + iterator.mark_host_failed(host) + self._tqm.send_callback('v2_playbook_on_no_hosts_remaining') + result |= self._tqm.RUN_FAILED_BREAK_PLAY + display.debug('(%s failed / %s total )> %s max fail' % (len(self._tqm._failed_hosts), iterator.batch_size, percentage)) + display.debug("done checking for max_fail_percentage") + + display.debug("checking to see if all hosts have failed and the running result is not ok") + if result != self._tqm.RUN_OK and len(self._tqm._failed_hosts) >= len(hosts_left): + display.debug("^ not ok, so returning result now") + self._tqm.send_callback('v2_playbook_on_no_hosts_remaining') + return result + display.debug("done checking to see if all hosts have failed") + + except (IOError, EOFError) as e: + display.debug("got IOError/EOFError in task loop: %s" % e) + # most likely an abort, return failed + return self._tqm.RUN_UNKNOWN_ERROR + + # run the base class run() method, which executes the cleanup function + # and runs any outstanding handlers which have been triggered + + return super(StrategyModule, self).run(iterator, play_context, result) |