summaryrefslogtreecommitdiffstats
path: root/lib/ansible/plugins/strategy/free.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/plugins/strategy/free.py')
-rw-r--r--lib/ansible/plugins/strategy/free.py303
1 files changed, 303 insertions, 0 deletions
diff --git a/lib/ansible/plugins/strategy/free.py b/lib/ansible/plugins/strategy/free.py
new file mode 100644
index 0000000..6f45114
--- /dev/null
+++ b/lib/ansible/plugins/strategy/free.py
@@ -0,0 +1,303 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+DOCUMENTATION = '''
+ name: free
+ short_description: Executes tasks without waiting for all hosts
+ description:
+ - Task execution is as fast as possible per batch as defined by C(serial) (default all).
+ Ansible will not wait for other hosts to finish the current task before queuing more tasks for other hosts.
+ All hosts are still attempted for the current task, but it prevents blocking new tasks for hosts that have already finished.
+ - With the free strategy, unlike the default linear strategy, a host that is slow or stuck on a specific task
+ won't hold up the rest of the hosts and tasks.
+ version_added: "2.0"
+ author: Ansible Core Team
+'''
+
+import time
+
+from ansible import constants as C
+from ansible.errors import AnsibleError, AnsibleParserError
+from ansible.playbook.handler import Handler
+from ansible.playbook.included_file import IncludedFile
+from ansible.plugins.loader import action_loader
+from ansible.plugins.strategy import StrategyBase
+from ansible.template import Templar
+from ansible.module_utils._text import to_text
+from ansible.utils.display import Display
+
+display = Display()
+
+
+class StrategyModule(StrategyBase):
+
+ # This strategy manages throttling on its own, so we don't want it done in queue_task
+ ALLOW_BASE_THROTTLING = False
+
+ def __init__(self, tqm):
+ super(StrategyModule, self).__init__(tqm)
+ self._host_pinned = False
+
+ def run(self, iterator, play_context):
+ '''
+ The "free" strategy is a bit more complex, in that it allows tasks to
+ be sent to hosts as quickly as they can be processed. This means that
+ some hosts may finish very quickly if run tasks result in little or no
+ work being done versus other systems.
+
+ The algorithm used here also tries to be more "fair" when iterating
+ through hosts by remembering the last host in the list to be given a task
+ and starting the search from there as opposed to the top of the hosts
+ list again, which would end up favoring hosts near the beginning of the
+ list.
+ '''
+
+ # the last host to be given a task
+ last_host = 0
+
+ result = self._tqm.RUN_OK
+
+ # start with all workers being counted as being free
+ workers_free = len(self._workers)
+
+ self._set_hosts_cache(iterator._play)
+
+ if iterator._play.max_fail_percentage is not None:
+ display.warning("Using max_fail_percentage with the free strategy is not supported, as tasks are executed independently on each host")
+
+ work_to_do = True
+ while work_to_do and not self._tqm._terminated:
+
+ hosts_left = self.get_hosts_left(iterator)
+
+ if len(hosts_left) == 0:
+ self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
+ result = False
+ break
+
+ work_to_do = False # assume we have no more work to do
+ starting_host = last_host # save current position so we know when we've looped back around and need to break
+
+ # try and find an unblocked host with a task to run
+ host_results = []
+ while True:
+ host = hosts_left[last_host]
+ display.debug("next free host: %s" % host)
+ host_name = host.get_name()
+
+ # peek at the next task for the host, to see if there's
+ # anything to do do for this host
+ (state, task) = iterator.get_next_task_for_host(host, peek=True)
+ display.debug("free host state: %s" % state, host=host_name)
+ display.debug("free host task: %s" % task, host=host_name)
+
+ # check if there is work to do, either there is a task or the host is still blocked which could
+ # mean that it is processing an include task and after its result is processed there might be
+ # more tasks to run
+ if (task or self._blocked_hosts.get(host_name, False)) and not self._tqm._unreachable_hosts.get(host_name, False):
+ display.debug("this host has work to do", host=host_name)
+ # set the flag so the outer loop knows we've still found
+ # some work which needs to be done
+ work_to_do = True
+
+ if not self._tqm._unreachable_hosts.get(host_name, False) and task:
+ # check to see if this host is blocked (still executing a previous task)
+ if not self._blocked_hosts.get(host_name, False):
+ display.debug("getting variables", host=host_name)
+ task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task,
+ _hosts=self._hosts_cache,
+ _hosts_all=self._hosts_cache_all)
+ self.add_tqm_variables(task_vars, play=iterator._play)
+ templar = Templar(loader=self._loader, variables=task_vars)
+ display.debug("done getting variables", host=host_name)
+
+ try:
+ throttle = int(templar.template(task.throttle))
+ except Exception as e:
+ raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e)
+
+ if throttle > 0:
+ same_tasks = 0
+ for worker in self._workers:
+ if worker and worker.is_alive() and worker._task._uuid == task._uuid:
+ same_tasks += 1
+
+ display.debug("task: %s, same_tasks: %d" % (task.get_name(), same_tasks))
+ if same_tasks >= throttle:
+ break
+
+ # advance the host, mark the host blocked, and queue it
+ self._blocked_hosts[host_name] = True
+ iterator.set_state_for_host(host.name, state)
+
+ try:
+ action = action_loader.get(task.action, class_only=True, collection_list=task.collections)
+ except KeyError:
+ # we don't care here, because the action may simply not have a
+ # corresponding action plugin
+ action = None
+
+ try:
+ task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
+ display.debug("done templating", host=host_name)
+ except Exception:
+ # just ignore any errors during task name templating,
+ # we don't care if it just shows the raw name
+ display.debug("templating failed for some reason", host=host_name)
+
+ run_once = templar.template(task.run_once) or action and getattr(action, 'BYPASS_HOST_LOOP', False)
+ if run_once:
+ if action and getattr(action, 'BYPASS_HOST_LOOP', False):
+ raise AnsibleError("The '%s' module bypasses the host loop, which is currently not supported in the free strategy "
+ "and would instead execute for every host in the inventory list." % task.action, obj=task._ds)
+ else:
+ display.warning("Using run_once with the free strategy is not currently supported. This task will still be "
+ "executed for every host in the inventory list.")
+
+ # check to see if this task should be skipped, due to it being a member of a
+ # role which has already run (and whether that role allows duplicate execution)
+ if not isinstance(task, Handler) and task._role and task._role.has_run(host):
+ # If there is no metadata, the default behavior is to not allow duplicates,
+ # if there is metadata, check to see if the allow_duplicates flag was set to true
+ if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates:
+ display.debug("'%s' skipped because role has already run" % task, host=host_name)
+ del self._blocked_hosts[host_name]
+ continue
+
+ if task.action in C._ACTION_META:
+ self._execute_meta(task, play_context, iterator, target_host=host)
+ self._blocked_hosts[host_name] = False
+ else:
+ # handle step if needed, skip meta actions as they are used internally
+ if not self._step or self._take_step(task, host_name):
+ if task.any_errors_fatal:
+ display.warning("Using any_errors_fatal with the free strategy is not supported, "
+ "as tasks are executed independently on each host")
+ if isinstance(task, Handler):
+ self._tqm.send_callback('v2_playbook_on_handler_task_start', task)
+ else:
+ self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
+ self._queue_task(host, task, task_vars, play_context)
+ # each task is counted as a worker being busy
+ workers_free -= 1
+ del task_vars
+ else:
+ display.debug("%s is blocked, skipping for now" % host_name)
+
+ # all workers have tasks to do (and the current host isn't done with the play).
+ # loop back to starting host and break out
+ if self._host_pinned and workers_free == 0 and work_to_do:
+ last_host = starting_host
+ break
+
+ # move on to the next host and make sure we
+ # haven't gone past the end of our hosts list
+ last_host += 1
+ if last_host > len(hosts_left) - 1:
+ last_host = 0
+
+ # if we've looped around back to the start, break out
+ if last_host == starting_host:
+ break
+
+ results = self._process_pending_results(iterator)
+ host_results.extend(results)
+
+ # each result is counted as a worker being free again
+ workers_free += len(results)
+
+ self.update_active_connections(results)
+
+ included_files = IncludedFile.process_include_results(
+ host_results,
+ iterator=iterator,
+ loader=self._loader,
+ variable_manager=self._variable_manager
+ )
+
+ if len(included_files) > 0:
+ all_blocks = dict((host, []) for host in hosts_left)
+ failed_includes_hosts = set()
+ for included_file in included_files:
+ display.debug("collecting new blocks for %s" % included_file)
+ is_handler = False
+ try:
+ if included_file._is_role:
+ new_ir = self._copy_included_file(included_file)
+
+ new_blocks, handler_blocks = new_ir.get_block_list(
+ play=iterator._play,
+ variable_manager=self._variable_manager,
+ loader=self._loader,
+ )
+ else:
+ is_handler = isinstance(included_file._task, Handler)
+ new_blocks = self._load_included_file(included_file, iterator=iterator, is_handler=is_handler)
+
+ # let PlayIterator know about any new handlers included via include_role or
+ # import_role within include_role/include_taks
+ iterator.handlers = [h for b in iterator._play.handlers for h in b.block]
+ except AnsibleParserError:
+ raise
+ except AnsibleError as e:
+ if included_file._is_role:
+ # include_role does not have on_include callback so display the error
+ display.error(to_text(e), wrap_text=False)
+ for r in included_file._results:
+ r._result['failed'] = True
+ failed_includes_hosts.add(r._host)
+ continue
+
+ for new_block in new_blocks:
+ if is_handler:
+ for task in new_block.block:
+ task.notified_hosts = included_file._hosts[:]
+ final_block = new_block
+ else:
+ task_vars = self._variable_manager.get_vars(
+ play=iterator._play,
+ task=new_block.get_first_parent_include(),
+ _hosts=self._hosts_cache,
+ _hosts_all=self._hosts_cache_all,
+ )
+ final_block = new_block.filter_tagged_tasks(task_vars)
+ for host in hosts_left:
+ if host in included_file._hosts:
+ all_blocks[host].append(final_block)
+ display.debug("done collecting new blocks for %s" % included_file)
+
+ for host in failed_includes_hosts:
+ self._tqm._failed_hosts[host.name] = True
+ iterator.mark_host_failed(host)
+
+ display.debug("adding all collected blocks from %d included file(s) to iterator" % len(included_files))
+ for host in hosts_left:
+ iterator.add_tasks(host, all_blocks[host])
+ display.debug("done adding collected blocks to iterator")
+
+ # pause briefly so we don't spin lock
+ time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)
+
+ # collect all the final results
+ results = self._wait_on_pending_results(iterator)
+
+ # run the base class run() method, which executes the cleanup function
+ # and runs any outstanding handlers which have been triggered
+ return super(StrategyModule, self).run(iterator, play_context, result)