diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /lib/ansible/executor | |
parent | Initial commit. (diff) | |
download | ansible-core-upstream/2.14.3.tar.xz ansible-core-upstream/2.14.3.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/executor')
25 files changed, 6559 insertions, 0 deletions
diff --git a/lib/ansible/executor/__init__.py b/lib/ansible/executor/__init__.py new file mode 100644 index 0000000..ae8ccff --- /dev/null +++ b/lib/ansible/executor/__init__.py @@ -0,0 +1,20 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py new file mode 100644 index 0000000..fd82744 --- /dev/null +++ b/lib/ansible/executor/action_write_locks.py @@ -0,0 +1,46 @@ +# (c) 2016 - Red Hat, Inc. <info@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import multiprocessing.synchronize + +from multiprocessing import Lock + +from ansible.module_utils.facts.system.pkg_mgr import PKG_MGRS + +if 'action_write_locks' not in globals(): + # Do not initialize this more than once because it seems to bash + # the existing one. multiprocessing must be reloading the module + # when it forks? + action_write_locks = dict() # type: dict[str | None, multiprocessing.synchronize.Lock] + + # Below is a Lock for use when we weren't expecting a named module. It gets used when an action + # plugin invokes a module whose name does not match with the action's name. Slightly less + # efficient as all processes with unexpected module names will wait on this lock + action_write_locks[None] = Lock() + + # These plugins are known to be called directly by action plugins with names differing from the + # action plugin name. We precreate them here as an optimization. + # If a list of service managers is created in the future we can do the same for them. + mods = set(p['name'] for p in PKG_MGRS) + + mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) + for mod_name in mods: + action_write_locks[mod_name] = Lock() diff --git a/lib/ansible/executor/discovery/__init__.py b/lib/ansible/executor/discovery/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lib/ansible/executor/discovery/__init__.py diff --git a/lib/ansible/executor/discovery/python_target.py b/lib/ansible/executor/discovery/python_target.py new file mode 100644 index 0000000..7137733 --- /dev/null +++ b/lib/ansible/executor/discovery/python_target.py @@ -0,0 +1,48 @@ +# Copyright: (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# FUTURE: this could be swapped out for our bundled version of distro to move more complete platform +# logic to the targets, so long as we maintain Py2.6 compat and don't need to do any kind of script assembly + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json +import platform +import io +import os + + +def read_utf8_file(path, encoding='utf-8'): + if not os.access(path, os.R_OK): + return None + with io.open(path, 'r', encoding=encoding) as fd: + content = fd.read() + + return content + + +def get_platform_info(): + result = dict(platform_dist_result=[]) + + if hasattr(platform, 'dist'): + result['platform_dist_result'] = platform.dist() + + osrelease_content = read_utf8_file('/etc/os-release') + # try to fall back to /usr/lib/os-release + if not osrelease_content: + osrelease_content = read_utf8_file('/usr/lib/os-release') + + result['osrelease_content'] = osrelease_content + + return result + + +def main(): + info = get_platform_info() + + print(json.dumps(info)) + + +if __name__ == '__main__': + main() diff --git a/lib/ansible/executor/interpreter_discovery.py b/lib/ansible/executor/interpreter_discovery.py new file mode 100644 index 0000000..bfd8504 --- /dev/null +++ b/lib/ansible/executor/interpreter_discovery.py @@ -0,0 +1,207 @@ +# Copyright: (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import bisect +import json +import pkgutil +import re + +from ansible import constants as C +from ansible.module_utils._text import to_native, to_text +from ansible.module_utils.distro import LinuxDistribution +from ansible.utils.display import Display +from ansible.utils.plugin_docs import get_versioned_doclink +from ansible.module_utils.compat.version import LooseVersion +from ansible.module_utils.facts.system.distribution import Distribution +from traceback import format_exc + +OS_FAMILY_LOWER = {k.lower(): v.lower() for k, v in Distribution.OS_FAMILY.items()} + +display = Display() +foundre = re.compile(r'(?s)PLATFORM[\r\n]+(.*)FOUND(.*)ENDFOUND') + + +class InterpreterDiscoveryRequiredError(Exception): + def __init__(self, message, interpreter_name, discovery_mode): + super(InterpreterDiscoveryRequiredError, self).__init__(message) + self.interpreter_name = interpreter_name + self.discovery_mode = discovery_mode + + def __str__(self): + return self.message + + def __repr__(self): + # TODO: proper repr impl + return self.message + + +def discover_interpreter(action, interpreter_name, discovery_mode, task_vars): + # interpreter discovery is a 2-step process with the target. First, we use a simple shell-agnostic bootstrap to + # get the system type from uname, and find any random Python that can get us the info we need. For supported + # target OS types, we'll dispatch a Python script that calls plaform.dist() (for older platforms, where available) + # and brings back /etc/os-release (if present). The proper Python path is looked up in a table of known + # distros/versions with included Pythons; if nothing is found, depending on the discovery mode, either the + # default fallback of /usr/bin/python is used (if we know it's there), or discovery fails. + + # FUTURE: add logical equivalence for "python3" in the case of py3-only modules? + if interpreter_name != 'python': + raise ValueError('Interpreter discovery not supported for {0}'.format(interpreter_name)) + + host = task_vars.get('inventory_hostname', 'unknown') + res = None + platform_type = 'unknown' + found_interpreters = [u'/usr/bin/python'] # fallback value + is_auto_legacy = discovery_mode.startswith('auto_legacy') + is_silent = discovery_mode.endswith('_silent') + + try: + platform_python_map = C.config.get_config_value('_INTERPRETER_PYTHON_DISTRO_MAP', variables=task_vars) + bootstrap_python_list = C.config.get_config_value('INTERPRETER_PYTHON_FALLBACK', variables=task_vars) + + display.vvv(msg=u"Attempting {0} interpreter discovery".format(interpreter_name), host=host) + + # not all command -v impls accept a list of commands, so we have to call it once per python + command_list = ["command -v '%s'" % py for py in bootstrap_python_list] + shell_bootstrap = "echo PLATFORM; uname; echo FOUND; {0}; echo ENDFOUND".format('; '.join(command_list)) + + # FUTURE: in most cases we probably don't want to use become, but maybe sometimes we do? + res = action._low_level_execute_command(shell_bootstrap, sudoable=False) + + raw_stdout = res.get('stdout', u'') + + match = foundre.match(raw_stdout) + + if not match: + display.debug(u'raw interpreter discovery output: {0}'.format(raw_stdout), host=host) + raise ValueError('unexpected output from Python interpreter discovery') + + platform_type = match.groups()[0].lower().strip() + + found_interpreters = [interp.strip() for interp in match.groups()[1].splitlines() if interp.startswith('/')] + + display.debug(u"found interpreters: {0}".format(found_interpreters), host=host) + + if not found_interpreters: + if not is_silent: + action._discovery_warnings.append(u'No python interpreters found for ' + u'host {0} (tried {1})'.format(host, bootstrap_python_list)) + # this is lame, but returning None or throwing an exception is uglier + return u'/usr/bin/python' + + if platform_type != 'linux': + raise NotImplementedError('unsupported platform for extended discovery: {0}'.format(to_native(platform_type))) + + platform_script = pkgutil.get_data('ansible.executor.discovery', 'python_target.py') + + # FUTURE: respect pipelining setting instead of just if the connection supports it? + if action._connection.has_pipelining: + res = action._low_level_execute_command(found_interpreters[0], sudoable=False, in_data=platform_script) + else: + # FUTURE: implement on-disk case (via script action or ?) + raise NotImplementedError('pipelining support required for extended interpreter discovery') + + platform_info = json.loads(res.get('stdout')) + + distro, version = _get_linux_distro(platform_info) + + if not distro or not version: + raise NotImplementedError('unable to get Linux distribution/version info') + + family = OS_FAMILY_LOWER.get(distro.lower().strip()) + + version_map = platform_python_map.get(distro.lower().strip()) or platform_python_map.get(family) + if not version_map: + raise NotImplementedError('unsupported Linux distribution: {0}'.format(distro)) + + platform_interpreter = to_text(_version_fuzzy_match(version, version_map), errors='surrogate_or_strict') + + # provide a transition period for hosts that were using /usr/bin/python previously (but shouldn't have been) + if is_auto_legacy: + if platform_interpreter != u'/usr/bin/python' and u'/usr/bin/python' in found_interpreters: + if not is_silent: + action._discovery_warnings.append( + u"Distribution {0} {1} on host {2} should use {3}, but is using " + u"/usr/bin/python for backward compatibility with prior Ansible releases. " + u"See {4} for more information" + .format(distro, version, host, platform_interpreter, + get_versioned_doclink('reference_appendices/interpreter_discovery.html'))) + return u'/usr/bin/python' + + if platform_interpreter not in found_interpreters: + if platform_interpreter not in bootstrap_python_list: + # sanity check to make sure we looked for it + if not is_silent: + action._discovery_warnings \ + .append(u"Platform interpreter {0} on host {1} is missing from bootstrap list" + .format(platform_interpreter, host)) + + if not is_silent: + action._discovery_warnings \ + .append(u"Distribution {0} {1} on host {2} should use {3}, but is using {4}, since the " + u"discovered platform python interpreter was not present. See {5} " + u"for more information." + .format(distro, version, host, platform_interpreter, found_interpreters[0], + get_versioned_doclink('reference_appendices/interpreter_discovery.html'))) + return found_interpreters[0] + + return platform_interpreter + except NotImplementedError as ex: + display.vvv(msg=u'Python interpreter discovery fallback ({0})'.format(to_text(ex)), host=host) + except Exception as ex: + if not is_silent: + display.warning(msg=u'Unhandled error in Python interpreter discovery for host {0}: {1}'.format(host, to_text(ex))) + display.debug(msg=u'Interpreter discovery traceback:\n{0}'.format(to_text(format_exc())), host=host) + if res and res.get('stderr'): + display.vvv(msg=u'Interpreter discovery remote stderr:\n{0}'.format(to_text(res.get('stderr'))), host=host) + + if not is_silent: + action._discovery_warnings \ + .append(u"Platform {0} on host {1} is using the discovered Python interpreter at {2}, but future installation of " + u"another Python interpreter could change the meaning of that path. See {3} " + u"for more information." + .format(platform_type, host, found_interpreters[0], + get_versioned_doclink('reference_appendices/interpreter_discovery.html'))) + return found_interpreters[0] + + +def _get_linux_distro(platform_info): + dist_result = platform_info.get('platform_dist_result', []) + + if len(dist_result) == 3 and any(dist_result): + return dist_result[0], dist_result[1] + + osrelease_content = platform_info.get('osrelease_content') + + if not osrelease_content: + return u'', u'' + + osr = LinuxDistribution._parse_os_release_content(osrelease_content) + + return osr.get('id', u''), osr.get('version_id', u'') + + +def _version_fuzzy_match(version, version_map): + # try exact match first + res = version_map.get(version) + if res: + return res + + sorted_looseversions = sorted([LooseVersion(v) for v in version_map.keys()]) + + find_looseversion = LooseVersion(version) + + # slot match; return nearest previous version we're newer than + kpos = bisect.bisect(sorted_looseversions, find_looseversion) + + if kpos == 0: + # older than everything in the list, return the oldest version + # TODO: warning-worthy? + return version_map.get(sorted_looseversions[0].vstring) + + # TODO: is "past the end of the list" warning-worthy too (at least if it's not a major version match)? + + # return the next-oldest entry that we're newer than... + return version_map.get(sorted_looseversions[kpos - 1].vstring) diff --git a/lib/ansible/executor/module_common.py b/lib/ansible/executor/module_common.py new file mode 100644 index 0000000..4d06acb --- /dev/null +++ b/lib/ansible/executor/module_common.py @@ -0,0 +1,1428 @@ +# (c) 2013-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2015 Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import ast +import base64 +import datetime +import json +import os +import shlex +import zipfile +import re +import pkgutil + +from ast import AST, Import, ImportFrom +from io import BytesIO + +from ansible.release import __version__, __author__ +from ansible import constants as C +from ansible.errors import AnsibleError +from ansible.executor.interpreter_discovery import InterpreterDiscoveryRequiredError +from ansible.executor.powershell import module_manifest as ps_manifest +from ansible.module_utils.common.json import AnsibleJSONEncoder +from ansible.module_utils.common.text.converters import to_bytes, to_text, to_native +from ansible.plugins.loader import module_utils_loader +from ansible.utils.collection_loader._collection_finder import _get_collection_metadata, _nested_dict_get + +# Must import strategy and use write_locks from there +# If we import write_locks directly then we end up binding a +# variable to the object and then it never gets updated. +from ansible.executor import action_write_locks + +from ansible.utils.display import Display +from collections import namedtuple + +import importlib.util +import importlib.machinery + +display = Display() + +ModuleUtilsProcessEntry = namedtuple('ModuleUtilsProcessEntry', ['name_parts', 'is_ambiguous', 'has_redirected_child', 'is_optional']) + +REPLACER = b"#<<INCLUDE_ANSIBLE_MODULE_COMMON>>" +REPLACER_VERSION = b"\"<<ANSIBLE_VERSION>>\"" +REPLACER_COMPLEX = b"\"<<INCLUDE_ANSIBLE_MODULE_COMPLEX_ARGS>>\"" +REPLACER_WINDOWS = b"# POWERSHELL_COMMON" +REPLACER_JSONARGS = b"<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>" +REPLACER_SELINUX = b"<<SELINUX_SPECIAL_FILESYSTEMS>>" + +# We could end up writing out parameters with unicode characters so we need to +# specify an encoding for the python source file +ENCODING_STRING = u'# -*- coding: utf-8 -*-' +b_ENCODING_STRING = b'# -*- coding: utf-8 -*-' + +# module_common is relative to module_utils, so fix the path +_MODULE_UTILS_PATH = os.path.join(os.path.dirname(__file__), '..', 'module_utils') + +# ****************************************************************************** + +ANSIBALLZ_TEMPLATE = u'''%(shebang)s +%(coding)s +_ANSIBALLZ_WRAPPER = True # For test-module.py script to tell this is a ANSIBALLZ_WRAPPER +# This code is part of Ansible, but is an independent component. +# The code in this particular templatable string, and this templatable string +# only, is BSD licensed. Modules which end up using this snippet, which is +# dynamically combined together by Ansible still belong to the author of the +# module, and they may assign their own license to the complete work. +# +# Copyright (c), James Cammarata, 2016 +# Copyright (c), Toshio Kuratomi, 2016 +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +def _ansiballz_main(): + import os + import os.path + + # Access to the working directory is required by Python when using pipelining, as well as for the coverage module. + # Some platforms, such as macOS, may not allow querying the working directory when using become to drop privileges. + try: + os.getcwd() + except OSError: + try: + os.chdir(os.path.expanduser('~')) + except OSError: + os.chdir('/') + +%(rlimit)s + + import sys + import __main__ + + # For some distros and python versions we pick up this script in the temporary + # directory. This leads to problems when the ansible module masks a python + # library that another import needs. We have not figured out what about the + # specific distros and python versions causes this to behave differently. + # + # Tested distros: + # Fedora23 with python3.4 Works + # Ubuntu15.10 with python2.7 Works + # Ubuntu15.10 with python3.4 Fails without this + # Ubuntu16.04.1 with python3.5 Fails without this + # To test on another platform: + # * use the copy module (since this shadows the stdlib copy module) + # * Turn off pipelining + # * Make sure that the destination file does not exist + # * ansible ubuntu16-test -m copy -a 'src=/etc/motd dest=/var/tmp/m' + # This will traceback in shutil. Looking at the complete traceback will show + # that shutil is importing copy which finds the ansible module instead of the + # stdlib module + scriptdir = None + try: + scriptdir = os.path.dirname(os.path.realpath(__main__.__file__)) + except (AttributeError, OSError): + # Some platforms don't set __file__ when reading from stdin + # OSX raises OSError if using abspath() in a directory we don't have + # permission to read (realpath calls abspath) + pass + + # Strip cwd from sys.path to avoid potential permissions issues + excludes = set(('', '.', scriptdir)) + sys.path = [p for p in sys.path if p not in excludes] + + import base64 + import runpy + import shutil + import tempfile + import zipfile + + if sys.version_info < (3,): + PY3 = False + else: + PY3 = True + + ZIPDATA = """%(zipdata)s""" + + # Note: temp_path isn't needed once we switch to zipimport + def invoke_module(modlib_path, temp_path, json_params): + # When installed via setuptools (including python setup.py install), + # ansible may be installed with an easy-install.pth file. That file + # may load the system-wide install of ansible rather than the one in + # the module. sitecustomize is the only way to override that setting. + z = zipfile.ZipFile(modlib_path, mode='a') + + # py3: modlib_path will be text, py2: it's bytes. Need bytes at the end + sitecustomize = u'import sys\\nsys.path.insert(0,"%%s")\\n' %% modlib_path + sitecustomize = sitecustomize.encode('utf-8') + # Use a ZipInfo to work around zipfile limitation on hosts with + # clocks set to a pre-1980 year (for instance, Raspberry Pi) + zinfo = zipfile.ZipInfo() + zinfo.filename = 'sitecustomize.py' + zinfo.date_time = ( %(year)i, %(month)i, %(day)i, %(hour)i, %(minute)i, %(second)i) + z.writestr(zinfo, sitecustomize) + z.close() + + # Put the zipped up module_utils we got from the controller first in the python path so that we + # can monkeypatch the right basic + sys.path.insert(0, modlib_path) + + # Monkeypatch the parameters into basic + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = json_params +%(coverage)s + # Run the module! By importing it as '__main__', it thinks it is executing as a script + runpy.run_module(mod_name='%(module_fqn)s', init_globals=dict(_module_fqn='%(module_fqn)s', _modlib_path=modlib_path), + run_name='__main__', alter_sys=True) + + # Ansible modules must exit themselves + print('{"msg": "New-style module did not handle its own exit", "failed": true}') + sys.exit(1) + + def debug(command, zipped_mod, json_params): + # The code here normally doesn't run. It's only used for debugging on the + # remote machine. + # + # The subcommands in this function make it easier to debug ansiballz + # modules. Here's the basic steps: + # + # Run ansible with the environment variable: ANSIBLE_KEEP_REMOTE_FILES=1 and -vvv + # to save the module file remotely:: + # $ ANSIBLE_KEEP_REMOTE_FILES=1 ansible host1 -m ping -a 'data=october' -vvv + # + # Part of the verbose output will tell you where on the remote machine the + # module was written to:: + # [...] + # <host1> SSH: EXEC ssh -C -q -o ControlMaster=auto -o ControlPersist=60s -o KbdInteractiveAuthentication=no -o + # PreferredAuthentications=gssapi-with-mic,gssapi-keyex,hostbased,publickey -o PasswordAuthentication=no -o ConnectTimeout=10 -o + # ControlPath=/home/badger/.ansible/cp/ansible-ssh-%%h-%%p-%%r -tt rhel7 '/bin/sh -c '"'"'LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 + # LC_MESSAGES=en_US.UTF-8 /usr/bin/python /home/badger/.ansible/tmp/ansible-tmp-1461173013.93-9076457629738/ping'"'"'' + # [...] + # + # Login to the remote machine and run the module file via from the previous + # step with the explode subcommand to extract the module payload into + # source files:: + # $ ssh host1 + # $ /usr/bin/python /home/badger/.ansible/tmp/ansible-tmp-1461173013.93-9076457629738/ping explode + # Module expanded into: + # /home/badger/.ansible/tmp/ansible-tmp-1461173408.08-279692652635227/ansible + # + # You can now edit the source files to instrument the code or experiment with + # different parameter values. When you're ready to run the code you've modified + # (instead of the code from the actual zipped module), use the execute subcommand like this:: + # $ /usr/bin/python /home/badger/.ansible/tmp/ansible-tmp-1461173013.93-9076457629738/ping execute + + # Okay to use __file__ here because we're running from a kept file + basedir = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'debug_dir') + args_path = os.path.join(basedir, 'args') + + if command == 'explode': + # transform the ZIPDATA into an exploded directory of code and then + # print the path to the code. This is an easy way for people to look + # at the code on the remote machine for debugging it in that + # environment + z = zipfile.ZipFile(zipped_mod) + for filename in z.namelist(): + if filename.startswith('/'): + raise Exception('Something wrong with this module zip file: should not contain absolute paths') + + dest_filename = os.path.join(basedir, filename) + if dest_filename.endswith(os.path.sep) and not os.path.exists(dest_filename): + os.makedirs(dest_filename) + else: + directory = os.path.dirname(dest_filename) + if not os.path.exists(directory): + os.makedirs(directory) + f = open(dest_filename, 'wb') + f.write(z.read(filename)) + f.close() + + # write the args file + f = open(args_path, 'wb') + f.write(json_params) + f.close() + + print('Module expanded into:') + print('%%s' %% basedir) + exitcode = 0 + + elif command == 'execute': + # Execute the exploded code instead of executing the module from the + # embedded ZIPDATA. This allows people to easily run their modified + # code on the remote machine to see how changes will affect it. + + # Set pythonpath to the debug dir + sys.path.insert(0, basedir) + + # read in the args file which the user may have modified + with open(args_path, 'rb') as f: + json_params = f.read() + + # Monkeypatch the parameters into basic + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = json_params + + # Run the module! By importing it as '__main__', it thinks it is executing as a script + runpy.run_module(mod_name='%(module_fqn)s', init_globals=None, run_name='__main__', alter_sys=True) + + # Ansible modules must exit themselves + print('{"msg": "New-style module did not handle its own exit", "failed": true}') + sys.exit(1) + + else: + print('WARNING: Unknown debug command. Doing nothing.') + exitcode = 0 + + return exitcode + + # + # See comments in the debug() method for information on debugging + # + + ANSIBALLZ_PARAMS = %(params)s + if PY3: + ANSIBALLZ_PARAMS = ANSIBALLZ_PARAMS.encode('utf-8') + try: + # There's a race condition with the controller removing the + # remote_tmpdir and this module executing under async. So we cannot + # store this in remote_tmpdir (use system tempdir instead) + # Only need to use [ansible_module]_payload_ in the temp_path until we move to zipimport + # (this helps ansible-test produce coverage stats) + temp_path = tempfile.mkdtemp(prefix='ansible_%(ansible_module)s_payload_') + + zipped_mod = os.path.join(temp_path, 'ansible_%(ansible_module)s_payload.zip') + + with open(zipped_mod, 'wb') as modlib: + modlib.write(base64.b64decode(ZIPDATA)) + + if len(sys.argv) == 2: + exitcode = debug(sys.argv[1], zipped_mod, ANSIBALLZ_PARAMS) + else: + # Note: temp_path isn't needed once we switch to zipimport + invoke_module(zipped_mod, temp_path, ANSIBALLZ_PARAMS) + finally: + try: + shutil.rmtree(temp_path) + except (NameError, OSError): + # tempdir creation probably failed + pass + sys.exit(exitcode) + +if __name__ == '__main__': + _ansiballz_main() +''' + +ANSIBALLZ_COVERAGE_TEMPLATE = ''' + os.environ['COVERAGE_FILE'] = '%(coverage_output)s=python-%%s=coverage' %% '.'.join(str(v) for v in sys.version_info[:2]) + + import atexit + + try: + import coverage + except ImportError: + print('{"msg": "Could not import `coverage` module.", "failed": true}') + sys.exit(1) + + cov = coverage.Coverage(config_file='%(coverage_config)s') + + def atexit_coverage(): + cov.stop() + cov.save() + + atexit.register(atexit_coverage) + + cov.start() +''' + +ANSIBALLZ_COVERAGE_CHECK_TEMPLATE = ''' + try: + if PY3: + import importlib.util + if importlib.util.find_spec('coverage') is None: + raise ImportError + else: + import imp + imp.find_module('coverage') + except ImportError: + print('{"msg": "Could not find `coverage` module.", "failed": true}') + sys.exit(1) +''' + +ANSIBALLZ_RLIMIT_TEMPLATE = ''' + import resource + + existing_soft, existing_hard = resource.getrlimit(resource.RLIMIT_NOFILE) + + # adjust soft limit subject to existing hard limit + requested_soft = min(existing_hard, %(rlimit_nofile)d) + + if requested_soft != existing_soft: + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (requested_soft, existing_hard)) + except ValueError: + # some platforms (eg macOS) lie about their hard limit + pass +''' + + +def _strip_comments(source): + # Strip comments and blank lines from the wrapper + buf = [] + for line in source.splitlines(): + l = line.strip() + if not l or l.startswith(u'#'): + continue + buf.append(line) + return u'\n'.join(buf) + + +if C.DEFAULT_KEEP_REMOTE_FILES: + # Keep comments when KEEP_REMOTE_FILES is set. That way users will see + # the comments with some nice usage instructions + ACTIVE_ANSIBALLZ_TEMPLATE = ANSIBALLZ_TEMPLATE +else: + # ANSIBALLZ_TEMPLATE stripped of comments for smaller over the wire size + ACTIVE_ANSIBALLZ_TEMPLATE = _strip_comments(ANSIBALLZ_TEMPLATE) + +# dirname(dirname(dirname(site-packages/ansible/executor/module_common.py) == site-packages +# Do this instead of getting site-packages from distutils.sysconfig so we work when we +# haven't been installed +site_packages = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +CORE_LIBRARY_PATH_RE = re.compile(r'%s/(?P<path>ansible/modules/.*)\.(py|ps1)$' % re.escape(site_packages)) +COLLECTION_PATH_RE = re.compile(r'/(?P<path>ansible_collections/[^/]+/[^/]+/plugins/modules/.*)\.(py|ps1)$') + +# Detect new-style Python modules by looking for required imports: +# import ansible_collections.[my_ns.my_col.plugins.module_utils.my_module_util] +# from ansible_collections.[my_ns.my_col.plugins.module_utils import my_module_util] +# import ansible.module_utils[.basic] +# from ansible.module_utils[ import basic] +# from ansible.module_utils[.basic import AnsibleModule] +# from ..module_utils[ import basic] +# from ..module_utils[.basic import AnsibleModule] +NEW_STYLE_PYTHON_MODULE_RE = re.compile( + # Relative imports + br'(?:from +\.{2,} *module_utils.* +import |' + # Collection absolute imports: + br'from +ansible_collections\.[^.]+\.[^.]+\.plugins\.module_utils.* +import |' + br'import +ansible_collections\.[^.]+\.[^.]+\.plugins\.module_utils.*|' + # Core absolute imports + br'from +ansible\.module_utils.* +import |' + br'import +ansible\.module_utils\.)' +) + + +class ModuleDepFinder(ast.NodeVisitor): + def __init__(self, module_fqn, tree, is_pkg_init=False, *args, **kwargs): + """ + Walk the ast tree for the python module. + :arg module_fqn: The fully qualified name to reach this module in dotted notation. + example: ansible.module_utils.basic + :arg is_pkg_init: Inform the finder it's looking at a package init (eg __init__.py) to allow + relative import expansion to use the proper package level without having imported it locally first. + + Save submodule[.submoduleN][.identifier] into self.submodules + when they are from ansible.module_utils or ansible_collections packages + + self.submodules will end up with tuples like: + - ('ansible', 'module_utils', 'basic',) + - ('ansible', 'module_utils', 'urls', 'fetch_url') + - ('ansible', 'module_utils', 'database', 'postgres') + - ('ansible', 'module_utils', 'database', 'postgres', 'quote') + - ('ansible', 'module_utils', 'database', 'postgres', 'quote') + - ('ansible_collections', 'my_ns', 'my_col', 'plugins', 'module_utils', 'foo') + + It's up to calling code to determine whether the final element of the + tuple are module names or something else (function, class, or variable names) + .. seealso:: :python3:class:`ast.NodeVisitor` + """ + super(ModuleDepFinder, self).__init__(*args, **kwargs) + self._tree = tree # squirrel this away so we can compare node parents to it + self.submodules = set() + self.optional_imports = set() + self.module_fqn = module_fqn + self.is_pkg_init = is_pkg_init + + self._visit_map = { + Import: self.visit_Import, + ImportFrom: self.visit_ImportFrom, + } + + self.visit(tree) + + def generic_visit(self, node): + """Overridden ``generic_visit`` that makes some assumptions about our + use case, and improves performance by calling visitors directly instead + of calling ``visit`` to offload calling visitors. + """ + generic_visit = self.generic_visit + visit_map = self._visit_map + for field, value in ast.iter_fields(node): + if isinstance(value, list): + for item in value: + if isinstance(item, (Import, ImportFrom)): + item.parent = node + visit_map[item.__class__](item) + elif isinstance(item, AST): + generic_visit(item) + + visit = generic_visit + + def visit_Import(self, node): + """ + Handle import ansible.module_utils.MODLIB[.MODLIBn] [as asname] + + We save these as interesting submodules when the imported library is in ansible.module_utils + or ansible.collections + """ + for alias in node.names: + if (alias.name.startswith('ansible.module_utils.') or + alias.name.startswith('ansible_collections.')): + py_mod = tuple(alias.name.split('.')) + self.submodules.add(py_mod) + # if the import's parent is the root document, it's a required import, otherwise it's optional + if node.parent != self._tree: + self.optional_imports.add(py_mod) + self.generic_visit(node) + + def visit_ImportFrom(self, node): + """ + Handle from ansible.module_utils.MODLIB import [.MODLIBn] [as asname] + + Also has to handle relative imports + + We save these as interesting submodules when the imported library is in ansible.module_utils + or ansible.collections + """ + + # FIXME: These should all get skipped: + # from ansible.executor import module_common + # from ...executor import module_common + # from ... import executor (Currently it gives a non-helpful error) + if node.level > 0: + # if we're in a package init, we have to add one to the node level (and make it none if 0 to preserve the right slicing behavior) + level_slice_offset = -node.level + 1 or None if self.is_pkg_init else -node.level + if self.module_fqn: + parts = tuple(self.module_fqn.split('.')) + if node.module: + # relative import: from .module import x + node_module = '.'.join(parts[:level_slice_offset] + (node.module,)) + else: + # relative import: from . import x + node_module = '.'.join(parts[:level_slice_offset]) + else: + # fall back to an absolute import + node_module = node.module + else: + # absolute import: from module import x + node_module = node.module + + # Specialcase: six is a special case because of its + # import logic + py_mod = None + if node.names[0].name == '_six': + self.submodules.add(('_six',)) + elif node_module.startswith('ansible.module_utils'): + # from ansible.module_utils.MODULE1[.MODULEn] import IDENTIFIER [as asname] + # from ansible.module_utils.MODULE1[.MODULEn] import MODULEn+1 [as asname] + # from ansible.module_utils.MODULE1[.MODULEn] import MODULEn+1 [,IDENTIFIER] [as asname] + # from ansible.module_utils import MODULE1 [,MODULEn] [as asname] + py_mod = tuple(node_module.split('.')) + + elif node_module.startswith('ansible_collections.'): + if node_module.endswith('plugins.module_utils') or '.plugins.module_utils.' in node_module: + # from ansible_collections.ns.coll.plugins.module_utils import MODULE [as aname] [,MODULE2] [as aname] + # from ansible_collections.ns.coll.plugins.module_utils.MODULE import IDENTIFIER [as aname] + # FIXME: Unhandled cornercase (needs to be ignored): + # from ansible_collections.ns.coll.plugins.[!module_utils].[FOO].plugins.module_utils import IDENTIFIER + py_mod = tuple(node_module.split('.')) + else: + # Not from module_utils so ignore. for instance: + # from ansible_collections.ns.coll.plugins.lookup import IDENTIFIER + pass + + if py_mod: + for alias in node.names: + self.submodules.add(py_mod + (alias.name,)) + # if the import's parent is the root document, it's a required import, otherwise it's optional + if node.parent != self._tree: + self.optional_imports.add(py_mod + (alias.name,)) + + self.generic_visit(node) + + +def _slurp(path): + if not os.path.exists(path): + raise AnsibleError("imported module support code does not exist at %s" % os.path.abspath(path)) + with open(path, 'rb') as fd: + data = fd.read() + return data + + +def _get_shebang(interpreter, task_vars, templar, args=tuple(), remote_is_local=False): + """ + Handles the different ways ansible allows overriding the shebang target for a module. + """ + # FUTURE: add logical equivalence for python3 in the case of py3-only modules + + interpreter_name = os.path.basename(interpreter).strip() + + # name for interpreter var + interpreter_config = u'ansible_%s_interpreter' % interpreter_name + # key for config + interpreter_config_key = "INTERPRETER_%s" % interpreter_name.upper() + + interpreter_out = None + + # looking for python, rest rely on matching vars + if interpreter_name == 'python': + # skip detection for network os execution, use playbook supplied one if possible + if remote_is_local: + interpreter_out = task_vars['ansible_playbook_python'] + + # a config def exists for this interpreter type; consult config for the value + elif C.config.get_configuration_definition(interpreter_config_key): + + interpreter_from_config = C.config.get_config_value(interpreter_config_key, variables=task_vars) + interpreter_out = templar.template(interpreter_from_config.strip()) + + # handle interpreter discovery if requested or empty interpreter was provided + if not interpreter_out or interpreter_out in ['auto', 'auto_legacy', 'auto_silent', 'auto_legacy_silent']: + + discovered_interpreter_config = u'discovered_interpreter_%s' % interpreter_name + facts_from_task_vars = task_vars.get('ansible_facts', {}) + + if discovered_interpreter_config not in facts_from_task_vars: + # interpreter discovery is desired, but has not been run for this host + raise InterpreterDiscoveryRequiredError("interpreter discovery needed", interpreter_name=interpreter_name, discovery_mode=interpreter_out) + else: + interpreter_out = facts_from_task_vars[discovered_interpreter_config] + else: + raise InterpreterDiscoveryRequiredError("interpreter discovery required", interpreter_name=interpreter_name, discovery_mode='auto_legacy') + + elif interpreter_config in task_vars: + # for non python we consult vars for a possible direct override + interpreter_out = templar.template(task_vars.get(interpreter_config).strip()) + + if not interpreter_out: + # nothing matched(None) or in case someone configures empty string or empty intepreter + interpreter_out = interpreter + + # set shebang + shebang = u'#!{0}'.format(interpreter_out) + if args: + shebang = shebang + u' ' + u' '.join(args) + + return shebang, interpreter_out + + +class ModuleUtilLocatorBase: + def __init__(self, fq_name_parts, is_ambiguous=False, child_is_redirected=False, is_optional=False): + self._is_ambiguous = is_ambiguous + # a child package redirection could cause intermediate package levels to be missing, eg + # from ansible.module_utils.x.y.z import foo; if x.y.z.foo is redirected, we may not have packages on disk for + # the intermediate packages x.y.z, so we'll need to supply empty packages for those + self._child_is_redirected = child_is_redirected + self._is_optional = is_optional + self.found = False + self.redirected = False + self.fq_name_parts = fq_name_parts + self.source_code = '' + self.output_path = '' + self.is_package = False + self._collection_name = None + # for ambiguous imports, we should only test for things more than one level below module_utils + # this lets us detect erroneous imports and redirections earlier + if is_ambiguous and len(self._get_module_utils_remainder_parts(fq_name_parts)) > 1: + self.candidate_names = [fq_name_parts, fq_name_parts[:-1]] + else: + self.candidate_names = [fq_name_parts] + + @property + def candidate_names_joined(self): + return ['.'.join(n) for n in self.candidate_names] + + def _handle_redirect(self, name_parts): + module_utils_relative_parts = self._get_module_utils_remainder_parts(name_parts) + + # only allow redirects from below module_utils- if above that, bail out (eg, parent package names) + if not module_utils_relative_parts: + return False + + try: + collection_metadata = _get_collection_metadata(self._collection_name) + except ValueError as ve: # collection not found or some other error related to collection load + if self._is_optional: + return False + raise AnsibleError('error processing module_util {0} loading redirected collection {1}: {2}' + .format('.'.join(name_parts), self._collection_name, to_native(ve))) + + routing_entry = _nested_dict_get(collection_metadata, ['plugin_routing', 'module_utils', '.'.join(module_utils_relative_parts)]) + if not routing_entry: + return False + # FIXME: add deprecation warning support + + dep_or_ts = routing_entry.get('tombstone') + removed = dep_or_ts is not None + if not removed: + dep_or_ts = routing_entry.get('deprecation') + + if dep_or_ts: + removal_date = dep_or_ts.get('removal_date') + removal_version = dep_or_ts.get('removal_version') + warning_text = dep_or_ts.get('warning_text') + + msg = 'module_util {0} has been removed'.format('.'.join(name_parts)) + if warning_text: + msg += ' ({0})'.format(warning_text) + else: + msg += '.' + + display.deprecated(msg, removal_version, removed, removal_date, self._collection_name) + if 'redirect' in routing_entry: + self.redirected = True + source_pkg = '.'.join(name_parts) + self.is_package = True # treat all redirects as packages + redirect_target_pkg = routing_entry['redirect'] + + # expand FQCN redirects + if not redirect_target_pkg.startswith('ansible_collections'): + split_fqcn = redirect_target_pkg.split('.') + if len(split_fqcn) < 3: + raise Exception('invalid redirect for {0}: {1}'.format(source_pkg, redirect_target_pkg)) + # assume it's an FQCN, expand it + redirect_target_pkg = 'ansible_collections.{0}.{1}.plugins.module_utils.{2}'.format( + split_fqcn[0], # ns + split_fqcn[1], # coll + '.'.join(split_fqcn[2:]) # sub-module_utils remainder + ) + display.vvv('redirecting module_util {0} to {1}'.format(source_pkg, redirect_target_pkg)) + self.source_code = self._generate_redirect_shim_source(source_pkg, redirect_target_pkg) + return True + return False + + def _get_module_utils_remainder_parts(self, name_parts): + # subclasses should override to return the name parts after module_utils + return [] + + def _get_module_utils_remainder(self, name_parts): + # return the remainder parts as a package string + return '.'.join(self._get_module_utils_remainder_parts(name_parts)) + + def _find_module(self, name_parts): + return False + + def _locate(self, redirect_first=True): + for candidate_name_parts in self.candidate_names: + if redirect_first and self._handle_redirect(candidate_name_parts): + break + + if self._find_module(candidate_name_parts): + break + + if not redirect_first and self._handle_redirect(candidate_name_parts): + break + + else: # didn't find what we were looking for- last chance for packages whose parents were redirected + if self._child_is_redirected: # make fake packages + self.is_package = True + self.source_code = '' + else: # nope, just bail + return + + if self.is_package: + path_parts = candidate_name_parts + ('__init__',) + else: + path_parts = candidate_name_parts + self.found = True + self.output_path = os.path.join(*path_parts) + '.py' + self.fq_name_parts = candidate_name_parts + + def _generate_redirect_shim_source(self, fq_source_module, fq_target_module): + return """ +import sys +import {1} as mod + +sys.modules['{0}'] = mod +""".format(fq_source_module, fq_target_module) + + # FIXME: add __repr__ impl + + +class LegacyModuleUtilLocator(ModuleUtilLocatorBase): + def __init__(self, fq_name_parts, is_ambiguous=False, mu_paths=None, child_is_redirected=False): + super(LegacyModuleUtilLocator, self).__init__(fq_name_parts, is_ambiguous, child_is_redirected) + + if fq_name_parts[0:2] != ('ansible', 'module_utils'): + raise Exception('this class can only locate from ansible.module_utils, got {0}'.format(fq_name_parts)) + + if fq_name_parts[2] == 'six': + # FIXME: handle the ansible.module_utils.six._six case with a redirect or an internal _six attr on six itself? + # six creates its submodules at runtime; convert all these to just 'ansible.module_utils.six' + fq_name_parts = ('ansible', 'module_utils', 'six') + self.candidate_names = [fq_name_parts] + + self._mu_paths = mu_paths + self._collection_name = 'ansible.builtin' # legacy module utils always look in ansible.builtin for redirects + self._locate(redirect_first=False) # let local stuff override redirects for legacy + + def _get_module_utils_remainder_parts(self, name_parts): + return name_parts[2:] # eg, foo.bar for ansible.module_utils.foo.bar + + def _find_module(self, name_parts): + rel_name_parts = self._get_module_utils_remainder_parts(name_parts) + + # no redirection; try to find the module + if len(rel_name_parts) == 1: # direct child of module_utils, just search the top-level dirs we were given + paths = self._mu_paths + else: # a nested submodule of module_utils, extend the paths given with the intermediate package names + paths = [os.path.join(p, *rel_name_parts[:-1]) for p in + self._mu_paths] # extend the MU paths with the relative bit + + # find_spec needs the full module name + self._info = info = importlib.machinery.PathFinder.find_spec('.'.join(name_parts), paths) + if info is not None and os.path.splitext(info.origin)[1] in importlib.machinery.SOURCE_SUFFIXES: + self.is_package = info.origin.endswith('/__init__.py') + path = info.origin + else: + return False + self.source_code = _slurp(path) + + return True + + +class CollectionModuleUtilLocator(ModuleUtilLocatorBase): + def __init__(self, fq_name_parts, is_ambiguous=False, child_is_redirected=False, is_optional=False): + super(CollectionModuleUtilLocator, self).__init__(fq_name_parts, is_ambiguous, child_is_redirected, is_optional) + + if fq_name_parts[0] != 'ansible_collections': + raise Exception('CollectionModuleUtilLocator can only locate from ansible_collections, got {0}'.format(fq_name_parts)) + elif len(fq_name_parts) >= 6 and fq_name_parts[3:5] != ('plugins', 'module_utils'): + raise Exception('CollectionModuleUtilLocator can only locate below ansible_collections.(ns).(coll).plugins.module_utils, got {0}' + .format(fq_name_parts)) + + self._collection_name = '.'.join(fq_name_parts[1:3]) + + self._locate() + + def _find_module(self, name_parts): + # synthesize empty inits for packages down through module_utils- we don't want to allow those to be shipped over, but the + # package hierarchy needs to exist + if len(name_parts) < 6: + self.source_code = '' + self.is_package = True + return True + + # NB: we can't use pkgutil.get_data safely here, since we don't want to import/execute package/module code on + # the controller while analyzing/assembling the module, so we'll have to manually import the collection's + # Python package to locate it (import root collection, reassemble resource path beneath, fetch source) + + collection_pkg_name = '.'.join(name_parts[0:3]) + resource_base_path = os.path.join(*name_parts[3:]) + + src = None + # look for package_dir first, then module + try: + src = pkgutil.get_data(collection_pkg_name, to_native(os.path.join(resource_base_path, '__init__.py'))) + except ImportError: + pass + + # TODO: we might want to synthesize fake inits for py3-style packages, for now they're required beneath module_utils + + if src is not None: # empty string is OK + self.is_package = True + else: + try: + src = pkgutil.get_data(collection_pkg_name, to_native(resource_base_path + '.py')) + except ImportError: + pass + + if src is None: # empty string is OK + return False + + self.source_code = src + return True + + def _get_module_utils_remainder_parts(self, name_parts): + return name_parts[5:] # eg, foo.bar for ansible_collections.ns.coll.plugins.module_utils.foo.bar + + +def recursive_finder(name, module_fqn, module_data, zf): + """ + Using ModuleDepFinder, make sure we have all of the module_utils files that + the module and its module_utils files needs. (no longer actually recursive) + :arg name: Name of the python module we're examining + :arg module_fqn: Fully qualified name of the python module we're scanning + :arg module_data: string Python code of the module we're scanning + :arg zf: An open :python:class:`zipfile.ZipFile` object that holds the Ansible module payload + which we're assembling + """ + + # py_module_cache maps python module names to a tuple of the code in the module + # and the pathname to the module. + # Here we pre-load it with modules which we create without bothering to + # read from actual files (In some cases, these need to differ from what ansible + # ships because they're namespace packages in the module) + # FIXME: do we actually want ns pkg behavior for these? Seems like they should just be forced to emptyish pkg stubs + py_module_cache = { + ('ansible',): ( + b'from pkgutil import extend_path\n' + b'__path__=extend_path(__path__,__name__)\n' + b'__version__="' + to_bytes(__version__) + + b'"\n__author__="' + to_bytes(__author__) + b'"\n', + 'ansible/__init__.py'), + ('ansible', 'module_utils'): ( + b'from pkgutil import extend_path\n' + b'__path__=extend_path(__path__,__name__)\n', + 'ansible/module_utils/__init__.py')} + + module_utils_paths = [p for p in module_utils_loader._get_paths(subdirs=False) if os.path.isdir(p)] + module_utils_paths.append(_MODULE_UTILS_PATH) + + # Parse the module code and find the imports of ansible.module_utils + try: + tree = compile(module_data, '<unknown>', 'exec', ast.PyCF_ONLY_AST) + except (SyntaxError, IndentationError) as e: + raise AnsibleError("Unable to import %s due to %s" % (name, e.msg)) + + finder = ModuleDepFinder(module_fqn, tree) + + # the format of this set is a tuple of the module name and whether or not the import is ambiguous as a module name + # or an attribute of a module (eg from x.y import z <-- is z a module or an attribute of x.y?) + modules_to_process = [ModuleUtilsProcessEntry(m, True, False, is_optional=m in finder.optional_imports) for m in finder.submodules] + + # HACK: basic is currently always required since module global init is currently tied up with AnsiballZ arg input + modules_to_process.append(ModuleUtilsProcessEntry(('ansible', 'module_utils', 'basic'), False, False, is_optional=False)) + + # we'll be adding new modules inline as we discover them, so just keep going til we've processed them all + while modules_to_process: + modules_to_process.sort() # not strictly necessary, but nice to process things in predictable and repeatable order + py_module_name, is_ambiguous, child_is_redirected, is_optional = modules_to_process.pop(0) + + if py_module_name in py_module_cache: + # this is normal; we'll often see the same module imported many times, but we only need to process it once + continue + + if py_module_name[0:2] == ('ansible', 'module_utils'): + module_info = LegacyModuleUtilLocator(py_module_name, is_ambiguous=is_ambiguous, + mu_paths=module_utils_paths, child_is_redirected=child_is_redirected) + elif py_module_name[0] == 'ansible_collections': + module_info = CollectionModuleUtilLocator(py_module_name, is_ambiguous=is_ambiguous, + child_is_redirected=child_is_redirected, is_optional=is_optional) + else: + # FIXME: dot-joined result + display.warning('ModuleDepFinder improperly found a non-module_utils import %s' + % [py_module_name]) + continue + + # Could not find the module. Construct a helpful error message. + if not module_info.found: + if is_optional: + # this was a best-effort optional import that we couldn't find, oh well, move along... + continue + # FIXME: use dot-joined candidate names + msg = 'Could not find imported module support code for {0}. Looked for ({1})'.format(module_fqn, module_info.candidate_names_joined) + raise AnsibleError(msg) + + # check the cache one more time with the module we actually found, since the name could be different than the input + # eg, imported name vs module + if module_info.fq_name_parts in py_module_cache: + continue + + # compile the source, process all relevant imported modules + try: + tree = compile(module_info.source_code, '<unknown>', 'exec', ast.PyCF_ONLY_AST) + except (SyntaxError, IndentationError) as e: + raise AnsibleError("Unable to import %s due to %s" % (module_info.fq_name_parts, e.msg)) + + finder = ModuleDepFinder('.'.join(module_info.fq_name_parts), tree, module_info.is_package) + modules_to_process.extend(ModuleUtilsProcessEntry(m, True, False, is_optional=m in finder.optional_imports) + for m in finder.submodules if m not in py_module_cache) + + # we've processed this item, add it to the output list + py_module_cache[module_info.fq_name_parts] = (module_info.source_code, module_info.output_path) + + # ensure we process all ancestor package inits + accumulated_pkg_name = [] + for pkg in module_info.fq_name_parts[:-1]: + accumulated_pkg_name.append(pkg) # we're accumulating this across iterations + normalized_name = tuple(accumulated_pkg_name) # extra machinations to get a hashable type (list is not) + if normalized_name not in py_module_cache: + modules_to_process.append(ModuleUtilsProcessEntry(normalized_name, False, module_info.redirected, is_optional=is_optional)) + + for py_module_name in py_module_cache: + py_module_file_name = py_module_cache[py_module_name][1] + + zf.writestr(py_module_file_name, py_module_cache[py_module_name][0]) + mu_file = to_text(py_module_file_name, errors='surrogate_or_strict') + display.vvvvv("Including module_utils file %s" % mu_file) + + +def _is_binary(b_module_data): + textchars = bytearray(set([7, 8, 9, 10, 12, 13, 27]) | set(range(0x20, 0x100)) - set([0x7f])) + start = b_module_data[:1024] + return bool(start.translate(None, textchars)) + + +def _get_ansible_module_fqn(module_path): + """ + Get the fully qualified name for an ansible module based on its pathname + + remote_module_fqn is the fully qualified name. Like ansible.modules.system.ping + Or ansible_collections.Namespace.Collection_name.plugins.modules.ping + .. warning:: This function is for ansible modules only. It won't work for other things + (non-module plugins, etc) + """ + remote_module_fqn = None + + # Is this a core module? + match = CORE_LIBRARY_PATH_RE.search(module_path) + if not match: + # Is this a module in a collection? + match = COLLECTION_PATH_RE.search(module_path) + + # We can tell the FQN for core modules and collection modules + if match: + path = match.group('path') + if '.' in path: + # FQNs must be valid as python identifiers. This sanity check has failed. + # we could check other things as well + raise ValueError('Module name (or path) was not a valid python identifier') + + remote_module_fqn = '.'.join(path.split('/')) + else: + # Currently we do not handle modules in roles so we can end up here for that reason + raise ValueError("Unable to determine module's fully qualified name") + + return remote_module_fqn + + +def _add_module_to_zip(zf, remote_module_fqn, b_module_data): + """Add a module from ansible or from an ansible collection into the module zip""" + module_path_parts = remote_module_fqn.split('.') + + # Write the module + module_path = '/'.join(module_path_parts) + '.py' + zf.writestr(module_path, b_module_data) + + # Write the __init__.py's necessary to get there + if module_path_parts[0] == 'ansible': + # The ansible namespace is setup as part of the module_utils setup... + start = 2 + existing_paths = frozenset() + else: + # ... but ansible_collections and other toplevels are not + start = 1 + existing_paths = frozenset(zf.namelist()) + + for idx in range(start, len(module_path_parts)): + package_path = '/'.join(module_path_parts[:idx]) + '/__init__.py' + # If a collections module uses module_utils from a collection then most packages will have already been added by recursive_finder. + if package_path in existing_paths: + continue + # Note: We don't want to include more than one ansible module in a payload at this time + # so no need to fill the __init__.py with namespace code + zf.writestr(package_path, b'') + + +def _find_module_utils(module_name, b_module_data, module_path, module_args, task_vars, templar, module_compression, async_timeout, become, + become_method, become_user, become_password, become_flags, environment, remote_is_local=False): + """ + Given the source of the module, convert it to a Jinja2 template to insert + module code and return whether it's a new or old style module. + """ + module_substyle = module_style = 'old' + + # module_style is something important to calling code (ActionBase). It + # determines how arguments are formatted (json vs k=v) and whether + # a separate arguments file needs to be sent over the wire. + # module_substyle is extra information that's useful internally. It tells + # us what we have to look to substitute in the module files and whether + # we're using module replacer or ansiballz to format the module itself. + if _is_binary(b_module_data): + module_substyle = module_style = 'binary' + elif REPLACER in b_module_data: + # Do REPLACER before from ansible.module_utils because we need make sure + # we substitute "from ansible.module_utils basic" for REPLACER + module_style = 'new' + module_substyle = 'python' + b_module_data = b_module_data.replace(REPLACER, b'from ansible.module_utils.basic import *') + elif NEW_STYLE_PYTHON_MODULE_RE.search(b_module_data): + module_style = 'new' + module_substyle = 'python' + elif REPLACER_WINDOWS in b_module_data: + module_style = 'new' + module_substyle = 'powershell' + b_module_data = b_module_data.replace(REPLACER_WINDOWS, b'#Requires -Module Ansible.ModuleUtils.Legacy') + elif re.search(b'#Requires -Module', b_module_data, re.IGNORECASE) \ + or re.search(b'#Requires -Version', b_module_data, re.IGNORECASE)\ + or re.search(b'#AnsibleRequires -OSVersion', b_module_data, re.IGNORECASE) \ + or re.search(b'#AnsibleRequires -Powershell', b_module_data, re.IGNORECASE) \ + or re.search(b'#AnsibleRequires -CSharpUtil', b_module_data, re.IGNORECASE): + module_style = 'new' + module_substyle = 'powershell' + elif REPLACER_JSONARGS in b_module_data: + module_style = 'new' + module_substyle = 'jsonargs' + elif b'WANT_JSON' in b_module_data: + module_substyle = module_style = 'non_native_want_json' + + shebang = None + # Neither old-style, non_native_want_json nor binary modules should be modified + # except for the shebang line (Done by modify_module) + if module_style in ('old', 'non_native_want_json', 'binary'): + return b_module_data, module_style, shebang + + output = BytesIO() + + try: + remote_module_fqn = _get_ansible_module_fqn(module_path) + except ValueError: + # Modules in roles currently are not found by the fqn heuristic so we + # fallback to this. This means that relative imports inside a module from + # a role may fail. Absolute imports should be used for future-proofness. + # People should start writing collections instead of modules in roles so we + # may never fix this + display.debug('ANSIBALLZ: Could not determine module FQN') + remote_module_fqn = 'ansible.modules.%s' % module_name + + if module_substyle == 'python': + params = dict(ANSIBLE_MODULE_ARGS=module_args,) + try: + python_repred_params = repr(json.dumps(params, cls=AnsibleJSONEncoder, vault_to_text=True)) + except TypeError as e: + raise AnsibleError("Unable to pass options to module, they must be JSON serializable: %s" % to_native(e)) + + try: + compression_method = getattr(zipfile, module_compression) + except AttributeError: + display.warning(u'Bad module compression string specified: %s. Using ZIP_STORED (no compression)' % module_compression) + compression_method = zipfile.ZIP_STORED + + lookup_path = os.path.join(C.DEFAULT_LOCAL_TMP, 'ansiballz_cache') + cached_module_filename = os.path.join(lookup_path, "%s-%s" % (remote_module_fqn, module_compression)) + + zipdata = None + # Optimization -- don't lock if the module has already been cached + if os.path.exists(cached_module_filename): + display.debug('ANSIBALLZ: using cached module: %s' % cached_module_filename) + with open(cached_module_filename, 'rb') as module_data: + zipdata = module_data.read() + else: + if module_name in action_write_locks.action_write_locks: + display.debug('ANSIBALLZ: Using lock for %s' % module_name) + lock = action_write_locks.action_write_locks[module_name] + else: + # If the action plugin directly invokes the module (instead of + # going through a strategy) then we don't have a cross-process + # Lock specifically for this module. Use the "unexpected + # module" lock instead + display.debug('ANSIBALLZ: Using generic lock for %s' % module_name) + lock = action_write_locks.action_write_locks[None] + + display.debug('ANSIBALLZ: Acquiring lock') + with lock: + display.debug('ANSIBALLZ: Lock acquired: %s' % id(lock)) + # Check that no other process has created this while we were + # waiting for the lock + if not os.path.exists(cached_module_filename): + display.debug('ANSIBALLZ: Creating module') + # Create the module zip data + zipoutput = BytesIO() + zf = zipfile.ZipFile(zipoutput, mode='w', compression=compression_method) + + # walk the module imports, looking for module_utils to send- they'll be added to the zipfile + recursive_finder(module_name, remote_module_fqn, b_module_data, zf) + + display.debug('ANSIBALLZ: Writing module into payload') + _add_module_to_zip(zf, remote_module_fqn, b_module_data) + + zf.close() + zipdata = base64.b64encode(zipoutput.getvalue()) + + # Write the assembled module to a temp file (write to temp + # so that no one looking for the file reads a partially + # written file) + # + # FIXME: Once split controller/remote is merged, this can be simplified to + # os.makedirs(lookup_path, exist_ok=True) + if not os.path.exists(lookup_path): + try: + # Note -- if we have a global function to setup, that would + # be a better place to run this + os.makedirs(lookup_path) + except OSError: + # Multiple processes tried to create the directory. If it still does not + # exist, raise the original exception. + if not os.path.exists(lookup_path): + raise + display.debug('ANSIBALLZ: Writing module') + with open(cached_module_filename + '-part', 'wb') as f: + f.write(zipdata) + + # Rename the file into its final position in the cache so + # future users of this module can read it off the + # filesystem instead of constructing from scratch. + display.debug('ANSIBALLZ: Renaming module') + os.rename(cached_module_filename + '-part', cached_module_filename) + display.debug('ANSIBALLZ: Done creating module') + + if zipdata is None: + display.debug('ANSIBALLZ: Reading module after lock') + # Another process wrote the file while we were waiting for + # the write lock. Go ahead and read the data from disk + # instead of re-creating it. + try: + with open(cached_module_filename, 'rb') as f: + zipdata = f.read() + except IOError: + raise AnsibleError('A different worker process failed to create module file. ' + 'Look at traceback for that process for debugging information.') + zipdata = to_text(zipdata, errors='surrogate_or_strict') + + o_interpreter, o_args = _extract_interpreter(b_module_data) + if o_interpreter is None: + o_interpreter = u'/usr/bin/python' + + shebang, interpreter = _get_shebang(o_interpreter, task_vars, templar, o_args, remote_is_local=remote_is_local) + + # FUTURE: the module cache entry should be invalidated if we got this value from a host-dependent source + rlimit_nofile = C.config.get_config_value('PYTHON_MODULE_RLIMIT_NOFILE', variables=task_vars) + + if not isinstance(rlimit_nofile, int): + rlimit_nofile = int(templar.template(rlimit_nofile)) + + if rlimit_nofile: + rlimit = ANSIBALLZ_RLIMIT_TEMPLATE % dict( + rlimit_nofile=rlimit_nofile, + ) + else: + rlimit = '' + + coverage_config = os.environ.get('_ANSIBLE_COVERAGE_CONFIG') + + if coverage_config: + coverage_output = os.environ['_ANSIBLE_COVERAGE_OUTPUT'] + + if coverage_output: + # Enable code coverage analysis of the module. + # This feature is for internal testing and may change without notice. + coverage = ANSIBALLZ_COVERAGE_TEMPLATE % dict( + coverage_config=coverage_config, + coverage_output=coverage_output, + ) + else: + # Verify coverage is available without importing it. + # This will detect when a module would fail with coverage enabled with minimal overhead. + coverage = ANSIBALLZ_COVERAGE_CHECK_TEMPLATE + else: + coverage = '' + + now = datetime.datetime.utcnow() + output.write(to_bytes(ACTIVE_ANSIBALLZ_TEMPLATE % dict( + zipdata=zipdata, + ansible_module=module_name, + module_fqn=remote_module_fqn, + params=python_repred_params, + shebang=shebang, + coding=ENCODING_STRING, + year=now.year, + month=now.month, + day=now.day, + hour=now.hour, + minute=now.minute, + second=now.second, + coverage=coverage, + rlimit=rlimit, + ))) + b_module_data = output.getvalue() + + elif module_substyle == 'powershell': + # Powershell/winrm don't actually make use of shebang so we can + # safely set this here. If we let the fallback code handle this + # it can fail in the presence of the UTF8 BOM commonly added by + # Windows text editors + shebang = u'#!powershell' + # create the common exec wrapper payload and set that as the module_data + # bytes + b_module_data = ps_manifest._create_powershell_wrapper( + b_module_data, module_path, module_args, environment, + async_timeout, become, become_method, become_user, become_password, + become_flags, module_substyle, task_vars, remote_module_fqn + ) + + elif module_substyle == 'jsonargs': + module_args_json = to_bytes(json.dumps(module_args, cls=AnsibleJSONEncoder, vault_to_text=True)) + + # these strings could be included in a third-party module but + # officially they were included in the 'basic' snippet for new-style + # python modules (which has been replaced with something else in + # ansiballz) If we remove them from jsonargs-style module replacer + # then we can remove them everywhere. + python_repred_args = to_bytes(repr(module_args_json)) + b_module_data = b_module_data.replace(REPLACER_VERSION, to_bytes(repr(__version__))) + b_module_data = b_module_data.replace(REPLACER_COMPLEX, python_repred_args) + b_module_data = b_module_data.replace(REPLACER_SELINUX, to_bytes(','.join(C.DEFAULT_SELINUX_SPECIAL_FS))) + + # The main event -- substitute the JSON args string into the module + b_module_data = b_module_data.replace(REPLACER_JSONARGS, module_args_json) + + facility = b'syslog.' + to_bytes(task_vars.get('ansible_syslog_facility', C.DEFAULT_SYSLOG_FACILITY), errors='surrogate_or_strict') + b_module_data = b_module_data.replace(b'syslog.LOG_USER', facility) + + return (b_module_data, module_style, shebang) + + +def _extract_interpreter(b_module_data): + """ + Used to extract shebang expression from binary module data and return a text + string with the shebang, or None if no shebang is detected. + """ + + interpreter = None + args = [] + b_lines = b_module_data.split(b"\n", 1) + if b_lines[0].startswith(b"#!"): + b_shebang = b_lines[0].strip() + + # shlex.split needs text on Python 3 + cli_split = shlex.split(to_text(b_shebang[2:], errors='surrogate_or_strict')) + + # convert args to text + cli_split = [to_text(a, errors='surrogate_or_strict') for a in cli_split] + interpreter = cli_split[0] + args = cli_split[1:] + + return interpreter, args + + +def modify_module(module_name, module_path, module_args, templar, task_vars=None, module_compression='ZIP_STORED', async_timeout=0, become=False, + become_method=None, become_user=None, become_password=None, become_flags=None, environment=None, remote_is_local=False): + """ + Used to insert chunks of code into modules before transfer rather than + doing regular python imports. This allows for more efficient transfer in + a non-bootstrapping scenario by not moving extra files over the wire and + also takes care of embedding arguments in the transferred modules. + + This version is done in such a way that local imports can still be + used in the module code, so IDEs don't have to be aware of what is going on. + + Example: + + from ansible.module_utils.basic import * + + ... will result in the insertion of basic.py into the module + from the module_utils/ directory in the source tree. + + For powershell, this code effectively no-ops, as the exec wrapper requires access to a number of + properties not available here. + + """ + task_vars = {} if task_vars is None else task_vars + environment = {} if environment is None else environment + + with open(module_path, 'rb') as f: + + # read in the module source + b_module_data = f.read() + + (b_module_data, module_style, shebang) = _find_module_utils(module_name, b_module_data, module_path, module_args, task_vars, templar, module_compression, + async_timeout=async_timeout, become=become, become_method=become_method, + become_user=become_user, become_password=become_password, become_flags=become_flags, + environment=environment, remote_is_local=remote_is_local) + + if module_style == 'binary': + return (b_module_data, module_style, to_text(shebang, nonstring='passthru')) + elif shebang is None: + interpreter, args = _extract_interpreter(b_module_data) + # No interpreter/shebang, assume a binary module? + if interpreter is not None: + + shebang, new_interpreter = _get_shebang(interpreter, task_vars, templar, args, remote_is_local=remote_is_local) + + # update shebang + b_lines = b_module_data.split(b"\n", 1) + + if interpreter != new_interpreter: + b_lines[0] = to_bytes(shebang, errors='surrogate_or_strict', nonstring='passthru') + + if os.path.basename(interpreter).startswith(u'python'): + b_lines.insert(1, b_ENCODING_STRING) + + b_module_data = b"\n".join(b_lines) + + return (b_module_data, module_style, shebang) + + +def get_action_args_with_defaults(action, args, defaults, templar, redirected_names=None, action_groups=None): + if redirected_names: + resolved_action_name = redirected_names[-1] + else: + resolved_action_name = action + + if redirected_names is not None: + msg = ( + "Finding module_defaults for the action %s. " + "The caller passed a list of redirected action names, which is deprecated. " + "The task's resolved action should be provided as the first argument instead." + ) + display.deprecated(msg % resolved_action_name, version='2.16') + + # Get the list of groups that contain this action + if action_groups is None: + msg = ( + "Finding module_defaults for action %s. " + "The caller has not passed the action_groups, so any " + "that may include this action will be ignored." + ) + display.warning(msg=msg) + group_names = [] + else: + group_names = action_groups.get(resolved_action_name, []) + + tmp_args = {} + module_defaults = {} + + # Merge latest defaults into dict, since they are a list of dicts + if isinstance(defaults, list): + for default in defaults: + module_defaults.update(default) + + # module_defaults keys are static, but the values may be templated + module_defaults = templar.template(module_defaults) + for default in module_defaults: + if default.startswith('group/'): + group_name = default.split('group/')[-1] + if group_name in group_names: + tmp_args.update((module_defaults.get('group/%s' % group_name) or {}).copy()) + + # handle specific action defaults + tmp_args.update(module_defaults.get(resolved_action_name, {}).copy()) + + # direct args override all + tmp_args.update(args) + + return tmp_args diff --git a/lib/ansible/executor/play_iterator.py b/lib/ansible/executor/play_iterator.py new file mode 100644 index 0000000..2449782 --- /dev/null +++ b/lib/ansible/executor/play_iterator.py @@ -0,0 +1,652 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import fnmatch + +from enum import IntEnum, IntFlag + +from ansible import constants as C +from ansible.errors import AnsibleAssertionError +from ansible.module_utils.parsing.convert_bool import boolean +from ansible.playbook.block import Block +from ansible.playbook.task import Task +from ansible.utils.display import Display + + +display = Display() + + +__all__ = ['PlayIterator', 'IteratingStates', 'FailedStates'] + + +class IteratingStates(IntEnum): + SETUP = 0 + TASKS = 1 + RESCUE = 2 + ALWAYS = 3 + HANDLERS = 4 + COMPLETE = 5 + + +class FailedStates(IntFlag): + NONE = 0 + SETUP = 1 + TASKS = 2 + RESCUE = 4 + ALWAYS = 8 + HANDLERS = 16 + + +class HostState: + def __init__(self, blocks): + self._blocks = blocks[:] + self.handlers = [] + + self.cur_block = 0 + self.cur_regular_task = 0 + self.cur_rescue_task = 0 + self.cur_always_task = 0 + self.cur_handlers_task = 0 + self.run_state = IteratingStates.SETUP + self.fail_state = FailedStates.NONE + self.pre_flushing_run_state = None + self.update_handlers = True + self.pending_setup = False + self.tasks_child_state = None + self.rescue_child_state = None + self.always_child_state = None + self.did_rescue = False + self.did_start_at_task = False + + def __repr__(self): + return "HostState(%r)" % self._blocks + + def __str__(self): + return ("HOST STATE: block=%d, task=%d, rescue=%d, always=%d, handlers=%d, run_state=%s, fail_state=%s, " + "pre_flushing_run_state=%s, update_handlers=%s, pending_setup=%s, " + "tasks child state? (%s), rescue child state? (%s), always child state? (%s), " + "did rescue? %s, did start at task? %s" % ( + self.cur_block, + self.cur_regular_task, + self.cur_rescue_task, + self.cur_always_task, + self.cur_handlers_task, + self.run_state, + self.fail_state, + self.pre_flushing_run_state, + self.update_handlers, + self.pending_setup, + self.tasks_child_state, + self.rescue_child_state, + self.always_child_state, + self.did_rescue, + self.did_start_at_task, + )) + + def __eq__(self, other): + if not isinstance(other, HostState): + return False + + for attr in ('_blocks', + 'cur_block', 'cur_regular_task', 'cur_rescue_task', 'cur_always_task', 'cur_handlers_task', + 'run_state', 'fail_state', 'pre_flushing_run_state', 'update_handlers', 'pending_setup', + 'tasks_child_state', 'rescue_child_state', 'always_child_state'): + if getattr(self, attr) != getattr(other, attr): + return False + + return True + + def get_current_block(self): + return self._blocks[self.cur_block] + + def copy(self): + new_state = HostState(self._blocks) + new_state.handlers = self.handlers[:] + new_state.cur_block = self.cur_block + new_state.cur_regular_task = self.cur_regular_task + new_state.cur_rescue_task = self.cur_rescue_task + new_state.cur_always_task = self.cur_always_task + new_state.cur_handlers_task = self.cur_handlers_task + new_state.run_state = self.run_state + new_state.fail_state = self.fail_state + new_state.pre_flushing_run_state = self.pre_flushing_run_state + new_state.update_handlers = self.update_handlers + new_state.pending_setup = self.pending_setup + new_state.did_rescue = self.did_rescue + new_state.did_start_at_task = self.did_start_at_task + if self.tasks_child_state is not None: + new_state.tasks_child_state = self.tasks_child_state.copy() + if self.rescue_child_state is not None: + new_state.rescue_child_state = self.rescue_child_state.copy() + if self.always_child_state is not None: + new_state.always_child_state = self.always_child_state.copy() + return new_state + + +class PlayIterator: + + def __init__(self, inventory, play, play_context, variable_manager, all_vars, start_at_done=False): + self._play = play + self._blocks = [] + self._variable_manager = variable_manager + + setup_block = Block(play=self._play) + # Gathering facts with run_once would copy the facts from one host to + # the others. + setup_block.run_once = False + setup_task = Task(block=setup_block) + setup_task.action = 'gather_facts' + # TODO: hardcoded resolution here, but should use actual resolution code in the end, + # in case of 'legacy' mismatch + setup_task.resolved_action = 'ansible.builtin.gather_facts' + setup_task.name = 'Gathering Facts' + setup_task.args = {} + + # Unless play is specifically tagged, gathering should 'always' run + if not self._play.tags: + setup_task.tags = ['always'] + + # Default options to gather + for option in ('gather_subset', 'gather_timeout', 'fact_path'): + value = getattr(self._play, option, None) + if value is not None: + setup_task.args[option] = value + + setup_task.set_loader(self._play._loader) + # short circuit fact gathering if the entire playbook is conditional + if self._play._included_conditional is not None: + setup_task.when = self._play._included_conditional[:] + setup_block.block = [setup_task] + + setup_block = setup_block.filter_tagged_tasks(all_vars) + self._blocks.append(setup_block) + + # keep flatten (no blocks) list of all tasks from the play + # used for the lockstep mechanism in the linear strategy + self.all_tasks = setup_block.get_tasks() + + for block in self._play.compile(): + new_block = block.filter_tagged_tasks(all_vars) + if new_block.has_tasks(): + self._blocks.append(new_block) + self.all_tasks.extend(new_block.get_tasks()) + + # keep list of all handlers, it is copied into each HostState + # at the beginning of IteratingStates.HANDLERS + # the copy happens at each flush in order to restore the original + # list and remove any included handlers that might not be notified + # at the particular flush + self.handlers = [h for b in self._play.handlers for h in b.block] + + self._host_states = {} + start_at_matched = False + batch = inventory.get_hosts(self._play.hosts, order=self._play.order) + self.batch_size = len(batch) + for host in batch: + self.set_state_for_host(host.name, HostState(blocks=self._blocks)) + # if we're looking to start at a specific task, iterate through + # the tasks for this host until we find the specified task + if play_context.start_at_task is not None and not start_at_done: + while True: + (s, task) = self.get_next_task_for_host(host, peek=True) + if s.run_state == IteratingStates.COMPLETE: + break + if task.name == play_context.start_at_task or (task.name and fnmatch.fnmatch(task.name, play_context.start_at_task)) or \ + task.get_name() == play_context.start_at_task or fnmatch.fnmatch(task.get_name(), play_context.start_at_task): + start_at_matched = True + break + self.set_state_for_host(host.name, s) + + # finally, reset the host's state to IteratingStates.SETUP + if start_at_matched: + self._host_states[host.name].did_start_at_task = True + self._host_states[host.name].run_state = IteratingStates.SETUP + + if start_at_matched: + # we have our match, so clear the start_at_task field on the + # play context to flag that we've started at a task (and future + # plays won't try to advance) + play_context.start_at_task = None + + self.end_play = False + self.cur_task = 0 + + def get_host_state(self, host): + # Since we're using the PlayIterator to carry forward failed hosts, + # in the event that a previous host was not in the current inventory + # we create a stub state for it now + if host.name not in self._host_states: + self.set_state_for_host(host.name, HostState(blocks=[])) + + return self._host_states[host.name].copy() + + def cache_block_tasks(self, block): + display.deprecated( + 'PlayIterator.cache_block_tasks is now noop due to the changes ' + 'in the way tasks are cached and is deprecated.', + version=2.16 + ) + + def get_next_task_for_host(self, host, peek=False): + + display.debug("getting the next task for host %s" % host.name) + s = self.get_host_state(host) + + task = None + if s.run_state == IteratingStates.COMPLETE: + display.debug("host %s is done iterating, returning" % host.name) + return (s, None) + + (s, task) = self._get_next_task_from_state(s, host=host) + + if not peek: + self.set_state_for_host(host.name, s) + + display.debug("done getting next task for host %s" % host.name) + display.debug(" ^ task is: %s" % task) + display.debug(" ^ state is: %s" % s) + return (s, task) + + def _get_next_task_from_state(self, state, host): + + task = None + + # try and find the next task, given the current state. + while True: + # try to get the current block from the list of blocks, and + # if we run past the end of the list we know we're done with + # this block + try: + block = state._blocks[state.cur_block] + except IndexError: + state.run_state = IteratingStates.COMPLETE + return (state, None) + + if state.run_state == IteratingStates.SETUP: + # First, we check to see if we were pending setup. If not, this is + # the first trip through IteratingStates.SETUP, so we set the pending_setup + # flag and try to determine if we do in fact want to gather facts for + # the specified host. + if not state.pending_setup: + state.pending_setup = True + + # Gather facts if the default is 'smart' and we have not yet + # done it for this host; or if 'explicit' and the play sets + # gather_facts to True; or if 'implicit' and the play does + # NOT explicitly set gather_facts to False. + + gathering = C.DEFAULT_GATHERING + implied = self._play.gather_facts is None or boolean(self._play.gather_facts, strict=False) + + if (gathering == 'implicit' and implied) or \ + (gathering == 'explicit' and boolean(self._play.gather_facts, strict=False)) or \ + (gathering == 'smart' and implied and not (self._variable_manager._fact_cache.get(host.name, {}).get('_ansible_facts_gathered', False))): + # The setup block is always self._blocks[0], as we inject it + # during the play compilation in __init__ above. + setup_block = self._blocks[0] + if setup_block.has_tasks() and len(setup_block.block) > 0: + task = setup_block.block[0] + else: + # This is the second trip through IteratingStates.SETUP, so we clear + # the flag and move onto the next block in the list while setting + # the run state to IteratingStates.TASKS + state.pending_setup = False + + state.run_state = IteratingStates.TASKS + if not state.did_start_at_task: + state.cur_block += 1 + state.cur_regular_task = 0 + state.cur_rescue_task = 0 + state.cur_always_task = 0 + state.tasks_child_state = None + state.rescue_child_state = None + state.always_child_state = None + + elif state.run_state == IteratingStates.TASKS: + # clear the pending setup flag, since we're past that and it didn't fail + if state.pending_setup: + state.pending_setup = False + + # First, we check for a child task state that is not failed, and if we + # have one recurse into it for the next task. If we're done with the child + # state, we clear it and drop back to getting the next task from the list. + if state.tasks_child_state: + (state.tasks_child_state, task) = self._get_next_task_from_state(state.tasks_child_state, host=host) + if self._check_failed_state(state.tasks_child_state): + # failed child state, so clear it and move into the rescue portion + state.tasks_child_state = None + self._set_failed_state(state) + else: + # get the next task recursively + if task is None or state.tasks_child_state.run_state == IteratingStates.COMPLETE: + # we're done with the child state, so clear it and continue + # back to the top of the loop to get the next task + state.tasks_child_state = None + continue + else: + # First here, we check to see if we've failed anywhere down the chain + # of states we have, and if so we move onto the rescue portion. Otherwise, + # we check to see if we've moved past the end of the list of tasks. If so, + # we move into the always portion of the block, otherwise we get the next + # task from the list. + if self._check_failed_state(state): + state.run_state = IteratingStates.RESCUE + elif state.cur_regular_task >= len(block.block): + state.run_state = IteratingStates.ALWAYS + else: + task = block.block[state.cur_regular_task] + # if the current task is actually a child block, create a child + # state for us to recurse into on the next pass + if isinstance(task, Block): + state.tasks_child_state = HostState(blocks=[task]) + state.tasks_child_state.run_state = IteratingStates.TASKS + # since we've created the child state, clear the task + # so we can pick up the child state on the next pass + task = None + state.cur_regular_task += 1 + + elif state.run_state == IteratingStates.RESCUE: + # The process here is identical to IteratingStates.TASKS, except instead + # we move into the always portion of the block. + if state.rescue_child_state: + (state.rescue_child_state, task) = self._get_next_task_from_state(state.rescue_child_state, host=host) + if self._check_failed_state(state.rescue_child_state): + state.rescue_child_state = None + self._set_failed_state(state) + else: + if task is None or state.rescue_child_state.run_state == IteratingStates.COMPLETE: + state.rescue_child_state = None + continue + else: + if state.fail_state & FailedStates.RESCUE == FailedStates.RESCUE: + state.run_state = IteratingStates.ALWAYS + elif state.cur_rescue_task >= len(block.rescue): + if len(block.rescue) > 0: + state.fail_state = FailedStates.NONE + state.run_state = IteratingStates.ALWAYS + state.did_rescue = True + else: + task = block.rescue[state.cur_rescue_task] + if isinstance(task, Block): + state.rescue_child_state = HostState(blocks=[task]) + state.rescue_child_state.run_state = IteratingStates.TASKS + task = None + state.cur_rescue_task += 1 + + elif state.run_state == IteratingStates.ALWAYS: + # And again, the process here is identical to IteratingStates.TASKS, except + # instead we either move onto the next block in the list, or we set the + # run state to IteratingStates.COMPLETE in the event of any errors, or when we + # have hit the end of the list of blocks. + if state.always_child_state: + (state.always_child_state, task) = self._get_next_task_from_state(state.always_child_state, host=host) + if self._check_failed_state(state.always_child_state): + state.always_child_state = None + self._set_failed_state(state) + else: + if task is None or state.always_child_state.run_state == IteratingStates.COMPLETE: + state.always_child_state = None + continue + else: + if state.cur_always_task >= len(block.always): + if state.fail_state != FailedStates.NONE: + state.run_state = IteratingStates.COMPLETE + else: + state.cur_block += 1 + state.cur_regular_task = 0 + state.cur_rescue_task = 0 + state.cur_always_task = 0 + state.run_state = IteratingStates.TASKS + state.tasks_child_state = None + state.rescue_child_state = None + state.always_child_state = None + state.did_rescue = False + else: + task = block.always[state.cur_always_task] + if isinstance(task, Block): + state.always_child_state = HostState(blocks=[task]) + state.always_child_state.run_state = IteratingStates.TASKS + task = None + state.cur_always_task += 1 + + elif state.run_state == IteratingStates.HANDLERS: + if state.update_handlers: + # reset handlers for HostState since handlers from include_tasks + # might be there from previous flush + state.handlers = self.handlers[:] + state.update_handlers = False + state.cur_handlers_task = 0 + + if state.fail_state & FailedStates.HANDLERS == FailedStates.HANDLERS: + state.update_handlers = True + state.run_state = IteratingStates.COMPLETE + else: + while True: + try: + task = state.handlers[state.cur_handlers_task] + except IndexError: + task = None + state.run_state = state.pre_flushing_run_state + state.update_handlers = True + break + else: + state.cur_handlers_task += 1 + if task.is_host_notified(host): + break + + elif state.run_state == IteratingStates.COMPLETE: + return (state, None) + + # if something above set the task, break out of the loop now + if task: + break + + return (state, task) + + def _set_failed_state(self, state): + if state.run_state == IteratingStates.SETUP: + state.fail_state |= FailedStates.SETUP + state.run_state = IteratingStates.COMPLETE + elif state.run_state == IteratingStates.TASKS: + if state.tasks_child_state is not None: + state.tasks_child_state = self._set_failed_state(state.tasks_child_state) + else: + state.fail_state |= FailedStates.TASKS + if state._blocks[state.cur_block].rescue: + state.run_state = IteratingStates.RESCUE + elif state._blocks[state.cur_block].always: + state.run_state = IteratingStates.ALWAYS + else: + state.run_state = IteratingStates.COMPLETE + elif state.run_state == IteratingStates.RESCUE: + if state.rescue_child_state is not None: + state.rescue_child_state = self._set_failed_state(state.rescue_child_state) + else: + state.fail_state |= FailedStates.RESCUE + if state._blocks[state.cur_block].always: + state.run_state = IteratingStates.ALWAYS + else: + state.run_state = IteratingStates.COMPLETE + elif state.run_state == IteratingStates.ALWAYS: + if state.always_child_state is not None: + state.always_child_state = self._set_failed_state(state.always_child_state) + else: + state.fail_state |= FailedStates.ALWAYS + state.run_state = IteratingStates.COMPLETE + elif state.run_state == IteratingStates.HANDLERS: + state.fail_state |= FailedStates.HANDLERS + state.update_handlers = True + if state._blocks[state.cur_block].rescue: + state.run_state = IteratingStates.RESCUE + elif state._blocks[state.cur_block].always: + state.run_state = IteratingStates.ALWAYS + else: + state.run_state = IteratingStates.COMPLETE + return state + + def mark_host_failed(self, host): + s = self.get_host_state(host) + display.debug("marking host %s failed, current state: %s" % (host, s)) + s = self._set_failed_state(s) + display.debug("^ failed state is now: %s" % s) + self.set_state_for_host(host.name, s) + self._play._removed_hosts.append(host.name) + + def get_failed_hosts(self): + return dict((host, True) for (host, state) in self._host_states.items() if self._check_failed_state(state)) + + def _check_failed_state(self, state): + if state is None: + return False + elif state.run_state == IteratingStates.RESCUE and self._check_failed_state(state.rescue_child_state): + return True + elif state.run_state == IteratingStates.ALWAYS and self._check_failed_state(state.always_child_state): + return True + elif state.run_state == IteratingStates.HANDLERS and state.fail_state & FailedStates.HANDLERS == FailedStates.HANDLERS: + return True + elif state.fail_state != FailedStates.NONE: + if state.run_state == IteratingStates.RESCUE and state.fail_state & FailedStates.RESCUE == 0: + return False + elif state.run_state == IteratingStates.ALWAYS and state.fail_state & FailedStates.ALWAYS == 0: + return False + else: + return not (state.did_rescue and state.fail_state & FailedStates.ALWAYS == 0) + elif state.run_state == IteratingStates.TASKS and self._check_failed_state(state.tasks_child_state): + cur_block = state._blocks[state.cur_block] + if len(cur_block.rescue) > 0 and state.fail_state & FailedStates.RESCUE == 0: + return False + else: + return True + return False + + def is_failed(self, host): + s = self.get_host_state(host) + return self._check_failed_state(s) + + def clear_host_errors(self, host): + self._clear_state_errors(self.get_state_for_host(host.name)) + + def _clear_state_errors(self, state: HostState) -> None: + state.fail_state = FailedStates.NONE + + if state.tasks_child_state is not None: + self._clear_state_errors(state.tasks_child_state) + elif state.rescue_child_state is not None: + self._clear_state_errors(state.rescue_child_state) + elif state.always_child_state is not None: + self._clear_state_errors(state.always_child_state) + + def get_active_state(self, state): + ''' + Finds the active state, recursively if necessary when there are child states. + ''' + if state.run_state == IteratingStates.TASKS and state.tasks_child_state is not None: + return self.get_active_state(state.tasks_child_state) + elif state.run_state == IteratingStates.RESCUE and state.rescue_child_state is not None: + return self.get_active_state(state.rescue_child_state) + elif state.run_state == IteratingStates.ALWAYS and state.always_child_state is not None: + return self.get_active_state(state.always_child_state) + return state + + def is_any_block_rescuing(self, state): + ''' + Given the current HostState state, determines if the current block, or any child blocks, + are in rescue mode. + ''' + if state.run_state == IteratingStates.TASKS and state.get_current_block().rescue: + return True + if state.tasks_child_state is not None: + return self.is_any_block_rescuing(state.tasks_child_state) + if state.rescue_child_state is not None: + return self.is_any_block_rescuing(state.rescue_child_state) + if state.always_child_state is not None: + return self.is_any_block_rescuing(state.always_child_state) + return False + + def get_original_task(self, host, task): + display.deprecated( + 'PlayIterator.get_original_task is now noop due to the changes ' + 'in the way tasks are cached and is deprecated.', + version=2.16 + ) + return (None, None) + + def _insert_tasks_into_state(self, state, task_list): + # if we've failed at all, or if the task list is empty, just return the current state + if (state.fail_state != FailedStates.NONE and state.run_state == IteratingStates.TASKS) or not task_list: + return state + + if state.run_state == IteratingStates.TASKS: + if state.tasks_child_state: + state.tasks_child_state = self._insert_tasks_into_state(state.tasks_child_state, task_list) + else: + target_block = state._blocks[state.cur_block].copy() + before = target_block.block[:state.cur_regular_task] + after = target_block.block[state.cur_regular_task:] + target_block.block = before + task_list + after + state._blocks[state.cur_block] = target_block + elif state.run_state == IteratingStates.RESCUE: + if state.rescue_child_state: + state.rescue_child_state = self._insert_tasks_into_state(state.rescue_child_state, task_list) + else: + target_block = state._blocks[state.cur_block].copy() + before = target_block.rescue[:state.cur_rescue_task] + after = target_block.rescue[state.cur_rescue_task:] + target_block.rescue = before + task_list + after + state._blocks[state.cur_block] = target_block + elif state.run_state == IteratingStates.ALWAYS: + if state.always_child_state: + state.always_child_state = self._insert_tasks_into_state(state.always_child_state, task_list) + else: + target_block = state._blocks[state.cur_block].copy() + before = target_block.always[:state.cur_always_task] + after = target_block.always[state.cur_always_task:] + target_block.always = before + task_list + after + state._blocks[state.cur_block] = target_block + elif state.run_state == IteratingStates.HANDLERS: + state.handlers[state.cur_handlers_task:state.cur_handlers_task] = [h for b in task_list for h in b.block] + + return state + + def add_tasks(self, host, task_list): + self.set_state_for_host(host.name, self._insert_tasks_into_state(self.get_host_state(host), task_list)) + + @property + def host_states(self): + return self._host_states + + def get_state_for_host(self, hostname: str) -> HostState: + return self._host_states[hostname] + + def set_state_for_host(self, hostname: str, state: HostState) -> None: + if not isinstance(state, HostState): + raise AnsibleAssertionError('Expected state to be a HostState but was a %s' % type(state)) + self._host_states[hostname] = state + + def set_run_state_for_host(self, hostname: str, run_state: IteratingStates) -> None: + if not isinstance(run_state, IteratingStates): + raise AnsibleAssertionError('Expected run_state to be a IteratingStates but was %s' % (type(run_state))) + self._host_states[hostname].run_state = run_state + + def set_fail_state_for_host(self, hostname: str, fail_state: FailedStates) -> None: + if not isinstance(fail_state, FailedStates): + raise AnsibleAssertionError('Expected fail_state to be a FailedStates but was %s' % (type(fail_state))) + self._host_states[hostname].fail_state = fail_state diff --git a/lib/ansible/executor/playbook_executor.py b/lib/ansible/executor/playbook_executor.py new file mode 100644 index 0000000..e8b2a3d --- /dev/null +++ b/lib/ansible/executor/playbook_executor.py @@ -0,0 +1,335 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os + +from ansible import constants as C +from ansible import context +from ansible.executor.task_queue_manager import TaskQueueManager, AnsibleEndPlay +from ansible.module_utils._text import to_text +from ansible.module_utils.parsing.convert_bool import boolean +from ansible.plugins.loader import become_loader, connection_loader, shell_loader +from ansible.playbook import Playbook +from ansible.template import Templar +from ansible.utils.helpers import pct_to_int +from ansible.utils.collection_loader import AnsibleCollectionConfig +from ansible.utils.collection_loader._collection_finder import _get_collection_name_from_path, _get_collection_playbook_path +from ansible.utils.path import makedirs_safe +from ansible.utils.ssh_functions import set_default_transport +from ansible.utils.display import Display + + +display = Display() + + +class PlaybookExecutor: + + ''' + This is the primary class for executing playbooks, and thus the + basis for bin/ansible-playbook operation. + ''' + + def __init__(self, playbooks, inventory, variable_manager, loader, passwords): + self._playbooks = playbooks + self._inventory = inventory + self._variable_manager = variable_manager + self._loader = loader + self.passwords = passwords + self._unreachable_hosts = dict() + + if context.CLIARGS.get('listhosts') or context.CLIARGS.get('listtasks') or \ + context.CLIARGS.get('listtags') or context.CLIARGS.get('syntax'): + self._tqm = None + else: + self._tqm = TaskQueueManager( + inventory=inventory, + variable_manager=variable_manager, + loader=loader, + passwords=self.passwords, + forks=context.CLIARGS.get('forks'), + ) + + # Note: We run this here to cache whether the default ansible ssh + # executable supports control persist. Sometime in the future we may + # need to enhance this to check that ansible_ssh_executable specified + # in inventory is also cached. We can't do this caching at the point + # where it is used (in task_executor) because that is post-fork and + # therefore would be discarded after every task. + set_default_transport() + + def run(self): + ''' + Run the given playbook, based on the settings in the play which + may limit the runs to serialized groups, etc. + ''' + + result = 0 + entrylist = [] + entry = {} + try: + # preload become/connection/shell to set config defs cached + list(connection_loader.all(class_only=True)) + list(shell_loader.all(class_only=True)) + list(become_loader.all(class_only=True)) + + for playbook in self._playbooks: + + # deal with FQCN + resource = _get_collection_playbook_path(playbook) + if resource is not None: + playbook_path = resource[1] + playbook_collection = resource[2] + else: + playbook_path = playbook + # not fqcn, but might still be colleciotn playbook + playbook_collection = _get_collection_name_from_path(playbook) + + if playbook_collection: + display.warning("running playbook inside collection {0}".format(playbook_collection)) + AnsibleCollectionConfig.default_collection = playbook_collection + else: + AnsibleCollectionConfig.default_collection = None + + pb = Playbook.load(playbook_path, variable_manager=self._variable_manager, loader=self._loader) + # FIXME: move out of inventory self._inventory.set_playbook_basedir(os.path.realpath(os.path.dirname(playbook_path))) + + if self._tqm is None: # we are doing a listing + entry = {'playbook': playbook_path} + entry['plays'] = [] + else: + # make sure the tqm has callbacks loaded + self._tqm.load_callbacks() + self._tqm.send_callback('v2_playbook_on_start', pb) + + i = 1 + plays = pb.get_plays() + display.vv(u'%d plays in %s' % (len(plays), to_text(playbook_path))) + + for play in plays: + if play._included_path is not None: + self._loader.set_basedir(play._included_path) + else: + self._loader.set_basedir(pb._basedir) + + # clear any filters which may have been applied to the inventory + self._inventory.remove_restriction() + + # Allow variables to be used in vars_prompt fields. + all_vars = self._variable_manager.get_vars(play=play) + templar = Templar(loader=self._loader, variables=all_vars) + setattr(play, 'vars_prompt', templar.template(play.vars_prompt)) + + # FIXME: this should be a play 'sub object' like loop_control + if play.vars_prompt: + for var in play.vars_prompt: + vname = var['name'] + prompt = var.get("prompt", vname) + default = var.get("default", None) + private = boolean(var.get("private", True)) + confirm = boolean(var.get("confirm", False)) + encrypt = var.get("encrypt", None) + salt_size = var.get("salt_size", None) + salt = var.get("salt", None) + unsafe = var.get("unsafe", None) + + if vname not in self._variable_manager.extra_vars: + if self._tqm: + self._tqm.send_callback('v2_playbook_on_vars_prompt', vname, private, prompt, encrypt, confirm, salt_size, salt, + default, unsafe) + play.vars[vname] = display.do_var_prompt(vname, private, prompt, encrypt, confirm, salt_size, salt, default, unsafe) + else: # we are either in --list-<option> or syntax check + play.vars[vname] = default + + # Post validate so any play level variables are templated + all_vars = self._variable_manager.get_vars(play=play) + templar = Templar(loader=self._loader, variables=all_vars) + play.post_validate(templar) + + if context.CLIARGS['syntax']: + continue + + if self._tqm is None: + # we are just doing a listing + entry['plays'].append(play) + + else: + self._tqm._unreachable_hosts.update(self._unreachable_hosts) + + previously_failed = len(self._tqm._failed_hosts) + previously_unreachable = len(self._tqm._unreachable_hosts) + + break_play = False + # we are actually running plays + batches = self._get_serialized_batches(play) + if len(batches) == 0: + self._tqm.send_callback('v2_playbook_on_play_start', play) + self._tqm.send_callback('v2_playbook_on_no_hosts_matched') + for batch in batches: + # restrict the inventory to the hosts in the serialized batch + self._inventory.restrict_to_hosts(batch) + # and run it... + try: + result = self._tqm.run(play=play) + except AnsibleEndPlay as e: + result = e.result + break + + # break the play if the result equals the special return code + if result & self._tqm.RUN_FAILED_BREAK_PLAY != 0: + result = self._tqm.RUN_FAILED_HOSTS + break_play = True + + # check the number of failures here, to see if they're above the maximum + # failure percentage allowed, or if any errors are fatal. If either of those + # conditions are met, we break out, otherwise we only break out if the entire + # batch failed + failed_hosts_count = len(self._tqm._failed_hosts) + len(self._tqm._unreachable_hosts) - \ + (previously_failed + previously_unreachable) + + if len(batch) == failed_hosts_count: + break_play = True + break + + # update the previous counts so they don't accumulate incorrectly + # over multiple serial batches + previously_failed += len(self._tqm._failed_hosts) - previously_failed + previously_unreachable += len(self._tqm._unreachable_hosts) - previously_unreachable + + # save the unreachable hosts from this batch + self._unreachable_hosts.update(self._tqm._unreachable_hosts) + + if break_play: + break + + i = i + 1 # per play + + if entry: + entrylist.append(entry) # per playbook + + # send the stats callback for this playbook + if self._tqm is not None: + if C.RETRY_FILES_ENABLED: + retries = set(self._tqm._failed_hosts.keys()) + retries.update(self._tqm._unreachable_hosts.keys()) + retries = sorted(retries) + if len(retries) > 0: + if C.RETRY_FILES_SAVE_PATH: + basedir = C.RETRY_FILES_SAVE_PATH + elif playbook_path: + basedir = os.path.dirname(os.path.abspath(playbook_path)) + else: + basedir = '~/' + + (retry_name, _) = os.path.splitext(os.path.basename(playbook_path)) + filename = os.path.join(basedir, "%s.retry" % retry_name) + if self._generate_retry_inventory(filename, retries): + display.display("\tto retry, use: --limit @%s\n" % filename) + + self._tqm.send_callback('v2_playbook_on_stats', self._tqm._stats) + + # if the last result wasn't zero, break out of the playbook file name loop + if result != 0: + break + + if entrylist: + return entrylist + + finally: + if self._tqm is not None: + self._tqm.cleanup() + if self._loader: + self._loader.cleanup_all_tmp_files() + + if context.CLIARGS['syntax']: + display.display("No issues encountered") + return result + + if context.CLIARGS['start_at_task'] and not self._tqm._start_at_done: + display.error( + "No matching task \"%s\" found." + " Note: --start-at-task can only follow static includes." + % context.CLIARGS['start_at_task'] + ) + + return result + + def _get_serialized_batches(self, play): + ''' + Returns a list of hosts, subdivided into batches based on + the serial size specified in the play. + ''' + + # make sure we have a unique list of hosts + all_hosts = self._inventory.get_hosts(play.hosts, order=play.order) + all_hosts_len = len(all_hosts) + + # the serial value can be listed as a scalar or a list of + # scalars, so we make sure it's a list here + serial_batch_list = play.serial + if len(serial_batch_list) == 0: + serial_batch_list = [-1] + + cur_item = 0 + serialized_batches = [] + + while len(all_hosts) > 0: + # get the serial value from current item in the list + serial = pct_to_int(serial_batch_list[cur_item], all_hosts_len) + + # if the serial count was not specified or is invalid, default to + # a list of all hosts, otherwise grab a chunk of the hosts equal + # to the current serial item size + if serial <= 0: + serialized_batches.append(all_hosts) + break + else: + play_hosts = [] + for x in range(serial): + if len(all_hosts) > 0: + play_hosts.append(all_hosts.pop(0)) + + serialized_batches.append(play_hosts) + + # increment the current batch list item number, and if we've hit + # the end keep using the last element until we've consumed all of + # the hosts in the inventory + cur_item += 1 + if cur_item > len(serial_batch_list) - 1: + cur_item = len(serial_batch_list) - 1 + + return serialized_batches + + def _generate_retry_inventory(self, retry_path, replay_hosts): + ''' + Called when a playbook run fails. It generates an inventory which allows + re-running on ONLY the failed hosts. This may duplicate some variable + information in group_vars/host_vars but that is ok, and expected. + ''' + try: + makedirs_safe(os.path.dirname(retry_path)) + with open(retry_path, 'w') as fd: + for x in replay_hosts: + fd.write("%s\n" % x) + except Exception as e: + display.warning("Could not create retry file '%s'.\n\t%s" % (retry_path, to_text(e))) + return False + + return True diff --git a/lib/ansible/executor/powershell/__init__.py b/lib/ansible/executor/powershell/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lib/ansible/executor/powershell/__init__.py diff --git a/lib/ansible/executor/powershell/async_watchdog.ps1 b/lib/ansible/executor/powershell/async_watchdog.ps1 new file mode 100644 index 0000000..c2138e3 --- /dev/null +++ b/lib/ansible/executor/powershell/async_watchdog.ps1 @@ -0,0 +1,117 @@ +# (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +param( + [Parameter(Mandatory = $true)][System.Collections.IDictionary]$Payload +) + +# help with debugging errors as we don't have visibility of this running process +trap { + $watchdog_path = "$($env:TEMP)\ansible-async-watchdog-error-$(Get-Date -Format "yyyy-MM-ddTHH-mm-ss.ffffZ").txt" + $error_msg = "Error while running the async exec wrapper`r`n$(Format-AnsibleException -ErrorRecord $_)" + Set-Content -Path $watchdog_path -Value $error_msg + break +} + +$ErrorActionPreference = "Stop" + +Write-AnsibleLog "INFO - starting async_watchdog" "async_watchdog" + +# pop 0th action as entrypoint +$payload.actions = $payload.actions[1..99] + +$actions = $Payload.actions +$entrypoint = $payload.($actions[0]) +$entrypoint = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($entrypoint)) + +$resultfile_path = $payload.async_results_path +$max_exec_time_sec = $payload.async_timeout_sec + +Write-AnsibleLog "INFO - deserializing existing result file args at: '$resultfile_path'" "async_watchdog" +if (-not (Test-Path -Path $resultfile_path)) { + $msg = "result file at '$resultfile_path' does not exist" + Write-AnsibleLog "ERROR - $msg" "async_watchdog" + throw $msg +} +$result_json = Get-Content -Path $resultfile_path -Raw +Write-AnsibleLog "INFO - result file json is: $result_json" "async_watchdog" +$result = ConvertFrom-AnsibleJson -InputObject $result_json + +Write-AnsibleLog "INFO - creating async runspace" "async_watchdog" +$rs = [RunspaceFactory]::CreateRunspace() +$rs.Open() + +Write-AnsibleLog "INFO - creating async PowerShell pipeline" "async_watchdog" +$ps = [PowerShell]::Create() +$ps.Runspace = $rs + +# these functions are set in exec_wrapper +Write-AnsibleLog "INFO - adding global functions to PowerShell pipeline script" "async_watchdog" +$ps.AddScript($script:common_functions).AddStatement() > $null +$ps.AddScript($script:wrapper_functions).AddStatement() > $null +$function_params = @{ + Name = "common_functions" + Value = $script:common_functions + Scope = "script" +} +$ps.AddCommand("Set-Variable").AddParameters($function_params).AddStatement() > $null + +Write-AnsibleLog "INFO - adding $($actions[0]) to PowerShell pipeline script" "async_watchdog" +$ps.AddScript($entrypoint).AddArgument($payload) > $null + +Write-AnsibleLog "INFO - async job start, calling BeginInvoke()" "async_watchdog" +$job_async_result = $ps.BeginInvoke() + +Write-AnsibleLog "INFO - waiting '$max_exec_time_sec' seconds for async job to complete" "async_watchdog" +$job_async_result.AsyncWaitHandle.WaitOne($max_exec_time_sec * 1000) > $null +$result.finished = 1 + +if ($job_async_result.IsCompleted) { + Write-AnsibleLog "INFO - async job completed, calling EndInvoke()" "async_watchdog" + + $job_output = $ps.EndInvoke($job_async_result) + $job_error = $ps.Streams.Error + + Write-AnsibleLog "INFO - raw module stdout:`r`n$($job_output | Out-String)" "async_watchdog" + if ($job_error) { + Write-AnsibleLog "WARN - raw module stderr:`r`n$($job_error | Out-String)" "async_watchdog" + } + + # write success/output/error to result object + # TODO: cleanse leading/trailing junk + try { + Write-AnsibleLog "INFO - deserializing Ansible stdout" "async_watchdog" + $module_result = ConvertFrom-AnsibleJson -InputObject $job_output + # TODO: check for conflicting keys + $result = $result + $module_result + } + catch { + $result.failed = $true + $result.msg = "failed to parse module output: $($_.Exception.Message)" + # return output back to Ansible to help with debugging errors + $result.stdout = $job_output | Out-String + $result.stderr = $job_error | Out-String + } + + $result_json = ConvertTo-Json -InputObject $result -Depth 99 -Compress + Set-Content -Path $resultfile_path -Value $result_json + + Write-AnsibleLog "INFO - wrote output to $resultfile_path" "async_watchdog" +} +else { + Write-AnsibleLog "ERROR - reached timeout on async job, stopping job" "async_watchdog" + $ps.BeginStop($null, $null) > $null # best effort stop + + # write timeout to result object + $result.failed = $true + $result.msg = "timed out waiting for module completion" + $result_json = ConvertTo-Json -InputObject $result -Depth 99 -Compress + Set-Content -Path $resultfile_path -Value $result_json + + Write-AnsibleLog "INFO - wrote timeout to '$resultfile_path'" "async_watchdog" +} + +# in the case of a hung pipeline, this will cause the process to stay alive until it's un-hung... +#$rs.Close() | Out-Null + +Write-AnsibleLog "INFO - ending async_watchdog" "async_watchdog" diff --git a/lib/ansible/executor/powershell/async_wrapper.ps1 b/lib/ansible/executor/powershell/async_wrapper.ps1 new file mode 100644 index 0000000..0cd640f --- /dev/null +++ b/lib/ansible/executor/powershell/async_wrapper.ps1 @@ -0,0 +1,174 @@ +# (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +param( + [Parameter(Mandatory = $true)][System.Collections.IDictionary]$Payload +) + +$ErrorActionPreference = "Stop" + +Write-AnsibleLog "INFO - starting async_wrapper" "async_wrapper" + +if (-not $Payload.environment.ContainsKey("ANSIBLE_ASYNC_DIR")) { + Write-AnsibleError -Message "internal error: the environment variable ANSIBLE_ASYNC_DIR is not set and is required for an async task" + $host.SetShouldExit(1) + return +} +$async_dir = [System.Environment]::ExpandEnvironmentVariables($Payload.environment.ANSIBLE_ASYNC_DIR) + +# calculate the result path so we can include it in the worker payload +$jid = $Payload.async_jid +$local_jid = $jid + "." + $pid + +$results_path = [System.IO.Path]::Combine($async_dir, $local_jid) + +Write-AnsibleLog "INFO - creating async results path at '$results_path'" "async_wrapper" + +$Payload.async_results_path = $results_path +[System.IO.Directory]::CreateDirectory([System.IO.Path]::GetDirectoryName($results_path)) > $null + +# we use Win32_Process to escape the current process job, CreateProcess with a +# breakaway flag won't work for psrp as the psrp process does not have breakaway +# rights. Unfortunately we can't read/write to the spawned process as we can't +# inherit the handles. We use a locked down named pipe to send the exec_wrapper +# payload. Anonymous pipes won't work as the spawned process will not be a child +# of the current one and will not be able to inherit the handles + +# pop the async_wrapper action so we don't get stuck in a loop and create new +# exec_wrapper for our async process +$Payload.actions = $Payload.actions[1..99] +$payload_json = ConvertTo-Json -InputObject $Payload -Depth 99 -Compress + +# +$exec_wrapper = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($Payload.exec_wrapper)) +$exec_wrapper += "`0`0`0`0" + $payload_json +$payload_bytes = [System.Text.Encoding]::UTF8.GetBytes($exec_wrapper) +$pipe_name = "ansible-async-$jid-$([guid]::NewGuid())" + +# template the async process command line with the payload details +$bootstrap_wrapper = { + # help with debugging errors as we loose visibility of the process output + # from here on + trap { + $wrapper_path = "$($env:TEMP)\ansible-async-wrapper-error-$(Get-Date -Format "yyyy-MM-ddTHH-mm-ss.ffffZ").txt" + $error_msg = "Error while running the async exec wrapper`r`n$($_ | Out-String)`r`n$($_.ScriptStackTrace)" + Set-Content -Path $wrapper_path -Value $error_msg + break + } + + &chcp.com 65001 > $null + + # store the pipe name and no. of bytes to read, these are populated before + # before the process is created - do not remove or changed + $pipe_name = "" + $bytes_length = 0 + + $input_bytes = New-Object -TypeName byte[] -ArgumentList $bytes_length + $pipe = New-Object -TypeName System.IO.Pipes.NamedPipeClientStream -ArgumentList @( + ".", # localhost + $pipe_name, + [System.IO.Pipes.PipeDirection]::In, + [System.IO.Pipes.PipeOptions]::None, + [System.Security.Principal.TokenImpersonationLevel]::Anonymous + ) + try { + $pipe.Connect() + $pipe.Read($input_bytes, 0, $bytes_length) > $null + } + finally { + $pipe.Close() + } + $exec = [System.Text.Encoding]::UTF8.GetString($input_bytes) + $exec_parts = $exec.Split(@("`0`0`0`0"), 2, [StringSplitOptions]::RemoveEmptyEntries) + Set-Variable -Name json_raw -Value $exec_parts[1] + $exec = [ScriptBlock]::Create($exec_parts[0]) + &$exec +} + +$bootstrap_wrapper = $bootstrap_wrapper.ToString().Replace('$pipe_name = ""', "`$pipe_name = `"$pipe_name`"") +$bootstrap_wrapper = $bootstrap_wrapper.Replace('$bytes_length = 0', "`$bytes_length = $($payload_bytes.Count)") +$encoded_command = [System.Convert]::ToBase64String([System.Text.Encoding]::Unicode.GetBytes($bootstrap_wrapper)) +$pwsh_path = "$env:SystemRoot\System32\WindowsPowerShell\v1.0\powershell.exe" +$exec_args = "`"$pwsh_path`" -NonInteractive -NoProfile -ExecutionPolicy Bypass -EncodedCommand $encoded_command" + +# create a named pipe that is set to allow only the current user read access +$current_user = ([Security.Principal.WindowsIdentity]::GetCurrent()).User +$pipe_sec = New-Object -TypeName System.IO.Pipes.PipeSecurity +$pipe_ar = New-Object -TypeName System.IO.Pipes.PipeAccessRule -ArgumentList @( + $current_user, + [System.IO.Pipes.PipeAccessRights]::Read, + [System.Security.AccessControl.AccessControlType]::Allow +) +$pipe_sec.AddAccessRule($pipe_ar) + +Write-AnsibleLog "INFO - creating named pipe '$pipe_name'" "async_wrapper" +$pipe = New-Object -TypeName System.IO.Pipes.NamedPipeServerStream -ArgumentList @( + $pipe_name, + [System.IO.Pipes.PipeDirection]::Out, + 1, + [System.IO.Pipes.PipeTransmissionMode]::Byte, + [System.IO.Pipes.PipeOptions]::Asynchronous, + 0, + 0, + $pipe_sec +) + +try { + Write-AnsibleLog "INFO - creating async process '$exec_args'" "async_wrapper" + $process = Invoke-CimMethod -ClassName Win32_Process -Name Create -Arguments @{CommandLine = $exec_args } + $rc = $process.ReturnValue + + Write-AnsibleLog "INFO - return value from async process exec: $rc" "async_wrapper" + if ($rc -ne 0) { + $error_msg = switch ($rc) { + 2 { "Access denied" } + 3 { "Insufficient privilege" } + 8 { "Unknown failure" } + 9 { "Path not found" } + 21 { "Invalid parameter" } + default { "Other" } + } + throw "Failed to start async process: $rc ($error_msg)" + } + $watchdog_pid = $process.ProcessId + Write-AnsibleLog "INFO - created async process PID: $watchdog_pid" "async_wrapper" + + # populate initial results before we send the async data to avoid result race + $result = @{ + started = 1; + finished = 0; + results_file = $results_path; + ansible_job_id = $local_jid; + _ansible_suppress_tmpdir_delete = $true; + ansible_async_watchdog_pid = $watchdog_pid + } + + Write-AnsibleLog "INFO - writing initial async results to '$results_path'" "async_wrapper" + $result_json = ConvertTo-Json -InputObject $result -Depth 99 -Compress + Set-Content $results_path -Value $result_json + + $np_timeout = $Payload.async_startup_timeout * 1000 + Write-AnsibleLog "INFO - waiting for async process to connect to named pipe for $np_timeout milliseconds" "async_wrapper" + $wait_async = $pipe.BeginWaitForConnection($null, $null) + $wait_async.AsyncWaitHandle.WaitOne($np_timeout) > $null + if (-not $wait_async.IsCompleted) { + $msg = "Ansible encountered a timeout while waiting for the async task to start and connect to the named" + $msg += "pipe. This can be affected by the performance of the target - you can increase this timeout using" + $msg += "WIN_ASYNC_STARTUP_TIMEOUT or just for this host using the win_async_startup_timeout hostvar if " + $msg += "this keeps happening." + throw $msg + } + $pipe.EndWaitForConnection($wait_async) + + Write-AnsibleLog "INFO - writing exec_wrapper and payload to async process" "async_wrapper" + $pipe.Write($payload_bytes, 0, $payload_bytes.Count) + $pipe.Flush() + $pipe.WaitForPipeDrain() +} +finally { + $pipe.Close() +} + +Write-AnsibleLog "INFO - outputting initial async result: $result_json" "async_wrapper" +Write-Output -InputObject $result_json +Write-AnsibleLog "INFO - ending async_wrapper" "async_wrapper" diff --git a/lib/ansible/executor/powershell/become_wrapper.ps1 b/lib/ansible/executor/powershell/become_wrapper.ps1 new file mode 100644 index 0000000..f40e265 --- /dev/null +++ b/lib/ansible/executor/powershell/become_wrapper.ps1 @@ -0,0 +1,163 @@ +# (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +param( + [Parameter(Mandatory = $true)][System.Collections.IDictionary]$Payload +) + +#Requires -Module Ansible.ModuleUtils.AddType +#AnsibleRequires -CSharpUtil Ansible.AccessToken +#AnsibleRequires -CSharpUtil Ansible.Become + +$ErrorActionPreference = "Stop" + +Write-AnsibleLog "INFO - starting become_wrapper" "become_wrapper" + +Function Get-EnumValue($enum, $flag_type, $value) { + $raw_enum_value = $value.Replace('_', '') + try { + $enum_value = [Enum]::Parse($enum, $raw_enum_value, $true) + } + catch [System.ArgumentException] { + $valid_options = [Enum]::GetNames($enum) | ForEach-Object -Process { + (($_ -creplace "(.)([A-Z][a-z]+)", '$1_$2') -creplace "([a-z0-9])([A-Z])", '$1_$2').ToString().ToLower() + } + throw "become_flags $flag_type value '$value' is not valid, valid values are: $($valid_options -join ", ")" + } + return $enum_value +} + +Function Get-BecomeFlag($flags) { + $logon_type = [Ansible.AccessToken.LogonType]::Interactive + $logon_flags = [Ansible.Become.LogonFlags]::WithProfile + + if ($null -eq $flags -or $flags -eq "") { + $flag_split = @() + } + elseif ($flags -is [string]) { + $flag_split = $flags.Split(" ") + } + else { + throw "become_flags must be a string, was $($flags.GetType())" + } + + foreach ($flag in $flag_split) { + $split = $flag.Split("=") + if ($split.Count -ne 2) { + throw "become_flags entry '$flag' is in an invalid format, must be a key=value pair" + } + $flag_key = $split[0] + $flag_value = $split[1] + if ($flag_key -eq "logon_type") { + $enum_details = @{ + enum = [Ansible.AccessToken.LogonType] + flag_type = $flag_key + value = $flag_value + } + $logon_type = Get-EnumValue @enum_details + } + elseif ($flag_key -eq "logon_flags") { + $logon_flag_values = $flag_value.Split(",") + $logon_flags = 0 -as [Ansible.Become.LogonFlags] + foreach ($logon_flag_value in $logon_flag_values) { + if ($logon_flag_value -eq "") { + continue + } + $enum_details = @{ + enum = [Ansible.Become.LogonFlags] + flag_type = $flag_key + value = $logon_flag_value + } + $logon_flag = Get-EnumValue @enum_details + $logon_flags = $logon_flags -bor $logon_flag + } + } + else { + throw "become_flags key '$flag_key' is not a valid runas flag, must be 'logon_type' or 'logon_flags'" + } + } + + return $logon_type, [Ansible.Become.LogonFlags]$logon_flags +} + +Write-AnsibleLog "INFO - loading C# become code" "become_wrapper" +$add_type_b64 = $Payload.powershell_modules["Ansible.ModuleUtils.AddType"] +$add_type = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($add_type_b64)) +New-Module -Name Ansible.ModuleUtils.AddType -ScriptBlock ([ScriptBlock]::Create($add_type)) | Import-Module > $null + +$new_tmp = [System.Environment]::ExpandEnvironmentVariables($Payload.module_args["_ansible_remote_tmp"]) +$access_def = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($Payload.csharp_utils["Ansible.AccessToken"])) +$become_def = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($Payload.csharp_utils["Ansible.Become"])) +$process_def = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($Payload.csharp_utils["Ansible.Process"])) +Add-CSharpType -References $access_def, $become_def, $process_def -TempPath $new_tmp -IncludeDebugInfo + +$username = $Payload.become_user +$password = $Payload.become_password +# We need to set password to the value of NullString so a null password is preserved when crossing the .NET +# boundary. If we pass $null it will automatically be converted to "" and we need to keep the distinction for +# accounts that don't have a password and when someone wants to become without knowing the password. +if ($null -eq $password) { + $password = [NullString]::Value +} + +try { + $logon_type, $logon_flags = Get-BecomeFlag -flags $Payload.become_flags +} +catch { + Write-AnsibleError -Message "internal error: failed to parse become_flags '$($Payload.become_flags)'" -ErrorRecord $_ + $host.SetShouldExit(1) + return +} +Write-AnsibleLog "INFO - parsed become input, user: '$username', type: '$logon_type', flags: '$logon_flags'" "become_wrapper" + +# NB: CreateProcessWithTokenW commandline maxes out at 1024 chars, must +# bootstrap via small wrapper which contains the exec_wrapper passed through the +# stdin pipe. Cannot use 'powershell -' as the $ErrorActionPreference is always +# set to Stop and cannot be changed. Also need to split the payload from the wrapper to prevent potentially +# sensitive content from being logged by the scriptblock logger. +$bootstrap_wrapper = { + &chcp.com 65001 > $null + $exec_wrapper_str = [System.Console]::In.ReadToEnd() + $split_parts = $exec_wrapper_str.Split(@("`0`0`0`0"), 2, [StringSplitOptions]::RemoveEmptyEntries) + Set-Variable -Name json_raw -Value $split_parts[1] + $exec_wrapper = [ScriptBlock]::Create($split_parts[0]) + &$exec_wrapper +} +$exec_command = [System.Convert]::ToBase64String([System.Text.Encoding]::Unicode.GetBytes($bootstrap_wrapper.ToString())) +$lp_command_line = "powershell.exe -NonInteractive -NoProfile -ExecutionPolicy Bypass -EncodedCommand $exec_command" +$lp_current_directory = $env:SystemRoot # TODO: should this be set to the become user's profile dir? + +# pop the become_wrapper action so we don't get stuck in a loop +$Payload.actions = $Payload.actions[1..99] +# we want the output from the exec_wrapper to be base64 encoded to preserve unicode chars +$Payload.encoded_output = $true + +$payload_json = ConvertTo-Json -InputObject $Payload -Depth 99 -Compress +# delimit the payload JSON from the wrapper to keep sensitive contents out of scriptblocks (which can be logged) +$exec_wrapper = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($Payload.exec_wrapper)) +$exec_wrapper += "`0`0`0`0" + $payload_json + +try { + Write-AnsibleLog "INFO - starting become process '$lp_command_line'" "become_wrapper" + $result = [Ansible.Become.BecomeUtil]::CreateProcessAsUser($username, $password, $logon_flags, $logon_type, + $null, $lp_command_line, $lp_current_directory, $null, $exec_wrapper) + Write-AnsibleLog "INFO - become process complete with rc: $($result.ExitCode)" "become_wrapper" + $stdout = $result.StandardOut + try { + $stdout = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($stdout)) + } + catch [FormatException] { + # output wasn't Base64, ignore as it may contain an error message we want to pass to Ansible + Write-AnsibleLog "WARN - become process stdout was not base64 encoded as expected: $stdout" + } + + $host.UI.WriteLine($stdout) + $host.UI.WriteErrorLine($result.StandardError.Trim()) + $host.SetShouldExit($result.ExitCode) +} +catch { + Write-AnsibleError -Message "internal error: failed to become user '$username'" -ErrorRecord $_ + $host.SetShouldExit(1) +} + +Write-AnsibleLog "INFO - ending become_wrapper" "become_wrapper" diff --git a/lib/ansible/executor/powershell/bootstrap_wrapper.ps1 b/lib/ansible/executor/powershell/bootstrap_wrapper.ps1 new file mode 100644 index 0000000..cdba80c --- /dev/null +++ b/lib/ansible/executor/powershell/bootstrap_wrapper.ps1 @@ -0,0 +1,13 @@ +&chcp.com 65001 > $null + +if ($PSVersionTable.PSVersion -lt [Version]"3.0") { + '{"failed":true,"msg":"Ansible requires PowerShell v3.0 or newer"}' + exit 1 +} + +$exec_wrapper_str = $input | Out-String +$split_parts = $exec_wrapper_str.Split(@("`0`0`0`0"), 2, [StringSplitOptions]::RemoveEmptyEntries) +If (-not $split_parts.Length -eq 2) { throw "invalid payload" } +Set-Variable -Name json_raw -Value $split_parts[1] +$exec_wrapper = [ScriptBlock]::Create($split_parts[0]) +&$exec_wrapper diff --git a/lib/ansible/executor/powershell/coverage_wrapper.ps1 b/lib/ansible/executor/powershell/coverage_wrapper.ps1 new file mode 100644 index 0000000..26cbe66 --- /dev/null +++ b/lib/ansible/executor/powershell/coverage_wrapper.ps1 @@ -0,0 +1,199 @@ +# (c) 2019 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +param( + [Parameter(Mandatory = $true)][System.Collections.IDictionary]$Payload +) + +#AnsibleRequires -Wrapper module_wrapper + +$ErrorActionPreference = "Stop" + +Write-AnsibleLog "INFO - starting coverage_wrapper" "coverage_wrapper" + +# Required to be set for psrp to we can set a breakpoint in the remote runspace +if ($PSVersionTable.PSVersion -ge [Version]'4.0') { + $host.Runspace.Debugger.SetDebugMode([System.Management.Automation.DebugModes]::RemoteScript) +} + +Function New-CoverageBreakpoint { + Param ( + [String]$Path, + [ScriptBlock]$Code, + [String]$AnsiblePath + ) + + # It is quicker to pass in the code as a string instead of calling ParseFile as we already know the contents + $predicate = { + $args[0] -is [System.Management.Automation.Language.CommandBaseAst] + } + $script_cmds = $Code.Ast.FindAll($predicate, $true) + + # Create an object that tracks the Ansible path of the file and the breakpoints that have been set in it + $info = [PSCustomObject]@{ + Path = $AnsiblePath + Breakpoints = [System.Collections.Generic.List`1[System.Management.Automation.Breakpoint]]@() + } + + # Keep track of lines that are already scanned. PowerShell can contains multiple commands in 1 line + $scanned_lines = [System.Collections.Generic.HashSet`1[System.Int32]]@() + foreach ($cmd in $script_cmds) { + if (-not $scanned_lines.Add($cmd.Extent.StartLineNumber)) { + continue + } + + # Do not add any -Action value, even if it is $null or {}. Doing so will balloon the runtime. + $params = @{ + Script = $Path + Line = $cmd.Extent.StartLineNumber + Column = $cmd.Extent.StartColumnNumber + } + $info.Breakpoints.Add((Set-PSBreakpoint @params)) + } + + $info +} + +Function Compare-PathFilterPattern { + Param ( + [String[]]$Patterns, + [String]$Path + ) + + foreach ($pattern in $Patterns) { + if ($Path -like $pattern) { + return $true + } + } + return $false +} + +$module_name = $Payload.module_args["_ansible_module_name"] +Write-AnsibleLog "INFO - building coverage payload for '$module_name'" "coverage_wrapper" + +# A PS Breakpoint needs an actual path to work properly, we create a temp directory that will store the module and +# module_util code during execution +$temp_path = Join-Path -Path ([System.IO.Path]::GetTempPath()) -ChildPath "ansible-coverage-$([System.IO.Path]::GetRandomFileName())" +Write-AnsibleLog "INFO - Creating temp path for coverage files '$temp_path'" "coverage_wrapper" +New-Item -Path $temp_path -ItemType Directory > $null +$breakpoint_info = [System.Collections.Generic.List`1[PSObject]]@() + +# Ensures we create files with UTF-8 encoding and a BOM. This is critical to force the powershell engine to read files +# as UTF-8 and not as the system's codepage. +$file_encoding = 'UTF8' + +try { + $scripts = [System.Collections.Generic.List`1[System.Object]]@($script:common_functions) + + $coverage_path_filter = $Payload.coverage.path_filter.Split(":", [StringSplitOptions]::RemoveEmptyEntries) + + # We need to track what utils have already been added to the script for loading. This is because the load + # order is important and can have module_utils that rely on other utils. + $loaded_utils = [System.Collections.Generic.HashSet`1[System.String]]@() + $parse_util = { + $util_name = $args[0] + if (-not $loaded_utils.Add($util_name)) { + return + } + + $util_code = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($Payload.powershell_modules.$util_name)) + $util_sb = [ScriptBlock]::Create($util_code) + $util_path = Join-Path -Path $temp_path -ChildPath "$($util_name).psm1" + + Write-AnsibleLog "INFO - Outputting module_util $util_name to temp file '$util_path'" "coverage_wrapper" + Set-Content -LiteralPath $util_path -Value $util_code -Encoding $file_encoding + + $ansible_path = $Payload.coverage.module_util_paths.$util_name + if ((Compare-PathFilterPattern -Patterns $coverage_path_filter -Path $ansible_path)) { + $cov_params = @{ + Path = $util_path + Code = $util_sb + AnsiblePath = $ansible_path + } + $breakpoints = New-CoverageBreakpoint @cov_params + $breakpoint_info.Add($breakpoints) + } + + if ($null -ne $util_sb.Ast.ScriptRequirements) { + foreach ($required_util in $util_sb.Ast.ScriptRequirements.RequiredModules) { + &$parse_util $required_util.Name + } + } + Write-AnsibleLog "INFO - Adding util $util_name to scripts to run" "coverage_wrapper" + $scripts.Add("Import-Module -Name '$util_path'") + } + foreach ($util in $Payload.powershell_modules.Keys) { + &$parse_util $util + } + + $module = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($Payload.module_entry)) + $module_path = Join-Path -Path $temp_path -ChildPath "$($module_name).ps1" + Write-AnsibleLog "INFO - Ouputting module $module_name to temp file '$module_path'" "coverage_wrapper" + Set-Content -LiteralPath $module_path -Value $module -Encoding $file_encoding + $scripts.Add($module_path) + + $ansible_path = $Payload.coverage.module_path + if ((Compare-PathFilterPattern -Patterns $coverage_path_filter -Path $ansible_path)) { + $cov_params = @{ + Path = $module_path + Code = [ScriptBlock]::Create($module) + AnsiblePath = $Payload.coverage.module_path + } + $breakpoints = New-CoverageBreakpoint @cov_params + $breakpoint_info.Add($breakpoints) + } + + $variables = [System.Collections.ArrayList]@(@{ Name = "complex_args"; Value = $Payload.module_args; Scope = "Global" }) + $entrypoint = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($payload.module_wrapper)) + $entrypoint = [ScriptBlock]::Create($entrypoint) + + $params = @{ + Scripts = $scripts + Variables = $variables + Environment = $Payload.environment + ModuleName = $module_name + } + if ($breakpoint_info) { + $params.Breakpoints = $breakpoint_info.Breakpoints + } + + try { + &$entrypoint @params + } + finally { + # Processing here is kept to an absolute minimum to make sure each task runtime is kept as small as + # possible. Once all the tests have been run ansible-test will collect this info and process it locally in + # one go. + Write-AnsibleLog "INFO - Creating coverage result output" "coverage_wrapper" + $coverage_info = @{} + foreach ($info in $breakpoint_info) { + $coverage_info.($info.Path) = $info.Breakpoints | Select-Object -Property Line, HitCount + } + + # The coverage.output value is a filename set by the Ansible controller. We append some more remote side + # info to the filename to make it unique and identify the remote host a bit more. + $ps_version = "$($PSVersionTable.PSVersion.Major).$($PSVersionTable.PSVersion.Minor)" + $coverage_output_path = "$($Payload.coverage.output)=powershell-$ps_version=coverage.$($env:COMPUTERNAME).$PID.$(Get-Random)" + $code_cov_json = ConvertTo-Json -InputObject $coverage_info -Compress + + Write-AnsibleLog "INFO - Outputting coverage json to '$coverage_output_path'" "coverage_wrapper" + # Ansible controller expects these files to be UTF-8 without a BOM, use .NET for this. + $utf8_no_bom = New-Object -TypeName System.Text.UTF8Encoding -ArgumentList $false + [System.IO.File]::WriteAllbytes($coverage_output_path, $utf8_no_bom.GetBytes($code_cov_json)) + } +} +finally { + try { + if ($breakpoint_info) { + foreach ($b in $breakpoint_info.Breakpoints) { + Remove-PSBreakpoint -Breakpoint $b + } + } + } + finally { + Write-AnsibleLog "INFO - Remove temp coverage folder '$temp_path'" "coverage_wrapper" + Remove-Item -LiteralPath $temp_path -Force -Recurse + } +} + +Write-AnsibleLog "INFO - ending coverage_wrapper" "coverage_wrapper" diff --git a/lib/ansible/executor/powershell/exec_wrapper.ps1 b/lib/ansible/executor/powershell/exec_wrapper.ps1 new file mode 100644 index 0000000..0f97bdf --- /dev/null +++ b/lib/ansible/executor/powershell/exec_wrapper.ps1 @@ -0,0 +1,237 @@ +# (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +begin { + $DebugPreference = "Continue" + $ProgressPreference = "SilentlyContinue" + $ErrorActionPreference = "Stop" + Set-StrictMode -Version 2 + + # common functions that are loaded in exec and module context, this is set + # as a script scoped variable so async_watchdog and module_wrapper can + # access the functions when creating their Runspaces + $script:common_functions = { + Function ConvertFrom-AnsibleJson { + <# + .SYNOPSIS + Converts a JSON string to a Hashtable/Array in the fastest way + possible. Unfortunately ConvertFrom-Json is still faster but outputs + a PSCustomObject which is combersone for module consumption. + + .PARAMETER InputObject + [String] The JSON string to deserialize. + #> + param( + [Parameter(Mandatory = $true, Position = 0)][String]$InputObject + ) + + # we can use -AsHashtable to get PowerShell to convert the JSON to + # a Hashtable and not a PSCustomObject. This was added in PowerShell + # 6.0, fall back to a manual conversion for older versions + $cmdlet = Get-Command -Name ConvertFrom-Json -CommandType Cmdlet + if ("AsHashtable" -in $cmdlet.Parameters.Keys) { + return , (ConvertFrom-Json -InputObject $InputObject -AsHashtable) + } + else { + # get the PSCustomObject and then manually convert from there + $raw_obj = ConvertFrom-Json -InputObject $InputObject + + Function ConvertTo-Hashtable { + param($InputObject) + + if ($null -eq $InputObject) { + return $null + } + + if ($InputObject -is [PSCustomObject]) { + $new_value = @{} + foreach ($prop in $InputObject.PSObject.Properties.GetEnumerator()) { + $new_value.($prop.Name) = (ConvertTo-Hashtable -InputObject $prop.Value) + } + return , $new_value + } + elseif ($InputObject -is [Array]) { + $new_value = [System.Collections.ArrayList]@() + foreach ($val in $InputObject) { + $new_value.Add((ConvertTo-Hashtable -InputObject $val)) > $null + } + return , $new_value.ToArray() + } + else { + return , $InputObject + } + } + return , (ConvertTo-Hashtable -InputObject $raw_obj) + } + } + + Function Format-AnsibleException { + <# + .SYNOPSIS + Formats a PowerShell ErrorRecord to a string that's fit for human + consumption. + + .NOTES + Using Out-String can give us the first part of the exception but it + also wraps the messages at 80 chars which is not ideal. We also + append the ScriptStackTrace and the .NET StackTrace if present. + #> + param([System.Management.Automation.ErrorRecord]$ErrorRecord) + + $exception = @" +$($ErrorRecord.ToString()) +$($ErrorRecord.InvocationInfo.PositionMessage) + + CategoryInfo : $($ErrorRecord.CategoryInfo.ToString()) + + FullyQualifiedErrorId : $($ErrorRecord.FullyQualifiedErrorId.ToString()) +"@ + # module_common strip comments and empty newlines, need to manually + # add a preceding newline using `r`n + $exception += "`r`n`r`nScriptStackTrace:`r`n$($ErrorRecord.ScriptStackTrace)`r`n" + + # exceptions from C# will also have a StackTrace which we + # append if found + if ($null -ne $ErrorRecord.Exception.StackTrace) { + $exception += "`r`n$($ErrorRecord.Exception.ToString())" + } + + return $exception + } + } + .$common_functions + + # common wrapper functions used in the exec wrappers, this is defined in a + # script scoped variable so async_watchdog can pass them into the async job + $script:wrapper_functions = { + Function Write-AnsibleError { + <# + .SYNOPSIS + Writes an error message to a JSON string in the format that Ansible + understands. Also optionally adds an exception record if the + ErrorRecord is passed through. + #> + param( + [Parameter(Mandatory = $true)][String]$Message, + [System.Management.Automation.ErrorRecord]$ErrorRecord = $null + ) + $result = @{ + msg = $Message + failed = $true + } + if ($null -ne $ErrorRecord) { + $result.msg += ": $($ErrorRecord.Exception.Message)" + $result.exception = (Format-AnsibleException -ErrorRecord $ErrorRecord) + } + Write-Output -InputObject (ConvertTo-Json -InputObject $result -Depth 99 -Compress) + } + + Function Write-AnsibleLog { + <# + .SYNOPSIS + Used as a debugging tool to log events to a file as they run in the + exec wrappers. By default this is a noop function but the $log_path + can be manually set to enable it. Manually set ANSIBLE_EXEC_DEBUG as + an env value on the Windows host that this is run on to enable. + #> + param( + [Parameter(Mandatory = $true, Position = 0)][String]$Message, + [Parameter(Position = 1)][String]$Wrapper + ) + + $log_path = $env:ANSIBLE_EXEC_DEBUG + if ($log_path) { + $log_path = [System.Environment]::ExpandEnvironmentVariables($log_path) + $parent_path = [System.IO.Path]::GetDirectoryName($log_path) + if (Test-Path -LiteralPath $parent_path -PathType Container) { + $msg = "{0:u} - {1} - {2} - " -f (Get-Date), $pid, ([System.Security.Principal.WindowsIdentity]::GetCurrent().Name) + if ($null -ne $Wrapper) { + $msg += "$Wrapper - " + } + $msg += $Message + "`r`n" + $msg_bytes = [System.Text.Encoding]::UTF8.GetBytes($msg) + + $fs = [System.IO.File]::Open($log_path, [System.IO.FileMode]::Append, + [System.IO.FileAccess]::Write, [System.IO.FileShare]::ReadWrite) + try { + $fs.Write($msg_bytes, 0, $msg_bytes.Length) + } + finally { + $fs.Close() + } + } + } + } + } + .$wrapper_functions + + # only init and stream in $json_raw if it wasn't set by the enclosing scope + if (-not $(Get-Variable "json_raw" -ErrorAction SilentlyContinue)) { + $json_raw = '' + } +} process { + $json_raw += [String]$input +} end { + Write-AnsibleLog "INFO - starting exec_wrapper" "exec_wrapper" + if (-not $json_raw) { + Write-AnsibleError -Message "internal error: no input given to PowerShell exec wrapper" + exit 1 + } + + Write-AnsibleLog "INFO - converting json raw to a payload" "exec_wrapper" + $payload = ConvertFrom-AnsibleJson -InputObject $json_raw + + # TODO: handle binary modules + # TODO: handle persistence + + if ($payload.min_os_version) { + $min_os_version = [Version]$payload.min_os_version + # Environment.OSVersion.Version is deprecated and may not return the + # right version + $actual_os_version = [Version](Get-Item -Path $env:SystemRoot\System32\kernel32.dll).VersionInfo.ProductVersion + + Write-AnsibleLog "INFO - checking if actual os version '$actual_os_version' is less than the min os version '$min_os_version'" "exec_wrapper" + if ($actual_os_version -lt $min_os_version) { + $msg = "internal error: This module cannot run on this OS as it requires a minimum version of $min_os_version, actual was $actual_os_version" + Write-AnsibleError -Message $msg + exit 1 + } + } + if ($payload.min_ps_version) { + $min_ps_version = [Version]$payload.min_ps_version + $actual_ps_version = $PSVersionTable.PSVersion + + Write-AnsibleLog "INFO - checking if actual PS version '$actual_ps_version' is less than the min PS version '$min_ps_version'" "exec_wrapper" + if ($actual_ps_version -lt $min_ps_version) { + $msg = "internal error: This module cannot run as it requires a minimum PowerShell version of $min_ps_version, actual was $actual_ps_version" + Write-AnsibleError -Message $msg + exit 1 + } + } + + # pop 0th action as entrypoint + $action = $payload.actions[0] + Write-AnsibleLog "INFO - running action $action" "exec_wrapper" + + $entrypoint = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($payload.($action))) + $entrypoint = [ScriptBlock]::Create($entrypoint) + # so we preserve the formatting and don't fall prey to locale issues, some + # wrappers want the output to be in base64 form, we store the value here in + # case the wrapper changes the value when they create a payload for their + # own exec_wrapper + $encoded_output = $payload.encoded_output + + try { + $output = &$entrypoint -Payload $payload + if ($encoded_output -and $null -ne $output) { + $b64_output = [System.Convert]::ToBase64String([System.Text.Encoding]::UTF8.GetBytes($output)) + Write-Output -InputObject $b64_output + } + else { + $output + } + } + catch { + Write-AnsibleError -Message "internal error: failed to run exec_wrapper action $action" -ErrorRecord $_ + exit 1 + } + Write-AnsibleLog "INFO - ending exec_wrapper" "exec_wrapper" +} diff --git a/lib/ansible/executor/powershell/module_manifest.py b/lib/ansible/executor/powershell/module_manifest.py new file mode 100644 index 0000000..970e848 --- /dev/null +++ b/lib/ansible/executor/powershell/module_manifest.py @@ -0,0 +1,402 @@ +# (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import base64 +import errno +import json +import os +import pkgutil +import random +import re + +from ansible.module_utils.compat.version import LooseVersion + +from ansible import constants as C +from ansible.errors import AnsibleError +from ansible.module_utils._text import to_bytes, to_native, to_text +from ansible.module_utils.compat.importlib import import_module +from ansible.plugins.loader import ps_module_utils_loader +from ansible.utils.collection_loader import resource_from_fqcr + + +class PSModuleDepFinder(object): + + def __init__(self): + # This is also used by validate-modules to get a module's required utils in base and a collection. + self.ps_modules = dict() + self.exec_scripts = dict() + + # by defining an explicit dict of cs utils and where they are used, we + # can potentially save time by not adding the type multiple times if it + # isn't needed + self.cs_utils_wrapper = dict() + self.cs_utils_module = dict() + + self.ps_version = None + self.os_version = None + self.become = False + + self._re_cs_module = [ + # Reference C# module_util in another C# util, this must always be the fully qualified name. + # 'using ansible_collections.{namespace}.{collection}.plugins.module_utils.{name}' + re.compile(to_bytes(r'(?i)^using\s((Ansible\..+)|' + r'(ansible_collections\.\w+\.\w+\.plugins\.module_utils\.[\w\.]+));\s*$')), + ] + + self._re_cs_in_ps_module = [ + # Reference C# module_util in a PowerShell module + # '#AnsibleRequires -CSharpUtil Ansible.{name}' + # '#AnsibleRequires -CSharpUtil ansible_collections.{namespace}.{collection}.plugins.module_utils.{name}' + # '#AnsibleRequires -CSharpUtil ..module_utils.{name}' + # Can have '-Optional' at the end to denote the util is optional + re.compile(to_bytes(r'(?i)^#\s*ansiblerequires\s+-csharputil\s+((Ansible\.[\w\.]+)|' + r'(ansible_collections\.\w+\.\w+\.plugins\.module_utils\.[\w\.]+)|' + r'(\.[\w\.]+))(?P<optional>\s+-Optional){0,1}')), + ] + + self._re_ps_module = [ + # Original way of referencing a builtin module_util + # '#Requires -Module Ansible.ModuleUtils.{name} + re.compile(to_bytes(r'(?i)^#\s*requires\s+\-module(?:s?)\s*(Ansible\.ModuleUtils\..+)')), + # New way of referencing a builtin and collection module_util + # '#AnsibleRequires -PowerShell Ansible.ModuleUtils.{name}' + # '#AnsibleRequires -PowerShell ansible_collections.{namespace}.{collection}.plugins.module_utils.{name}' + # '#AnsibleRequires -PowerShell ..module_utils.{name}' + # Can have '-Optional' at the end to denote the util is optional + re.compile(to_bytes(r'(?i)^#\s*ansiblerequires\s+-powershell\s+((Ansible\.ModuleUtils\.[\w\.]+)|' + r'(ansible_collections\.\w+\.\w+\.plugins\.module_utils\.[\w\.]+)|' + r'(\.[\w\.]+))(?P<optional>\s+-Optional){0,1}')), + ] + + self._re_wrapper = re.compile(to_bytes(r'(?i)^#\s*ansiblerequires\s+-wrapper\s+(\w*)')) + self._re_ps_version = re.compile(to_bytes(r'(?i)^#requires\s+\-version\s+([0-9]+(\.[0-9]+){0,3})$')) + self._re_os_version = re.compile(to_bytes(r'(?i)^#ansiblerequires\s+\-osversion\s+([0-9]+(\.[0-9]+){0,3})$')) + self._re_become = re.compile(to_bytes(r'(?i)^#ansiblerequires\s+\-become$')) + + def scan_module(self, module_data, fqn=None, wrapper=False, powershell=True): + lines = module_data.split(b'\n') + module_utils = set() + if wrapper: + cs_utils = self.cs_utils_wrapper + else: + cs_utils = self.cs_utils_module + + if powershell: + checks = [ + # PS module contains '#Requires -Module Ansible.ModuleUtils.*' + # PS module contains '#AnsibleRequires -Powershell Ansible.*' (or collections module_utils ref) + (self._re_ps_module, self.ps_modules, ".psm1"), + # PS module contains '#AnsibleRequires -CSharpUtil Ansible.*' (or collections module_utils ref) + (self._re_cs_in_ps_module, cs_utils, ".cs"), + ] + else: + checks = [ + # CS module contains 'using Ansible.*;' or 'using ansible_collections.ns.coll.plugins.module_utils.*;' + (self._re_cs_module, cs_utils, ".cs"), + ] + + for line in lines: + for check in checks: + for pattern in check[0]: + match = pattern.match(line) + if match: + # tolerate windows line endings by stripping any remaining + # newline chars + module_util_name = to_text(match.group(1).rstrip()) + match_dict = match.groupdict() + optional = match_dict.get('optional', None) is not None + + if module_util_name not in check[1].keys(): + module_utils.add((module_util_name, check[2], fqn, optional)) + + break + + if powershell: + ps_version_match = self._re_ps_version.match(line) + if ps_version_match: + self._parse_version_match(ps_version_match, "ps_version") + + os_version_match = self._re_os_version.match(line) + if os_version_match: + self._parse_version_match(os_version_match, "os_version") + + # once become is set, no need to keep on checking recursively + if not self.become: + become_match = self._re_become.match(line) + if become_match: + self.become = True + + if wrapper: + wrapper_match = self._re_wrapper.match(line) + if wrapper_match: + self.scan_exec_script(wrapper_match.group(1).rstrip()) + + # recursively drill into each Requires to see if there are any more + # requirements + for m in set(module_utils): + self._add_module(*m, wrapper=wrapper) + + def scan_exec_script(self, name): + # scans lib/ansible/executor/powershell for scripts used in the module + # exec side. It also scans these scripts for any dependencies + name = to_text(name) + if name in self.exec_scripts.keys(): + return + + data = pkgutil.get_data("ansible.executor.powershell", to_native(name + ".ps1")) + if data is None: + raise AnsibleError("Could not find executor powershell script " + "for '%s'" % name) + + b_data = to_bytes(data) + + # remove comments to reduce the payload size in the exec wrappers + if C.DEFAULT_DEBUG: + exec_script = b_data + else: + exec_script = _strip_comments(b_data) + self.exec_scripts[name] = to_bytes(exec_script) + self.scan_module(b_data, wrapper=True, powershell=True) + + def _add_module(self, name, ext, fqn, optional, wrapper=False): + m = to_text(name) + + util_fqn = None + + if m.startswith("Ansible."): + # Builtin util, use plugin loader to get the data + mu_path = ps_module_utils_loader.find_plugin(m, ext) + + if not mu_path: + if optional: + return + + raise AnsibleError('Could not find imported module support code ' + 'for \'%s\'' % m) + + module_util_data = to_bytes(_slurp(mu_path)) + else: + # Collection util, load the package data based on the util import. + + submodules = m.split(".") + if m.startswith('.'): + fqn_submodules = fqn.split('.') + for submodule in submodules: + if submodule: + break + del fqn_submodules[-1] + + submodules = fqn_submodules + [s for s in submodules if s] + + n_package_name = to_native('.'.join(submodules[:-1]), errors='surrogate_or_strict') + n_resource_name = to_native(submodules[-1] + ext, errors='surrogate_or_strict') + + try: + module_util = import_module(n_package_name) + pkg_data = pkgutil.get_data(n_package_name, n_resource_name) + if pkg_data is None: + raise ImportError("No package data found") + + module_util_data = to_bytes(pkg_data, errors='surrogate_or_strict') + util_fqn = to_text("%s.%s " % (n_package_name, submodules[-1]), errors='surrogate_or_strict') + + # Get the path of the util which is required for coverage collection. + resource_paths = list(module_util.__path__) + if len(resource_paths) != 1: + # This should never happen with a collection but we are just being defensive about it. + raise AnsibleError("Internal error: Referenced module_util package '%s' contains 0 or multiple " + "import locations when we only expect 1." % n_package_name) + mu_path = os.path.join(resource_paths[0], n_resource_name) + except (ImportError, OSError) as err: + if getattr(err, "errno", errno.ENOENT) == errno.ENOENT: + if optional: + return + + raise AnsibleError('Could not find collection imported module support code for \'%s\'' + % to_native(m)) + + else: + raise + + util_info = { + 'data': module_util_data, + 'path': to_text(mu_path), + } + if ext == ".psm1": + self.ps_modules[m] = util_info + else: + if wrapper: + self.cs_utils_wrapper[m] = util_info + else: + self.cs_utils_module[m] = util_info + self.scan_module(module_util_data, fqn=util_fqn, wrapper=wrapper, powershell=(ext == ".psm1")) + + def _parse_version_match(self, match, attribute): + new_version = to_text(match.group(1)).rstrip() + + # PowerShell cannot cast a string of "1" to Version, it must have at + # least the major.minor for it to be valid so we append 0 + if match.group(2) is None: + new_version = "%s.0" % new_version + + existing_version = getattr(self, attribute, None) + if existing_version is None: + setattr(self, attribute, new_version) + else: + # determine which is the latest version and set that + if LooseVersion(new_version) > LooseVersion(existing_version): + setattr(self, attribute, new_version) + + +def _slurp(path): + if not os.path.exists(path): + raise AnsibleError("imported module support code does not exist at %s" + % os.path.abspath(path)) + fd = open(path, 'rb') + data = fd.read() + fd.close() + return data + + +def _strip_comments(source): + # Strip comments and blank lines from the wrapper + buf = [] + start_block = False + for line in source.splitlines(): + l = line.strip() + + if start_block and l.endswith(b'#>'): + start_block = False + continue + elif start_block: + continue + elif l.startswith(b'<#'): + start_block = True + continue + elif not l or l.startswith(b'#'): + continue + + buf.append(line) + return b'\n'.join(buf) + + +def _create_powershell_wrapper(b_module_data, module_path, module_args, + environment, async_timeout, become, + become_method, become_user, become_password, + become_flags, substyle, task_vars, module_fqn): + # creates the manifest/wrapper used in PowerShell/C# modules to enable + # things like become and async - this is also called in action/script.py + + # FUTURE: add process_wrapper.ps1 to run module_wrapper in a new process + # if running under a persistent connection and substyle is C# so we + # don't have type conflicts + finder = PSModuleDepFinder() + if substyle != 'script': + # don't scan the module for util dependencies and other Ansible related + # flags if the substyle is 'script' which is set by action/script + finder.scan_module(b_module_data, fqn=module_fqn, powershell=(substyle == "powershell")) + + module_wrapper = "module_%s_wrapper" % substyle + exec_manifest = dict( + module_entry=to_text(base64.b64encode(b_module_data)), + powershell_modules=dict(), + csharp_utils=dict(), + csharp_utils_module=list(), # csharp_utils only required by a module + module_args=module_args, + actions=[module_wrapper], + environment=environment, + encoded_output=False, + ) + finder.scan_exec_script(module_wrapper) + + if async_timeout > 0: + finder.scan_exec_script('exec_wrapper') + finder.scan_exec_script('async_watchdog') + finder.scan_exec_script('async_wrapper') + + exec_manifest["actions"].insert(0, 'async_watchdog') + exec_manifest["actions"].insert(0, 'async_wrapper') + exec_manifest["async_jid"] = str(random.randint(0, 999999999999)) + exec_manifest["async_timeout_sec"] = async_timeout + exec_manifest["async_startup_timeout"] = C.config.get_config_value("WIN_ASYNC_STARTUP_TIMEOUT", variables=task_vars) + + if become and resource_from_fqcr(become_method) == 'runas': # runas and namespace.collection.runas + finder.scan_exec_script('exec_wrapper') + finder.scan_exec_script('become_wrapper') + + exec_manifest["actions"].insert(0, 'become_wrapper') + exec_manifest["become_user"] = become_user + exec_manifest["become_password"] = become_password + exec_manifest['become_flags'] = become_flags + + exec_manifest['min_ps_version'] = finder.ps_version + exec_manifest['min_os_version'] = finder.os_version + if finder.become and 'become_wrapper' not in exec_manifest['actions']: + finder.scan_exec_script('exec_wrapper') + finder.scan_exec_script('become_wrapper') + + exec_manifest['actions'].insert(0, 'become_wrapper') + exec_manifest['become_user'] = 'SYSTEM' + exec_manifest['become_password'] = None + exec_manifest['become_flags'] = None + + coverage_manifest = dict( + module_path=module_path, + module_util_paths=dict(), + output=None, + ) + coverage_output = C.config.get_config_value('COVERAGE_REMOTE_OUTPUT', variables=task_vars) + if coverage_output and substyle == 'powershell': + finder.scan_exec_script('coverage_wrapper') + coverage_manifest['output'] = coverage_output + + coverage_enabled = C.config.get_config_value('COVERAGE_REMOTE_PATHS', variables=task_vars) + coverage_manifest['path_filter'] = coverage_enabled + + # make sure Ansible.ModuleUtils.AddType is added if any C# utils are used + if len(finder.cs_utils_wrapper) > 0 or len(finder.cs_utils_module) > 0: + finder._add_module(b"Ansible.ModuleUtils.AddType", ".psm1", None, False, + wrapper=False) + + # exec_wrapper is only required to be part of the payload if using + # become or async, to save on payload space we check if exec_wrapper has + # already been added, and remove it manually if it hasn't later + exec_required = "exec_wrapper" in finder.exec_scripts.keys() + finder.scan_exec_script("exec_wrapper") + # must contain an empty newline so it runs the begin/process/end block + finder.exec_scripts["exec_wrapper"] += b"\n\n" + + exec_wrapper = finder.exec_scripts["exec_wrapper"] + if not exec_required: + finder.exec_scripts.pop("exec_wrapper") + + for name, data in finder.exec_scripts.items(): + b64_data = to_text(base64.b64encode(data)) + exec_manifest[name] = b64_data + + for name, data in finder.ps_modules.items(): + b64_data = to_text(base64.b64encode(data['data'])) + exec_manifest['powershell_modules'][name] = b64_data + coverage_manifest['module_util_paths'][name] = data['path'] + + cs_utils = {} + for cs_util in [finder.cs_utils_wrapper, finder.cs_utils_module]: + for name, data in cs_util.items(): + cs_utils[name] = data['data'] + + for name, data in cs_utils.items(): + b64_data = to_text(base64.b64encode(data)) + exec_manifest['csharp_utils'][name] = b64_data + exec_manifest['csharp_utils_module'] = list(finder.cs_utils_module.keys()) + + # To save on the data we are sending across we only add the coverage info if coverage is being run + if 'coverage_wrapper' in exec_manifest: + exec_manifest['coverage'] = coverage_manifest + + b_json = to_bytes(json.dumps(exec_manifest)) + # delimit the payload JSON from the wrapper to keep sensitive contents out of scriptblocks (which can be logged) + b_data = exec_wrapper + b'\0\0\0\0' + b_json + return b_data diff --git a/lib/ansible/executor/powershell/module_powershell_wrapper.ps1 b/lib/ansible/executor/powershell/module_powershell_wrapper.ps1 new file mode 100644 index 0000000..c35c84c --- /dev/null +++ b/lib/ansible/executor/powershell/module_powershell_wrapper.ps1 @@ -0,0 +1,75 @@ +# (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +param( + [Parameter(Mandatory = $true)][System.Collections.IDictionary]$Payload +) + +#AnsibleRequires -Wrapper module_wrapper + +$ErrorActionPreference = "Stop" + +Write-AnsibleLog "INFO - starting module_powershell_wrapper" "module_powershell_wrapper" + +$module_name = $Payload.module_args["_ansible_module_name"] +Write-AnsibleLog "INFO - building module payload for '$module_name'" "module_powershell_wrapper" + +# compile any C# module utils passed in from the controller, Add-CSharpType is +# automatically added to the payload manifest if any csharp util is set +$csharp_utils = [System.Collections.ArrayList]@() +foreach ($csharp_util in $Payload.csharp_utils_module) { + Write-AnsibleLog "INFO - adding $csharp_util to list of C# references to compile" "module_powershell_wrapper" + $util_code = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($Payload.csharp_utils[$csharp_util])) + $csharp_utils.Add($util_code) > $null +} +if ($csharp_utils.Count -gt 0) { + $add_type_b64 = $Payload.powershell_modules["Ansible.ModuleUtils.AddType"] + $add_type = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($add_type_b64)) + New-Module -Name Ansible.ModuleUtils.AddType -ScriptBlock ([ScriptBlock]::Create($add_type)) | Import-Module > $null + + # add any C# references so the module does not have to do so + $new_tmp = [System.Environment]::ExpandEnvironmentVariables($Payload.module_args["_ansible_remote_tmp"]) + Add-CSharpType -References $csharp_utils -TempPath $new_tmp -IncludeDebugInfo +} + +if ($Payload.ContainsKey("coverage") -and $null -ne $host.Runspace -and $null -ne $host.Runspace.Debugger) { + $entrypoint = $payload.coverage_wrapper + + $params = @{ + Payload = $Payload + } +} +else { + # get the common module_wrapper code and invoke that to run the module + $module = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($Payload.module_entry)) + $variables = [System.Collections.ArrayList]@(@{ Name = "complex_args"; Value = $Payload.module_args; Scope = "Global" }) + $entrypoint = $Payload.module_wrapper + + $params = @{ + Scripts = @($script:common_functions, $module) + Variables = $variables + Environment = $Payload.environment + Modules = $Payload.powershell_modules + ModuleName = $module_name + } +} + +$entrypoint = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($entrypoint)) +$entrypoint = [ScriptBlock]::Create($entrypoint) + +try { + &$entrypoint @params +} +catch { + # failed to invoke the PowerShell module, capture the exception and + # output a pretty error for Ansible to parse + $result = @{ + msg = "Failed to invoke PowerShell module: $($_.Exception.Message)" + failed = $true + exception = (Format-AnsibleException -ErrorRecord $_) + } + Write-Output -InputObject (ConvertTo-Json -InputObject $result -Depth 99 -Compress) + $host.SetShouldExit(1) +} + +Write-AnsibleLog "INFO - ending module_powershell_wrapper" "module_powershell_wrapper" diff --git a/lib/ansible/executor/powershell/module_script_wrapper.ps1 b/lib/ansible/executor/powershell/module_script_wrapper.ps1 new file mode 100644 index 0000000..dd8420f --- /dev/null +++ b/lib/ansible/executor/powershell/module_script_wrapper.ps1 @@ -0,0 +1,22 @@ +# (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +param( + [Parameter(Mandatory = $true)][System.Collections.IDictionary]$Payload +) + +#AnsibleRequires -Wrapper module_wrapper + +$ErrorActionPreference = "Stop" + +Write-AnsibleLog "INFO - starting module_script_wrapper" "module_script_wrapper" + +$script = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($Payload.module_entry)) + +# get the common module_wrapper code and invoke that to run the module +$entrypoint = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($payload.module_wrapper)) +$entrypoint = [ScriptBlock]::Create($entrypoint) + +&$entrypoint -Scripts $script -Environment $Payload.environment -ModuleName "script" + +Write-AnsibleLog "INFO - ending module_script_wrapper" "module_script_wrapper" diff --git a/lib/ansible/executor/powershell/module_wrapper.ps1 b/lib/ansible/executor/powershell/module_wrapper.ps1 new file mode 100644 index 0000000..20a9677 --- /dev/null +++ b/lib/ansible/executor/powershell/module_wrapper.ps1 @@ -0,0 +1,226 @@ +# (c) 2018 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +<# +.SYNOPSIS +Invokes an Ansible module in a new Runspace. This cmdlet will output the +module's output and write any errors to the error stream of the current +host. + +.PARAMETER Scripts +[Object[]] String or ScriptBlocks to execute. + +.PARAMETER Variables +[System.Collections.ArrayList] The variables to set in the new Pipeline. +Each value is a hashtable that contains the parameters to use with +Set-Variable; + Name: the name of the variable to set + Value: the value of the variable to set + Scope: the scope of the variable + +.PARAMETER Environment +[System.Collections.IDictionary] A Dictionary of environment key/values to +set in the new Pipeline. + +.PARAMETER Modules +[System.Collections.IDictionary] A Dictionary of PowerShell modules to +import into the new Pipeline. The key is the name of the module and the +value is a base64 string of the module util code. + +.PARAMETER ModuleName +[String] The name of the module that is being executed. + +.PARAMETER Breakpoints +A list of line breakpoints to add to the runspace debugger. This is used to +track module and module_utils coverage. +#> +param( + [Object[]]$Scripts, + [System.Collections.ArrayList][AllowEmptyCollection()]$Variables, + [System.Collections.IDictionary]$Environment, + [System.Collections.IDictionary]$Modules, + [String]$ModuleName, + [System.Management.Automation.LineBreakpoint[]]$Breakpoints = @() +) + +Write-AnsibleLog "INFO - creating new PowerShell pipeline for $ModuleName" "module_wrapper" +$ps = [PowerShell]::Create() + +# do not set ErrorActionPreference for script +if ($ModuleName -ne "script") { + $ps.Runspace.SessionStateProxy.SetVariable("ErrorActionPreference", "Stop") +} + +# force input encoding to preamble-free UTF8 so PS sub-processes (eg, +# Start-Job) don't blow up. This is only required for WinRM, a PSRP +# runspace doesn't have a host console and this will bomb out +if ($host.Name -eq "ConsoleHost") { + Write-AnsibleLog "INFO - setting console input encoding to UTF8 for $ModuleName" "module_wrapper" + $ps.AddScript('[Console]::InputEncoding = New-Object Text.UTF8Encoding $false').AddStatement() > $null +} + +# set the variables +foreach ($variable in $Variables) { + Write-AnsibleLog "INFO - setting variable '$($variable.Name)' for $ModuleName" "module_wrapper" + $ps.AddCommand("Set-Variable").AddParameters($variable).AddStatement() > $null +} + +# set the environment vars +if ($Environment) { + # Escaping quotes can be problematic, instead just pass the string to the runspace and set it directly. + Write-AnsibleLog "INFO - setting environment vars for $ModuleName" "module_wrapper" + $ps.Runspace.SessionStateProxy.SetVariable("_AnsibleEnvironment", $Environment) + $ps.AddScript(@' +foreach ($env_kv in $_AnsibleEnvironment.GetEnumerator()) { + [System.Environment]::SetEnvironmentVariable($env_kv.Key, $env_kv.Value) +} +'@).AddStatement() > $null +} + +# import the PS modules +if ($Modules) { + foreach ($module in $Modules.GetEnumerator()) { + Write-AnsibleLog "INFO - create module util '$($module.Key)' for $ModuleName" "module_wrapper" + $module_name = $module.Key + $module_code = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($module.Value)) + $ps.AddCommand("New-Module").AddParameters(@{Name = $module_name; ScriptBlock = [ScriptBlock]::Create($module_code) }) > $null + $ps.AddCommand("Import-Module").AddParameter("WarningAction", "SilentlyContinue") > $null + $ps.AddCommand("Out-Null").AddStatement() > $null + } +} + +# redefine Write-Host to dump to output instead of failing +# lots of scripts still use it +$ps.AddScript('Function Write-Host($msg) { Write-Output -InputObject $msg }').AddStatement() > $null + +# add the scripts and run +foreach ($script in $Scripts) { + $ps.AddScript($script).AddStatement() > $null +} + +if ($Breakpoints.Count -gt 0) { + Write-AnsibleLog "INFO - adding breakpoint to runspace that will run the modules" "module_wrapper" + if ($PSVersionTable.PSVersion.Major -eq 3) { + # The SetBreakpoints method was only added in PowerShell v4+. We need to rely on a private method to + # achieve the same functionality in this older PowerShell version. This should be removed once we drop + # support for PowerShell v3. + $set_method = $ps.Runspace.Debugger.GetType().GetMethod( + 'AddLineBreakpoint', [System.Reflection.BindingFlags]'Instance, NonPublic' + ) + foreach ($b in $Breakpoints) { + $set_method.Invoke($ps.Runspace.Debugger, [Object[]]@(, $b)) > $null + } + } + else { + $ps.Runspace.Debugger.SetBreakpoints($Breakpoints) + } +} + +Write-AnsibleLog "INFO - start module exec with Invoke() - $ModuleName" "module_wrapper" + +# temporarily override the stdout stream and create our own in a StringBuilder +# we use this to ensure there's always an Out pipe and that we capture the +# output for things like async or psrp +$orig_out = [System.Console]::Out +$sb = New-Object -TypeName System.Text.StringBuilder +$new_out = New-Object -TypeName System.IO.StringWriter -ArgumentList $sb +try { + [System.Console]::SetOut($new_out) + $module_output = $ps.Invoke() +} +catch { + # uncaught exception while executing module, present a prettier error for + # Ansible to parse + $error_params = @{ + Message = "Unhandled exception while executing module" + ErrorRecord = $_ + } + + # Be more defensive when trying to find the InnerException in case it isn't + # set. This shouldn't ever be the case but if it is then it makes it more + # difficult to track down the problem. + if ($_.Exception.PSObject.Properties.Name -contains "InnerException") { + $inner_exception = $_.Exception.InnerException + if ($inner_exception.PSObject.Properties.Name -contains "ErrorRecord") { + $error_params.ErrorRecord = $inner_exception.ErrorRecord + } + } + + Write-AnsibleError @error_params + $host.SetShouldExit(1) + return +} +finally { + [System.Console]::SetOut($orig_out) + $new_out.Dispose() +} + +# other types of errors may not throw an exception in Invoke but rather just +# set the pipeline state to failed +if ($ps.InvocationStateInfo.State -eq "Failed" -and $ModuleName -ne "script") { + $reason = $ps.InvocationStateInfo.Reason + $error_params = @{ + Message = "Unhandled exception while executing module" + } + + # The error record should always be set on the reason but this does not + # always happen on Server 2008 R2 for some reason (probably memory hotfix). + # Be defensive when trying to get the error record and fall back to other + # options. + if ($null -eq $reason) { + $error_params.Message += ": Unknown error" + } + elseif ($reason.PSObject.Properties.Name -contains "ErrorRecord") { + $error_params.ErrorRecord = $reason.ErrorRecord + } + else { + $error_params.Message += ": $($reason.ToString())" + } + + Write-AnsibleError @error_params + $host.SetShouldExit(1) + return +} + +Write-AnsibleLog "INFO - module exec ended $ModuleName" "module_wrapper" +$stdout = $sb.ToString() +if ($stdout) { + Write-Output -InputObject $stdout +} +if ($module_output.Count -gt 0) { + # do not output if empty collection + Write-AnsibleLog "INFO - using the output stream for module output - $ModuleName" "module_wrapper" + Write-Output -InputObject ($module_output -join "`r`n") +} + +# we attempt to get the return code from the LASTEXITCODE variable +# this is set explicitly in newer style variables when calling +# ExitJson and FailJson. If set we set the current hosts' exit code +# to that same value +$rc = $ps.Runspace.SessionStateProxy.GetVariable("LASTEXITCODE") +if ($null -ne $rc) { + Write-AnsibleLog "INFO - got an rc of $rc from $ModuleName exec" "module_wrapper" + $host.SetShouldExit($rc) +} + +# PS3 doesn't properly set HadErrors in many cases, inspect the error stream as a fallback +# with the trap handler that's now in place, this should only write to the output if +# $ErrorActionPreference != "Stop", that's ok because this is sent to the stderr output +# for a user to manually debug if something went horribly wrong +if ($ps.HadErrors -or ($PSVersionTable.PSVersion.Major -lt 4 -and $ps.Streams.Error.Count -gt 0)) { + Write-AnsibleLog "WARN - module had errors, outputting error info $ModuleName" "module_wrapper" + # if the rc wasn't explicitly set, we return an exit code of 1 + if ($null -eq $rc) { + $host.SetShouldExit(1) + } + + # output each error to the error stream of the current pipeline + foreach ($err in $ps.Streams.Error) { + $error_msg = Format-AnsibleException -ErrorRecord $err + + # need to use the current hosts's UI class as we may not have + # a console to write the stderr to, e.g. psrp + Write-AnsibleLog "WARN - error msg for for $($ModuleName):`r`n$error_msg" "module_wrapper" + $host.UI.WriteErrorLine($error_msg) + } +} diff --git a/lib/ansible/executor/process/__init__.py b/lib/ansible/executor/process/__init__.py new file mode 100644 index 0000000..ae8ccff --- /dev/null +++ b/lib/ansible/executor/process/__init__.py @@ -0,0 +1,20 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py new file mode 100644 index 0000000..5113b83 --- /dev/null +++ b/lib/ansible/executor/process/worker.py @@ -0,0 +1,226 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import sys +import traceback + +from jinja2.exceptions import TemplateNotFound + +from ansible.errors import AnsibleConnectionFailure +from ansible.executor.task_executor import TaskExecutor +from ansible.module_utils._text import to_text +from ansible.utils.display import Display +from ansible.utils.multiprocessing import context as multiprocessing_context + +__all__ = ['WorkerProcess'] + +display = Display() + + +class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defined] + ''' + The worker thread class, which uses TaskExecutor to run tasks + read from a job queue and pushes results into a results queue + for reading later. + ''' + + def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj): + + super(WorkerProcess, self).__init__() + # takes a task queue manager as the sole param: + self._final_q = final_q + self._task_vars = task_vars + self._host = host + self._task = task + self._play_context = play_context + self._loader = loader + self._variable_manager = variable_manager + self._shared_loader_obj = shared_loader_obj + + # NOTE: this works due to fork, if switching to threads this should change to per thread storage of temp files + # clear var to ensure we only delete files for this child + self._loader._tempfiles = set() + + def _save_stdin(self): + self._new_stdin = None + try: + if sys.stdin.isatty() and sys.stdin.fileno() is not None: + try: + self._new_stdin = os.fdopen(os.dup(sys.stdin.fileno())) + except OSError: + # couldn't dupe stdin, most likely because it's + # not a valid file descriptor + pass + except (AttributeError, ValueError): + # couldn't get stdin's fileno + pass + + if self._new_stdin is None: + self._new_stdin = open(os.devnull) + + def start(self): + ''' + multiprocessing.Process replaces the worker's stdin with a new file + but we wish to preserve it if it is connected to a terminal. + Therefore dup a copy prior to calling the real start(), + ensuring the descriptor is preserved somewhere in the new child, and + make sure it is closed in the parent when start() completes. + ''' + + self._save_stdin() + # FUTURE: this lock can be removed once a more generalized pre-fork thread pause is in place + with display._lock: + try: + return super(WorkerProcess, self).start() + finally: + self._new_stdin.close() + + def _hard_exit(self, e): + ''' + There is no safe exception to return to higher level code that does not + risk an innocent try/except finding itself executing in the wrong + process. All code executing above WorkerProcess.run() on the stack + conceptually belongs to another program. + ''' + + try: + display.debug(u"WORKER HARD EXIT: %s" % to_text(e)) + except BaseException: + # If the cause of the fault is IOError being generated by stdio, + # attempting to log a debug message may trigger another IOError. + # Try printing once then give up. + pass + + os._exit(1) + + def run(self): + ''' + Wrap _run() to ensure no possibility an errant exception can cause + control to return to the StrategyBase task loop, or any other code + higher in the stack. + + As multiprocessing in Python 2.x provides no protection, it is possible + a try/except added in far-away code can cause a crashed child process + to suddenly assume the role and prior state of its parent. + ''' + try: + return self._run() + except BaseException as e: + self._hard_exit(e) + finally: + # This is a hack, pure and simple, to work around a potential deadlock + # in ``multiprocessing.Process`` when flushing stdout/stderr during process + # shutdown. + # + # We should no longer have a problem with ``Display``, as it now proxies over + # the queue from a fork. However, to avoid any issues with plugins that may + # be doing their own printing, this has been kept. + # + # This happens at the very end to avoid that deadlock, by simply side + # stepping it. This should not be treated as a long term fix. + # + # TODO: Evaluate migrating away from the ``fork`` multiprocessing start method. + sys.stdout = sys.stderr = open(os.devnull, 'w') + + def _run(self): + ''' + Called when the process is started. Pushes the result onto the + results queue. We also remove the host from the blocked hosts list, to + signify that they are ready for their next task. + ''' + + # import cProfile, pstats, StringIO + # pr = cProfile.Profile() + # pr.enable() + + # Set the queue on Display so calls to Display.display are proxied over the queue + display.set_queue(self._final_q) + + try: + # execute the task and build a TaskResult from the result + display.debug("running TaskExecutor() for %s/%s" % (self._host, self._task)) + executor_result = TaskExecutor( + self._host, + self._task, + self._task_vars, + self._play_context, + self._new_stdin, + self._loader, + self._shared_loader_obj, + self._final_q + ).run() + + display.debug("done running TaskExecutor() for %s/%s [%s]" % (self._host, self._task, self._task._uuid)) + self._host.vars = dict() + self._host.groups = [] + + # put the result on the result queue + display.debug("sending task result for task %s" % self._task._uuid) + self._final_q.send_task_result( + self._host.name, + self._task._uuid, + executor_result, + task_fields=self._task.dump_attrs(), + ) + display.debug("done sending task result for task %s" % self._task._uuid) + + except AnsibleConnectionFailure: + self._host.vars = dict() + self._host.groups = [] + self._final_q.send_task_result( + self._host.name, + self._task._uuid, + dict(unreachable=True), + task_fields=self._task.dump_attrs(), + ) + + except Exception as e: + if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound): + try: + self._host.vars = dict() + self._host.groups = [] + self._final_q.send_task_result( + self._host.name, + self._task._uuid, + dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''), + task_fields=self._task.dump_attrs(), + ) + except Exception: + display.debug(u"WORKER EXCEPTION: %s" % to_text(e)) + display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc())) + finally: + self._clean_up() + + display.debug("WORKER PROCESS EXITING") + + # pr.disable() + # s = StringIO.StringIO() + # sortby = 'time' + # ps = pstats.Stats(pr, stream=s).sort_stats(sortby) + # ps.print_stats() + # with open('worker_%06d.stats' % os.getpid(), 'w') as f: + # f.write(s.getvalue()) + + def _clean_up(self): + # NOTE: see note in init about forks + # ensure we cleanup all temp files for this worker + self._loader.cleanup_all_tmp_files() diff --git a/lib/ansible/executor/stats.py b/lib/ansible/executor/stats.py new file mode 100644 index 0000000..13a053b --- /dev/null +++ b/lib/ansible/executor/stats.py @@ -0,0 +1,100 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from collections.abc import MutableMapping + +from ansible.utils.vars import merge_hash + + +class AggregateStats: + ''' holds stats about per-host activity during playbook runs ''' + + def __init__(self): + + self.processed = {} + self.failures = {} + self.ok = {} + self.dark = {} + self.changed = {} + self.skipped = {} + self.rescued = {} + self.ignored = {} + + # user defined stats, which can be per host or global + self.custom = {} + + def increment(self, what, host): + ''' helper function to bump a statistic ''' + + self.processed[host] = 1 + prev = (getattr(self, what)).get(host, 0) + getattr(self, what)[host] = prev + 1 + + def decrement(self, what, host): + _what = getattr(self, what) + try: + if _what[host] - 1 < 0: + # This should never happen, but let's be safe + raise KeyError("Don't be so negative") + _what[host] -= 1 + except KeyError: + _what[host] = 0 + + def summarize(self, host): + ''' return information about a particular host ''' + + return dict( + ok=self.ok.get(host, 0), + failures=self.failures.get(host, 0), + unreachable=self.dark.get(host, 0), + changed=self.changed.get(host, 0), + skipped=self.skipped.get(host, 0), + rescued=self.rescued.get(host, 0), + ignored=self.ignored.get(host, 0), + ) + + def set_custom_stats(self, which, what, host=None): + ''' allow setting of a custom stat''' + + if host is None: + host = '_run' + if host not in self.custom: + self.custom[host] = {which: what} + else: + self.custom[host][which] = what + + def update_custom_stats(self, which, what, host=None): + ''' allow aggregation of a custom stat''' + + if host is None: + host = '_run' + if host not in self.custom or which not in self.custom[host]: + return self.set_custom_stats(which, what, host) + + # mismatching types + if not isinstance(what, type(self.custom[host][which])): + return None + + if isinstance(what, MutableMapping): + self.custom[host][which] = merge_hash(self.custom[host][which], what) + else: + # let overloaded + take care of other types + self.custom[host][which] += what diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py new file mode 100644 index 0000000..02ace8f --- /dev/null +++ b/lib/ansible/executor/task_executor.py @@ -0,0 +1,1239 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import pty +import time +import json +import signal +import subprocess +import sys +import termios +import traceback + +from ansible import constants as C +from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable, AnsibleConnectionFailure, AnsibleActionFail, AnsibleActionSkip +from ansible.executor.task_result import TaskResult +from ansible.executor.module_common import get_action_args_with_defaults +from ansible.module_utils.parsing.convert_bool import boolean +from ansible.module_utils.six import binary_type +from ansible.module_utils._text import to_text, to_native +from ansible.module_utils.connection import write_to_file_descriptor +from ansible.playbook.conditional import Conditional +from ansible.playbook.task import Task +from ansible.plugins import get_plugin_class +from ansible.plugins.loader import become_loader, cliconf_loader, connection_loader, httpapi_loader, netconf_loader, terminal_loader +from ansible.template import Templar +from ansible.utils.collection_loader import AnsibleCollectionConfig, AnsibleCollectionRef +from ansible.utils.listify import listify_lookup_plugin_terms +from ansible.utils.unsafe_proxy import to_unsafe_text, wrap_var +from ansible.vars.clean import namespace_facts, clean_facts +from ansible.utils.display import Display +from ansible.utils.vars import combine_vars, isidentifier + +display = Display() + + +RETURN_VARS = [x for x in C.MAGIC_VARIABLE_MAPPING.items() if 'become' not in x and '_pass' not in x] + +__all__ = ['TaskExecutor'] + + +class TaskTimeoutError(BaseException): + pass + + +def task_timeout(signum, frame): + raise TaskTimeoutError + + +def remove_omit(task_args, omit_token): + ''' + Remove args with a value equal to the ``omit_token`` recursively + to align with now having suboptions in the argument_spec + ''' + + if not isinstance(task_args, dict): + return task_args + + new_args = {} + for i in task_args.items(): + if i[1] == omit_token: + continue + elif isinstance(i[1], dict): + new_args[i[0]] = remove_omit(i[1], omit_token) + elif isinstance(i[1], list): + new_args[i[0]] = [remove_omit(v, omit_token) for v in i[1]] + else: + new_args[i[0]] = i[1] + + return new_args + + +class TaskExecutor: + + ''' + This is the main worker class for the executor pipeline, which + handles loading an action plugin to actually dispatch the task to + a given host. This class roughly corresponds to the old Runner() + class. + ''' + + def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q): + self._host = host + self._task = task + self._job_vars = job_vars + self._play_context = play_context + self._new_stdin = new_stdin + self._loader = loader + self._shared_loader_obj = shared_loader_obj + self._connection = None + self._final_q = final_q + self._loop_eval_error = None + + self._task.squash() + + def run(self): + ''' + The main executor entrypoint, where we determine if the specified + task requires looping and either runs the task with self._run_loop() + or self._execute(). After that, the returned results are parsed and + returned as a dict. + ''' + + display.debug("in run() - task %s" % self._task._uuid) + + try: + try: + items = self._get_loop_items() + except AnsibleUndefinedVariable as e: + # save the error raised here for use later + items = None + self._loop_eval_error = e + + if items is not None: + if len(items) > 0: + item_results = self._run_loop(items) + + # create the overall result item + res = dict(results=item_results) + + # loop through the item results and set the global changed/failed/skipped result flags based on any item. + res['skipped'] = True + for item in item_results: + if 'changed' in item and item['changed'] and not res.get('changed'): + res['changed'] = True + if res['skipped'] and ('skipped' not in item or ('skipped' in item and not item['skipped'])): + res['skipped'] = False + if 'failed' in item and item['failed']: + item_ignore = item.pop('_ansible_ignore_errors') + if not res.get('failed'): + res['failed'] = True + res['msg'] = 'One or more items failed' + self._task.ignore_errors = item_ignore + elif self._task.ignore_errors and not item_ignore: + self._task.ignore_errors = item_ignore + + # ensure to accumulate these + for array in ['warnings', 'deprecations']: + if array in item and item[array]: + if array not in res: + res[array] = [] + if not isinstance(item[array], list): + item[array] = [item[array]] + res[array] = res[array] + item[array] + del item[array] + + if not res.get('failed', False): + res['msg'] = 'All items completed' + if res['skipped']: + res['msg'] = 'All items skipped' + else: + res = dict(changed=False, skipped=True, skipped_reason='No items in the list', results=[]) + else: + display.debug("calling self._execute()") + res = self._execute() + display.debug("_execute() done") + + # make sure changed is set in the result, if it's not present + if 'changed' not in res: + res['changed'] = False + + def _clean_res(res, errors='surrogate_or_strict'): + if isinstance(res, binary_type): + return to_unsafe_text(res, errors=errors) + elif isinstance(res, dict): + for k in res: + try: + res[k] = _clean_res(res[k], errors=errors) + except UnicodeError: + if k == 'diff': + # If this is a diff, substitute a replacement character if the value + # is undecodable as utf8. (Fix #21804) + display.warning("We were unable to decode all characters in the module return data." + " Replaced some in an effort to return as much as possible") + res[k] = _clean_res(res[k], errors='surrogate_then_replace') + else: + raise + elif isinstance(res, list): + for idx, item in enumerate(res): + res[idx] = _clean_res(item, errors=errors) + return res + + display.debug("dumping result to json") + res = _clean_res(res) + display.debug("done dumping result, returning") + return res + except AnsibleError as e: + return dict(failed=True, msg=wrap_var(to_text(e, nonstring='simplerepr')), _ansible_no_log=self._play_context.no_log) + except Exception as e: + return dict(failed=True, msg=wrap_var('Unexpected failure during module execution: %s' % (to_native(e, nonstring='simplerepr'))), + exception=to_text(traceback.format_exc()), stdout='', _ansible_no_log=self._play_context.no_log) + finally: + try: + self._connection.close() + except AttributeError: + pass + except Exception as e: + display.debug(u"error closing connection: %s" % to_text(e)) + + def _get_loop_items(self): + ''' + Loads a lookup plugin to handle the with_* portion of a task (if specified), + and returns the items result. + ''' + + # get search path for this task to pass to lookup plugins + self._job_vars['ansible_search_path'] = self._task.get_search_path() + + # ensure basedir is always in (dwim already searches here but we need to display it) + if self._loader.get_basedir() not in self._job_vars['ansible_search_path']: + self._job_vars['ansible_search_path'].append(self._loader.get_basedir()) + + templar = Templar(loader=self._loader, variables=self._job_vars) + items = None + loop_cache = self._job_vars.get('_ansible_loop_cache') + if loop_cache is not None: + # _ansible_loop_cache may be set in `get_vars` when calculating `delegate_to` + # to avoid reprocessing the loop + items = loop_cache + elif self._task.loop_with: + if self._task.loop_with in self._shared_loader_obj.lookup_loader: + fail = True + if self._task.loop_with == 'first_found': + # first_found loops are special. If the item is undefined then we want to fall through to the next value rather than failing. + fail = False + + loop_terms = listify_lookup_plugin_terms(terms=self._task.loop, templar=templar, fail_on_undefined=fail, convert_bare=False) + if not fail: + loop_terms = [t for t in loop_terms if not templar.is_template(t)] + + # get lookup + mylookup = self._shared_loader_obj.lookup_loader.get(self._task.loop_with, loader=self._loader, templar=templar) + + # give lookup task 'context' for subdir (mostly needed for first_found) + for subdir in ['template', 'var', 'file']: # TODO: move this to constants? + if subdir in self._task.action: + break + setattr(mylookup, '_subdir', subdir + 's') + + # run lookup + items = wrap_var(mylookup.run(terms=loop_terms, variables=self._job_vars, wantlist=True)) + else: + raise AnsibleError("Unexpected failure in finding the lookup named '%s' in the available lookup plugins" % self._task.loop_with) + + elif self._task.loop is not None: + items = templar.template(self._task.loop) + if not isinstance(items, list): + raise AnsibleError( + "Invalid data passed to 'loop', it requires a list, got this instead: %s." + " Hint: If you passed a list/dict of just one element," + " try adding wantlist=True to your lookup invocation or use q/query instead of lookup." % items + ) + + return items + + def _run_loop(self, items): + ''' + Runs the task with the loop items specified and collates the result + into an array named 'results' which is inserted into the final result + along with the item for which the loop ran. + ''' + task_vars = self._job_vars + templar = Templar(loader=self._loader, variables=task_vars) + + self._task.loop_control.post_validate(templar=templar) + + loop_var = self._task.loop_control.loop_var + index_var = self._task.loop_control.index_var + loop_pause = self._task.loop_control.pause + extended = self._task.loop_control.extended + extended_allitems = self._task.loop_control.extended_allitems + # ensure we always have a label + label = self._task.loop_control.label or '{{' + loop_var + '}}' + + if loop_var in task_vars: + display.warning(u"%s: The loop variable '%s' is already in use. " + u"You should set the `loop_var` value in the `loop_control` option for the task" + u" to something else to avoid variable collisions and unexpected behavior." % (self._task, loop_var)) + + ran_once = False + no_log = False + items_len = len(items) + results = [] + for item_index, item in enumerate(items): + task_vars['ansible_loop_var'] = loop_var + + task_vars[loop_var] = item + if index_var: + task_vars['ansible_index_var'] = index_var + task_vars[index_var] = item_index + + if extended: + task_vars['ansible_loop'] = { + 'index': item_index + 1, + 'index0': item_index, + 'first': item_index == 0, + 'last': item_index + 1 == items_len, + 'length': items_len, + 'revindex': items_len - item_index, + 'revindex0': items_len - item_index - 1, + } + if extended_allitems: + task_vars['ansible_loop']['allitems'] = items + try: + task_vars['ansible_loop']['nextitem'] = items[item_index + 1] + except IndexError: + pass + if item_index - 1 >= 0: + task_vars['ansible_loop']['previtem'] = items[item_index - 1] + + # Update template vars to reflect current loop iteration + templar.available_variables = task_vars + + # pause between loop iterations + if loop_pause and ran_once: + time.sleep(loop_pause) + else: + ran_once = True + + try: + tmp_task = self._task.copy(exclude_parent=True, exclude_tasks=True) + tmp_task._parent = self._task._parent + tmp_play_context = self._play_context.copy() + except AnsibleParserError as e: + results.append(dict(failed=True, msg=to_text(e))) + continue + + # now we swap the internal task and play context with their copies, + # execute, and swap them back so we can do the next iteration cleanly + (self._task, tmp_task) = (tmp_task, self._task) + (self._play_context, tmp_play_context) = (tmp_play_context, self._play_context) + res = self._execute(variables=task_vars) + task_fields = self._task.dump_attrs() + (self._task, tmp_task) = (tmp_task, self._task) + (self._play_context, tmp_play_context) = (tmp_play_context, self._play_context) + + # update 'general no_log' based on specific no_log + no_log = no_log or tmp_task.no_log + + # now update the result with the item info, and append the result + # to the list of results + res[loop_var] = item + res['ansible_loop_var'] = loop_var + if index_var: + res[index_var] = item_index + res['ansible_index_var'] = index_var + if extended: + res['ansible_loop'] = task_vars['ansible_loop'] + + res['_ansible_item_result'] = True + res['_ansible_ignore_errors'] = task_fields.get('ignore_errors') + + # gets templated here unlike rest of loop_control fields, depends on loop_var above + try: + res['_ansible_item_label'] = templar.template(label) + except AnsibleUndefinedVariable as e: + res.update({ + 'failed': True, + 'msg': 'Failed to template loop_control.label: %s' % to_text(e) + }) + + tr = TaskResult( + self._host.name, + self._task._uuid, + res, + task_fields=task_fields, + ) + if tr.is_failed() or tr.is_unreachable(): + self._final_q.send_callback('v2_runner_item_on_failed', tr) + elif tr.is_skipped(): + self._final_q.send_callback('v2_runner_item_on_skipped', tr) + else: + if getattr(self._task, 'diff', False): + self._final_q.send_callback('v2_on_file_diff', tr) + if self._task.action not in C._ACTION_INVENTORY_TASKS: + self._final_q.send_callback('v2_runner_item_on_ok', tr) + + results.append(res) + del task_vars[loop_var] + + # clear 'connection related' plugin variables for next iteration + if self._connection: + clear_plugins = { + 'connection': self._connection._load_name, + 'shell': self._connection._shell._load_name + } + if self._connection.become: + clear_plugins['become'] = self._connection.become._load_name + + for plugin_type, plugin_name in clear_plugins.items(): + for var in C.config.get_plugin_vars(plugin_type, plugin_name): + if var in task_vars and var not in self._job_vars: + del task_vars[var] + + self._task.no_log = no_log + + return results + + def _execute(self, variables=None): + ''' + The primary workhorse of the executor system, this runs the task + on the specified host (which may be the delegated_to host) and handles + the retry/until and block rescue/always execution + ''' + + if variables is None: + variables = self._job_vars + + templar = Templar(loader=self._loader, variables=variables) + + context_validation_error = None + + # a certain subset of variables exist. + tempvars = variables.copy() + + try: + # TODO: remove play_context as this does not take delegation nor loops correctly into account, + # the task itself should hold the correct values for connection/shell/become/terminal plugin options to finalize. + # Kept for now for backwards compatibility and a few functions that are still exclusive to it. + + # apply the given task's information to the connection info, + # which may override some fields already set by the play or + # the options specified on the command line + self._play_context = self._play_context.set_task_and_variable_override(task=self._task, variables=variables, templar=templar) + + # fields set from the play/task may be based on variables, so we have to + # do the same kind of post validation step on it here before we use it. + self._play_context.post_validate(templar=templar) + + # now that the play context is finalized, if the remote_addr is not set + # default to using the host's address field as the remote address + if not self._play_context.remote_addr: + self._play_context.remote_addr = self._host.address + + # We also add "magic" variables back into the variables dict to make sure + self._play_context.update_vars(tempvars) + + except AnsibleError as e: + # save the error, which we'll raise later if we don't end up + # skipping this task during the conditional evaluation step + context_validation_error = e + + no_log = self._play_context.no_log + + # Evaluate the conditional (if any) for this task, which we do before running + # the final task post-validation. We do this before the post validation due to + # the fact that the conditional may specify that the task be skipped due to a + # variable not being present which would otherwise cause validation to fail + try: + if not self._task.evaluate_conditional(templar, tempvars): + display.debug("when evaluation is False, skipping this task") + return dict(changed=False, skipped=True, skip_reason='Conditional result was False', _ansible_no_log=no_log) + except AnsibleError as e: + # loop error takes precedence + if self._loop_eval_error is not None: + # Display the error from the conditional as well to prevent + # losing information useful for debugging. + display.v(to_text(e)) + raise self._loop_eval_error # pylint: disable=raising-bad-type + raise + + # Not skipping, if we had loop error raised earlier we need to raise it now to halt the execution of this task + if self._loop_eval_error is not None: + raise self._loop_eval_error # pylint: disable=raising-bad-type + + # if we ran into an error while setting up the PlayContext, raise it now, unless is known issue with delegation + # and undefined vars (correct values are in cvars later on and connection plugins, if still error, blows up there) + if context_validation_error is not None: + raiseit = True + if self._task.delegate_to: + if isinstance(context_validation_error, AnsibleUndefinedVariable): + raiseit = False + elif isinstance(context_validation_error, AnsibleParserError): + # parser error, might be cause by undef too + orig_exc = getattr(context_validation_error, 'orig_exc', None) + if isinstance(orig_exc, AnsibleUndefinedVariable): + raiseit = False + if raiseit: + raise context_validation_error # pylint: disable=raising-bad-type + + # set templar to use temp variables until loop is evaluated + templar.available_variables = tempvars + + # if this task is a TaskInclude, we just return now with a success code so the + # main thread can expand the task list for the given host + if self._task.action in C._ACTION_ALL_INCLUDE_TASKS: + include_args = self._task.args.copy() + include_file = include_args.pop('_raw_params', None) + if not include_file: + return dict(failed=True, msg="No include file was specified to the include") + + include_file = templar.template(include_file) + return dict(include=include_file, include_args=include_args) + + # if this task is a IncludeRole, we just return now with a success code so the main thread can expand the task list for the given host + elif self._task.action in C._ACTION_INCLUDE_ROLE: + include_args = self._task.args.copy() + return dict(include_args=include_args) + + # Now we do final validation on the task, which sets all fields to their final values. + try: + self._task.post_validate(templar=templar) + except AnsibleError: + raise + except Exception: + return dict(changed=False, failed=True, _ansible_no_log=no_log, exception=to_text(traceback.format_exc())) + if '_variable_params' in self._task.args: + variable_params = self._task.args.pop('_variable_params') + if isinstance(variable_params, dict): + if C.INJECT_FACTS_AS_VARS: + display.warning("Using a variable for a task's 'args' is unsafe in some situations " + "(see https://docs.ansible.com/ansible/devel/reference_appendices/faq.html#argsplat-unsafe)") + variable_params.update(self._task.args) + self._task.args = variable_params + else: + # if we didn't get a dict, it means there's garbage remaining after k=v parsing, just give up + # see https://github.com/ansible/ansible/issues/79862 + raise AnsibleError(f"invalid or malformed argument: '{variable_params}'") + + # update no_log to task value, now that we have it templated + no_log = self._task.no_log + + # free tempvars up, not used anymore, cvars and vars_copy should be mainly used after this point + # updating the original 'variables' at the end + tempvars = {} + + # setup cvars copy, used for all connection related templating + if self._task.delegate_to: + # use vars from delegated host (which already include task vars) instead of original host + cvars = variables.get('ansible_delegated_vars', {}).get(self._task.delegate_to, {}) + else: + # just use normal host vars + cvars = variables + + templar.available_variables = cvars + + # use magic var if it exists, if not, let task inheritance do it's thing. + if cvars.get('ansible_connection') is not None: + current_connection = templar.template(cvars['ansible_connection']) + else: + current_connection = self._task.connection + + # get the connection and the handler for this execution + if (not self._connection or + not getattr(self._connection, 'connected', False) or + not self._connection.matches_name([current_connection]) or + # pc compare, left here for old plugins, but should be irrelevant for those + # using get_option, since they are cleared each iteration. + self._play_context.remote_addr != self._connection._play_context.remote_addr): + self._connection = self._get_connection(cvars, templar, current_connection) + else: + # if connection is reused, its _play_context is no longer valid and needs + # to be replaced with the one templated above, in case other data changed + self._connection._play_context = self._play_context + self._set_become_plugin(cvars, templar, self._connection) + + plugin_vars = self._set_connection_options(cvars, templar) + + # make a copy of the job vars here, as we update them here and later, + # but don't want to pollute original + vars_copy = variables.copy() + # update with connection info (i.e ansible_host/ansible_user) + self._connection.update_vars(vars_copy) + templar.available_variables = vars_copy + + # TODO: eventually remove as pc is taken out of the resolution path + # feed back into pc to ensure plugins not using get_option can get correct value + self._connection._play_context = self._play_context.set_task_and_variable_override(task=self._task, variables=vars_copy, templar=templar) + + # for persistent connections, initialize socket path and start connection manager + if any(((self._connection.supports_persistence and C.USE_PERSISTENT_CONNECTIONS), self._connection.force_persistence)): + self._play_context.timeout = self._connection.get_option('persistent_command_timeout') + display.vvvv('attempting to start connection', host=self._play_context.remote_addr) + display.vvvv('using connection plugin %s' % self._connection.transport, host=self._play_context.remote_addr) + + options = self._connection.get_options() + socket_path = start_connection(self._play_context, options, self._task._uuid) + display.vvvv('local domain socket path is %s' % socket_path, host=self._play_context.remote_addr) + setattr(self._connection, '_socket_path', socket_path) + + # TODO: eventually remove this block as this should be a 'consequence' of 'forced_local' modules + # special handling for python interpreter for network_os, default to ansible python unless overridden + if 'ansible_network_os' in cvars and 'ansible_python_interpreter' not in cvars: + # this also avoids 'python discovery' + cvars['ansible_python_interpreter'] = sys.executable + + # get handler + self._handler, module_context = self._get_action_handler_with_module_context(connection=self._connection, templar=templar) + + if module_context is not None: + module_defaults_fqcn = module_context.resolved_fqcn + else: + module_defaults_fqcn = self._task.resolved_action + + # Apply default params for action/module, if present + self._task.args = get_action_args_with_defaults( + module_defaults_fqcn, self._task.args, self._task.module_defaults, templar, + action_groups=self._task._parent._play._action_groups + ) + + # And filter out any fields which were set to default(omit), and got the omit token value + omit_token = variables.get('omit') + if omit_token is not None: + self._task.args = remove_omit(self._task.args, omit_token) + + # Read some values from the task, so that we can modify them if need be + if self._task.until: + retries = self._task.retries + if retries is None: + retries = 3 + elif retries <= 0: + retries = 1 + else: + retries += 1 + else: + retries = 1 + + delay = self._task.delay + if delay < 0: + delay = 1 + + display.debug("starting attempt loop") + result = None + for attempt in range(1, retries + 1): + display.debug("running the handler") + try: + if self._task.timeout: + old_sig = signal.signal(signal.SIGALRM, task_timeout) + signal.alarm(self._task.timeout) + result = self._handler.run(task_vars=vars_copy) + except (AnsibleActionFail, AnsibleActionSkip) as e: + return e.result + except AnsibleConnectionFailure as e: + return dict(unreachable=True, msg=to_text(e)) + except TaskTimeoutError as e: + msg = 'The %s action failed to execute in the expected time frame (%d) and was terminated' % (self._task.action, self._task.timeout) + return dict(failed=True, msg=msg) + finally: + if self._task.timeout: + signal.alarm(0) + old_sig = signal.signal(signal.SIGALRM, old_sig) + self._handler.cleanup() + display.debug("handler run complete") + + # preserve no log + result["_ansible_no_log"] = no_log + + if self._task.action not in C._ACTION_WITH_CLEAN_FACTS: + result = wrap_var(result) + + # update the local copy of vars with the registered value, if specified, + # or any facts which may have been generated by the module execution + if self._task.register: + if not isidentifier(self._task.register): + raise AnsibleError("Invalid variable name in 'register' specified: '%s'" % self._task.register) + + vars_copy[self._task.register] = result + + if self._task.async_val > 0: + if self._task.poll > 0 and not result.get('skipped') and not result.get('failed'): + result = self._poll_async_result(result=result, templar=templar, task_vars=vars_copy) + if result.get('failed'): + self._final_q.send_callback( + 'v2_runner_on_async_failed', + TaskResult(self._host.name, + self._task._uuid, + result, + task_fields=self._task.dump_attrs())) + else: + self._final_q.send_callback( + 'v2_runner_on_async_ok', + TaskResult(self._host.name, + self._task._uuid, + result, + task_fields=self._task.dump_attrs())) + + # ensure no log is preserved + result["_ansible_no_log"] = no_log + + # helper methods for use below in evaluating changed/failed_when + def _evaluate_changed_when_result(result): + if self._task.changed_when is not None and self._task.changed_when: + cond = Conditional(loader=self._loader) + cond.when = self._task.changed_when + result['changed'] = cond.evaluate_conditional(templar, vars_copy) + + def _evaluate_failed_when_result(result): + if self._task.failed_when: + cond = Conditional(loader=self._loader) + cond.when = self._task.failed_when + failed_when_result = cond.evaluate_conditional(templar, vars_copy) + result['failed_when_result'] = result['failed'] = failed_when_result + else: + failed_when_result = False + return failed_when_result + + if 'ansible_facts' in result and self._task.action not in C._ACTION_DEBUG: + if self._task.action in C._ACTION_WITH_CLEAN_FACTS: + if self._task.delegate_to and self._task.delegate_facts: + if '_ansible_delegated_vars' in vars_copy: + vars_copy['_ansible_delegated_vars'].update(result['ansible_facts']) + else: + vars_copy['_ansible_delegated_vars'] = result['ansible_facts'] + else: + vars_copy.update(result['ansible_facts']) + else: + # TODO: cleaning of facts should eventually become part of taskresults instead of vars + af = wrap_var(result['ansible_facts']) + vars_copy['ansible_facts'] = combine_vars(vars_copy.get('ansible_facts', {}), namespace_facts(af)) + if C.INJECT_FACTS_AS_VARS: + vars_copy.update(clean_facts(af)) + + # set the failed property if it was missing. + if 'failed' not in result: + # rc is here for backwards compatibility and modules that use it instead of 'failed' + if 'rc' in result and result['rc'] not in [0, "0"]: + result['failed'] = True + else: + result['failed'] = False + + # Make attempts and retries available early to allow their use in changed/failed_when + if self._task.until: + result['attempts'] = attempt + + # set the changed property if it was missing. + if 'changed' not in result: + result['changed'] = False + + if self._task.action not in C._ACTION_WITH_CLEAN_FACTS: + result = wrap_var(result) + + # re-update the local copy of vars with the registered value, if specified, + # or any facts which may have been generated by the module execution + # This gives changed/failed_when access to additional recently modified + # attributes of result + if self._task.register: + vars_copy[self._task.register] = result + + # if we didn't skip this task, use the helpers to evaluate the changed/ + # failed_when properties + if 'skipped' not in result: + condname = 'changed' + + try: + _evaluate_changed_when_result(result) + condname = 'failed' + _evaluate_failed_when_result(result) + except AnsibleError as e: + result['failed'] = True + result['%s_when_result' % condname] = to_text(e) + + if retries > 1: + cond = Conditional(loader=self._loader) + cond.when = self._task.until + if cond.evaluate_conditional(templar, vars_copy): + break + else: + # no conditional check, or it failed, so sleep for the specified time + if attempt < retries: + result['_ansible_retry'] = True + result['retries'] = retries + display.debug('Retrying task, attempt %d of %d' % (attempt, retries)) + self._final_q.send_callback( + 'v2_runner_retry', + TaskResult( + self._host.name, + self._task._uuid, + result, + task_fields=self._task.dump_attrs() + ) + ) + time.sleep(delay) + self._handler = self._get_action_handler(connection=self._connection, templar=templar) + else: + if retries > 1: + # we ran out of attempts, so mark the result as failed + result['attempts'] = retries - 1 + result['failed'] = True + + if self._task.action not in C._ACTION_WITH_CLEAN_FACTS: + result = wrap_var(result) + + # do the final update of the local variables here, for both registered + # values and any facts which may have been created + if self._task.register: + variables[self._task.register] = result + + if 'ansible_facts' in result and self._task.action not in C._ACTION_DEBUG: + if self._task.action in C._ACTION_WITH_CLEAN_FACTS: + variables.update(result['ansible_facts']) + else: + # TODO: cleaning of facts should eventually become part of taskresults instead of vars + af = wrap_var(result['ansible_facts']) + variables['ansible_facts'] = combine_vars(variables.get('ansible_facts', {}), namespace_facts(af)) + if C.INJECT_FACTS_AS_VARS: + variables.update(clean_facts(af)) + + # save the notification target in the result, if it was specified, as + # this task may be running in a loop in which case the notification + # may be item-specific, ie. "notify: service {{item}}" + if self._task.notify is not None: + result['_ansible_notify'] = self._task.notify + + # add the delegated vars to the result, so we can reference them + # on the results side without having to do any further templating + # also now add connection vars results when delegating + if self._task.delegate_to: + result["_ansible_delegated_vars"] = {'ansible_delegated_host': self._task.delegate_to} + for k in plugin_vars: + result["_ansible_delegated_vars"][k] = cvars.get(k) + + # note: here for callbacks that rely on this info to display delegation + for requireshed in ('ansible_host', 'ansible_port', 'ansible_user', 'ansible_connection'): + if requireshed not in result["_ansible_delegated_vars"] and requireshed in cvars: + result["_ansible_delegated_vars"][requireshed] = cvars.get(requireshed) + + # and return + display.debug("attempt loop complete, returning result") + return result + + def _poll_async_result(self, result, templar, task_vars=None): + ''' + Polls for the specified JID to be complete + ''' + + if task_vars is None: + task_vars = self._job_vars + + async_jid = result.get('ansible_job_id') + if async_jid is None: + return dict(failed=True, msg="No job id was returned by the async task") + + # Create a new pseudo-task to run the async_status module, and run + # that (with a sleep for "poll" seconds between each retry) until the + # async time limit is exceeded. + + async_task = Task.load(dict(action='async_status', args={'jid': async_jid}, environment=self._task.environment)) + + # FIXME: this is no longer the case, normal takes care of all, see if this can just be generalized + # Because this is an async task, the action handler is async. However, + # we need the 'normal' action handler for the status check, so get it + # now via the action_loader + async_handler = self._shared_loader_obj.action_loader.get( + 'ansible.legacy.async_status', + task=async_task, + connection=self._connection, + play_context=self._play_context, + loader=self._loader, + templar=templar, + shared_loader_obj=self._shared_loader_obj, + ) + + time_left = self._task.async_val + while time_left > 0: + time.sleep(self._task.poll) + + try: + async_result = async_handler.run(task_vars=task_vars) + # We do not bail out of the loop in cases where the failure + # is associated with a parsing error. The async_runner can + # have issues which result in a half-written/unparseable result + # file on disk, which manifests to the user as a timeout happening + # before it's time to timeout. + if (int(async_result.get('finished', 0)) == 1 or + ('failed' in async_result and async_result.get('_ansible_parsed', False)) or + 'skipped' in async_result): + break + except Exception as e: + # Connections can raise exceptions during polling (eg, network bounce, reboot); these should be non-fatal. + # On an exception, call the connection's reset method if it has one + # (eg, drop/recreate WinRM connection; some reused connections are in a broken state) + display.vvvv("Exception during async poll, retrying... (%s)" % to_text(e)) + display.debug("Async poll exception was:\n%s" % to_text(traceback.format_exc())) + try: + async_handler._connection.reset() + except AttributeError: + pass + + # Little hack to raise the exception if we've exhausted the timeout period + time_left -= self._task.poll + if time_left <= 0: + raise + else: + time_left -= self._task.poll + self._final_q.send_callback( + 'v2_runner_on_async_poll', + TaskResult( + self._host.name, + async_task._uuid, + async_result, + task_fields=async_task.dump_attrs(), + ), + ) + + if int(async_result.get('finished', 0)) != 1: + if async_result.get('_ansible_parsed'): + return dict(failed=True, msg="async task did not complete within the requested time - %ss" % self._task.async_val, async_result=async_result) + else: + return dict(failed=True, msg="async task produced unparseable results", async_result=async_result) + else: + # If the async task finished, automatically cleanup the temporary + # status file left behind. + cleanup_task = Task.load( + { + 'async_status': { + 'jid': async_jid, + 'mode': 'cleanup', + }, + 'environment': self._task.environment, + } + ) + cleanup_handler = self._shared_loader_obj.action_loader.get( + 'ansible.legacy.async_status', + task=cleanup_task, + connection=self._connection, + play_context=self._play_context, + loader=self._loader, + templar=templar, + shared_loader_obj=self._shared_loader_obj, + ) + cleanup_handler.run(task_vars=task_vars) + cleanup_handler.cleanup(force=True) + async_handler.cleanup(force=True) + return async_result + + def _get_become(self, name): + become = become_loader.get(name) + if not become: + raise AnsibleError("Invalid become method specified, could not find matching plugin: '%s'. " + "Use `ansible-doc -t become -l` to list available plugins." % name) + return become + + def _get_connection(self, cvars, templar, current_connection): + ''' + Reads the connection property for the host, and returns the + correct connection object from the list of connection plugins + ''' + + self._play_context.connection = current_connection + + # TODO: play context has logic to update the connection for 'smart' + # (default value, will chose between ssh and paramiko) and 'persistent' + # (really paramiko), eventually this should move to task object itself. + conn_type = self._play_context.connection + + connection, plugin_load_context = self._shared_loader_obj.connection_loader.get_with_context( + conn_type, + self._play_context, + self._new_stdin, + task_uuid=self._task._uuid, + ansible_playbook_pid=to_text(os.getppid()) + ) + + if not connection: + raise AnsibleError("the connection plugin '%s' was not found" % conn_type) + + self._set_become_plugin(cvars, templar, connection) + + # Also backwards compat call for those still using play_context + self._play_context.set_attributes_from_plugin(connection) + + return connection + + def _set_become_plugin(self, cvars, templar, connection): + # load become plugin if needed + if cvars.get('ansible_become') is not None: + become = boolean(templar.template(cvars['ansible_become'])) + else: + become = self._task.become + + if become: + if cvars.get('ansible_become_method'): + become_plugin = self._get_become(templar.template(cvars['ansible_become_method'])) + else: + become_plugin = self._get_become(self._task.become_method) + + else: + # If become is not enabled on the task it needs to be removed from the connection plugin + # https://github.com/ansible/ansible/issues/78425 + become_plugin = None + + try: + connection.set_become_plugin(become_plugin) + except AttributeError: + # Older connection plugin that does not support set_become_plugin + pass + + if become_plugin: + if getattr(connection.become, 'require_tty', False) and not getattr(connection, 'has_tty', False): + raise AnsibleError( + "The '%s' connection does not provide a TTY which is required for the selected " + "become plugin: %s." % (connection._load_name, become_plugin.name) + ) + + # Backwards compat for connection plugins that don't support become plugins + # Just do this unconditionally for now, we could move it inside of the + # AttributeError above later + self._play_context.set_become_plugin(become_plugin.name) + + def _set_plugin_options(self, plugin_type, variables, templar, task_keys): + try: + plugin = getattr(self._connection, '_%s' % plugin_type) + except AttributeError: + # Some plugins are assigned to private attrs, ``become`` is not + plugin = getattr(self._connection, plugin_type) + + # network_cli's "real" connection plugin is not named connection + # to avoid the confusion of having connection.connection + if plugin_type == "ssh_type_conn": + plugin_type = "connection" + option_vars = C.config.get_plugin_vars(plugin_type, plugin._load_name) + options = {} + for k in option_vars: + if k in variables: + options[k] = templar.template(variables[k]) + # TODO move to task method? + plugin.set_options(task_keys=task_keys, var_options=options) + + return option_vars + + def _set_connection_options(self, variables, templar): + + # keep list of variable names possibly consumed + varnames = [] + + # grab list of usable vars for this plugin + option_vars = C.config.get_plugin_vars('connection', self._connection._load_name) + varnames.extend(option_vars) + + # create dict of 'templated vars' + options = {'_extras': {}} + for k in option_vars: + if k in variables: + options[k] = templar.template(variables[k]) + + # add extras if plugin supports them + if getattr(self._connection, 'allow_extras', False): + for k in variables: + if k.startswith('ansible_%s_' % self._connection._load_name) and k not in options: + options['_extras'][k] = templar.template(variables[k]) + + task_keys = self._task.dump_attrs() + + # The task_keys 'timeout' attr is the task's timeout, not the connection timeout. + # The connection timeout is threaded through the play_context for now. + task_keys['timeout'] = self._play_context.timeout + + if self._play_context.password: + # The connection password is threaded through the play_context for + # now. This is something we ultimately want to avoid, but the first + # step is to get connection plugins pulling the password through the + # config system instead of directly accessing play_context. + task_keys['password'] = self._play_context.password + + # Prevent task retries from overriding connection retries + del task_keys['retries'] + + # set options with 'templated vars' specific to this plugin and dependent ones + self._connection.set_options(task_keys=task_keys, var_options=options) + varnames.extend(self._set_plugin_options('shell', variables, templar, task_keys)) + + if self._connection.become is not None: + if self._play_context.become_pass: + # FIXME: eventually remove from task and play_context, here for backwards compat + # keep out of play objects to avoid accidental disclosure, only become plugin should have + # The become pass is already in the play_context if given on + # the CLI (-K). Make the plugin aware of it in this case. + task_keys['become_pass'] = self._play_context.become_pass + + varnames.extend(self._set_plugin_options('become', variables, templar, task_keys)) + + # FOR BACKWARDS COMPAT: + for option in ('become_user', 'become_flags', 'become_exe', 'become_pass'): + try: + setattr(self._play_context, option, self._connection.become.get_option(option)) + except KeyError: + pass # some plugins don't support all base flags + self._play_context.prompt = self._connection.become.prompt + + # deals with networking sub_plugins (network_cli/httpapi/netconf) + sub = getattr(self._connection, '_sub_plugin', None) + if sub is not None and sub.get('type') != 'external': + plugin_type = get_plugin_class(sub.get("obj")) + varnames.extend(self._set_plugin_options(plugin_type, variables, templar, task_keys)) + sub_conn = getattr(self._connection, 'ssh_type_conn', None) + if sub_conn is not None: + varnames.extend(self._set_plugin_options("ssh_type_conn", variables, templar, task_keys)) + + return varnames + + def _get_action_handler(self, connection, templar): + ''' + Returns the correct action plugin to handle the requestion task action + ''' + return self._get_action_handler_with_module_context(connection, templar)[0] + + def _get_action_handler_with_module_context(self, connection, templar): + ''' + Returns the correct action plugin to handle the requestion task action and the module context + ''' + module_collection, separator, module_name = self._task.action.rpartition(".") + module_prefix = module_name.split('_')[0] + if module_collection: + # For network modules, which look for one action plugin per platform, look for the + # action plugin in the same collection as the module by prefixing the action plugin + # with the same collection. + network_action = "{0}.{1}".format(module_collection, module_prefix) + else: + network_action = module_prefix + + collections = self._task.collections + + # Check if the module has specified an action handler + module = self._shared_loader_obj.module_loader.find_plugin_with_context( + self._task.action, collection_list=collections + ) + if not module.resolved or not module.action_plugin: + module = None + if module is not None: + handler_name = module.action_plugin + # let action plugin override module, fallback to 'normal' action plugin otherwise + elif self._shared_loader_obj.action_loader.has_plugin(self._task.action, collection_list=collections): + handler_name = self._task.action + elif all((module_prefix in C.NETWORK_GROUP_MODULES, self._shared_loader_obj.action_loader.has_plugin(network_action, collection_list=collections))): + handler_name = network_action + display.vvvv("Using network group action {handler} for {action}".format(handler=handler_name, + action=self._task.action), + host=self._play_context.remote_addr) + else: + # use ansible.legacy.normal to allow (historic) local action_plugins/ override without collections search + handler_name = 'ansible.legacy.normal' + collections = None # until then, we don't want the task's collection list to be consulted; use the builtin + + handler = self._shared_loader_obj.action_loader.get( + handler_name, + task=self._task, + connection=connection, + play_context=self._play_context, + loader=self._loader, + templar=templar, + shared_loader_obj=self._shared_loader_obj, + collection_list=collections + ) + + if not handler: + raise AnsibleError("the handler '%s' was not found" % handler_name) + + return handler, module + + +def start_connection(play_context, options, task_uuid): + ''' + Starts the persistent connection + ''' + candidate_paths = [C.ANSIBLE_CONNECTION_PATH or os.path.dirname(sys.argv[0])] + candidate_paths.extend(os.environ.get('PATH', '').split(os.pathsep)) + for dirname in candidate_paths: + ansible_connection = os.path.join(dirname, 'ansible-connection') + if os.path.isfile(ansible_connection): + display.vvvv("Found ansible-connection at path {0}".format(ansible_connection)) + break + else: + raise AnsibleError("Unable to find location of 'ansible-connection'. " + "Please set or check the value of ANSIBLE_CONNECTION_PATH") + + env = os.environ.copy() + env.update({ + # HACK; most of these paths may change during the controller's lifetime + # (eg, due to late dynamic role includes, multi-playbook execution), without a way + # to invalidate/update, ansible-connection won't always see the same plugins the controller + # can. + 'ANSIBLE_BECOME_PLUGINS': become_loader.print_paths(), + 'ANSIBLE_CLICONF_PLUGINS': cliconf_loader.print_paths(), + 'ANSIBLE_COLLECTIONS_PATH': to_native(os.pathsep.join(AnsibleCollectionConfig.collection_paths)), + 'ANSIBLE_CONNECTION_PLUGINS': connection_loader.print_paths(), + 'ANSIBLE_HTTPAPI_PLUGINS': httpapi_loader.print_paths(), + 'ANSIBLE_NETCONF_PLUGINS': netconf_loader.print_paths(), + 'ANSIBLE_TERMINAL_PLUGINS': terminal_loader.print_paths(), + }) + verbosity = [] + if display.verbosity: + verbosity.append('-%s' % ('v' * display.verbosity)) + python = sys.executable + master, slave = pty.openpty() + p = subprocess.Popen( + [python, ansible_connection, *verbosity, to_text(os.getppid()), to_text(task_uuid)], + stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env + ) + os.close(slave) + + # We need to set the pty into noncanonical mode. This ensures that we + # can receive lines longer than 4095 characters (plus newline) without + # truncating. + old = termios.tcgetattr(master) + new = termios.tcgetattr(master) + new[3] = new[3] & ~termios.ICANON + + try: + termios.tcsetattr(master, termios.TCSANOW, new) + write_to_file_descriptor(master, options) + write_to_file_descriptor(master, play_context.serialize()) + + (stdout, stderr) = p.communicate() + finally: + termios.tcsetattr(master, termios.TCSANOW, old) + os.close(master) + + if p.returncode == 0: + result = json.loads(to_text(stdout, errors='surrogate_then_replace')) + else: + try: + result = json.loads(to_text(stderr, errors='surrogate_then_replace')) + except getattr(json.decoder, 'JSONDecodeError', ValueError): + # JSONDecodeError only available on Python 3.5+ + result = {'error': to_text(stderr, errors='surrogate_then_replace')} + + if 'messages' in result: + for level, message in result['messages']: + if level == 'log': + display.display(message, log_only=True) + elif level in ('debug', 'v', 'vv', 'vvv', 'vvvv', 'vvvvv', 'vvvvvv'): + getattr(display, level)(message, host=play_context.remote_addr) + else: + if hasattr(display, level): + getattr(display, level)(message) + else: + display.vvvv(message, host=play_context.remote_addr) + + if 'error' in result: + if display.verbosity > 2: + if result.get('exception'): + msg = "The full traceback is:\n" + result['exception'] + display.display(msg, color=C.COLOR_ERROR) + raise AnsibleError(result['error']) + + return result['socket_path'] diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py new file mode 100644 index 0000000..dcfc38a --- /dev/null +++ b/lib/ansible/executor/task_queue_manager.py @@ -0,0 +1,456 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import sys +import tempfile +import threading +import time +import multiprocessing.queues + +from ansible import constants as C +from ansible import context +from ansible.errors import AnsibleError +from ansible.executor.play_iterator import PlayIterator +from ansible.executor.stats import AggregateStats +from ansible.executor.task_result import TaskResult +from ansible.module_utils.six import string_types +from ansible.module_utils._text import to_text, to_native +from ansible.playbook.play_context import PlayContext +from ansible.playbook.task import Task +from ansible.plugins.loader import callback_loader, strategy_loader, module_loader +from ansible.plugins.callback import CallbackBase +from ansible.template import Templar +from ansible.vars.hostvars import HostVars +from ansible.vars.reserved import warn_if_reserved +from ansible.utils.display import Display +from ansible.utils.lock import lock_decorator +from ansible.utils.multiprocessing import context as multiprocessing_context + + +__all__ = ['TaskQueueManager'] + +display = Display() + + +class CallbackSend: + def __init__(self, method_name, *args, **kwargs): + self.method_name = method_name + self.args = args + self.kwargs = kwargs + + +class DisplaySend: + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + +class FinalQueue(multiprocessing.queues.Queue): + def __init__(self, *args, **kwargs): + kwargs['ctx'] = multiprocessing_context + super(FinalQueue, self).__init__(*args, **kwargs) + + def send_callback(self, method_name, *args, **kwargs): + self.put( + CallbackSend(method_name, *args, **kwargs), + block=False + ) + + def send_task_result(self, *args, **kwargs): + if isinstance(args[0], TaskResult): + tr = args[0] + else: + tr = TaskResult(*args, **kwargs) + self.put( + tr, + block=False + ) + + def send_display(self, *args, **kwargs): + self.put( + DisplaySend(*args, **kwargs), + block=False + ) + + +class AnsibleEndPlay(Exception): + def __init__(self, result): + self.result = result + + +class TaskQueueManager: + + ''' + This class handles the multiprocessing requirements of Ansible by + creating a pool of worker forks, a result handler fork, and a + manager object with shared datastructures/queues for coordinating + work between all processes. + + The queue manager is responsible for loading the play strategy plugin, + which dispatches the Play's tasks to hosts. + ''' + + RUN_OK = 0 + RUN_ERROR = 1 + RUN_FAILED_HOSTS = 2 + RUN_UNREACHABLE_HOSTS = 4 + RUN_FAILED_BREAK_PLAY = 8 + RUN_UNKNOWN_ERROR = 255 + + def __init__(self, inventory, variable_manager, loader, passwords, stdout_callback=None, run_additional_callbacks=True, run_tree=False, forks=None): + + self._inventory = inventory + self._variable_manager = variable_manager + self._loader = loader + self._stats = AggregateStats() + self.passwords = passwords + self._stdout_callback = stdout_callback + self._run_additional_callbacks = run_additional_callbacks + self._run_tree = run_tree + self._forks = forks or 5 + + self._callbacks_loaded = False + self._callback_plugins = [] + self._start_at_done = False + + # make sure any module paths (if specified) are added to the module_loader + if context.CLIARGS.get('module_path', False): + for path in context.CLIARGS['module_path']: + if path: + module_loader.add_directory(path) + + # a special flag to help us exit cleanly + self._terminated = False + + # dictionaries to keep track of failed/unreachable hosts + self._failed_hosts = dict() + self._unreachable_hosts = dict() + + try: + self._final_q = FinalQueue() + except OSError as e: + raise AnsibleError("Unable to use multiprocessing, this is normally caused by lack of access to /dev/shm: %s" % to_native(e)) + + self._callback_lock = threading.Lock() + + # A temporary file (opened pre-fork) used by connection + # plugins for inter-process locking. + self._connection_lockfile = tempfile.TemporaryFile() + + def _initialize_processes(self, num): + self._workers = [] + + for i in range(num): + self._workers.append(None) + + def load_callbacks(self): + ''' + Loads all available callbacks, with the exception of those which + utilize the CALLBACK_TYPE option. When CALLBACK_TYPE is set to 'stdout', + only one such callback plugin will be loaded. + ''' + + if self._callbacks_loaded: + return + + stdout_callback_loaded = False + if self._stdout_callback is None: + self._stdout_callback = C.DEFAULT_STDOUT_CALLBACK + + if isinstance(self._stdout_callback, CallbackBase): + stdout_callback_loaded = True + elif isinstance(self._stdout_callback, string_types): + if self._stdout_callback not in callback_loader: + raise AnsibleError("Invalid callback for stdout specified: %s" % self._stdout_callback) + else: + self._stdout_callback = callback_loader.get(self._stdout_callback) + self._stdout_callback.set_options() + stdout_callback_loaded = True + else: + raise AnsibleError("callback must be an instance of CallbackBase or the name of a callback plugin") + + # get all configured loadable callbacks (adjacent, builtin) + callback_list = list(callback_loader.all(class_only=True)) + + # add enabled callbacks that refer to collections, which might not appear in normal listing + for c in C.CALLBACKS_ENABLED: + # load all, as collection ones might be using short/redirected names and not a fqcn + plugin = callback_loader.get(c, class_only=True) + + # TODO: check if this skip is redundant, loader should handle bad file/plugin cases already + if plugin: + # avoids incorrect and dupes possible due to collections + if plugin not in callback_list: + callback_list.append(plugin) + else: + display.warning("Skipping callback plugin '%s', unable to load" % c) + + # for each callback in the list see if we should add it to 'active callbacks' used in the play + for callback_plugin in callback_list: + + callback_type = getattr(callback_plugin, 'CALLBACK_TYPE', '') + callback_needs_enabled = getattr(callback_plugin, 'CALLBACK_NEEDS_ENABLED', getattr(callback_plugin, 'CALLBACK_NEEDS_WHITELIST', False)) + + # try to get colleciotn world name first + cnames = getattr(callback_plugin, '_redirected_names', []) + if cnames: + # store the name the plugin was loaded as, as that's what we'll need to compare to the configured callback list later + callback_name = cnames[0] + else: + # fallback to 'old loader name' + (callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path)) + + display.vvvvv("Attempting to use '%s' callback." % (callback_name)) + if callback_type == 'stdout': + # we only allow one callback of type 'stdout' to be loaded, + if callback_name != self._stdout_callback or stdout_callback_loaded: + display.vv("Skipping callback '%s', as we already have a stdout callback." % (callback_name)) + continue + stdout_callback_loaded = True + elif callback_name == 'tree' and self._run_tree: + # TODO: remove special case for tree, which is an adhoc cli option --tree + pass + elif not self._run_additional_callbacks or (callback_needs_enabled and ( + # only run if not adhoc, or adhoc was specifically configured to run + check enabled list + C.CALLBACKS_ENABLED is None or callback_name not in C.CALLBACKS_ENABLED)): + # 2.x plugins shipped with ansible should require enabling, older or non shipped should load automatically + continue + + try: + callback_obj = callback_plugin() + # avoid bad plugin not returning an object, only needed cause we do class_only load and bypass loader checks, + # really a bug in the plugin itself which we ignore as callback errors are not supposed to be fatal. + if callback_obj: + # skip initializing if we already did the work for the same plugin (even with diff names) + if callback_obj not in self._callback_plugins: + callback_obj.set_options() + self._callback_plugins.append(callback_obj) + else: + display.vv("Skipping callback '%s', already loaded as '%s'." % (callback_plugin, callback_name)) + else: + display.warning("Skipping callback '%s', as it does not create a valid plugin instance." % callback_name) + continue + except Exception as e: + display.warning("Skipping callback '%s', unable to load due to: %s" % (callback_name, to_native(e))) + continue + + self._callbacks_loaded = True + + def run(self, play): + ''' + Iterates over the roles/tasks in a play, using the given (or default) + strategy for queueing tasks. The default is the linear strategy, which + operates like classic Ansible by keeping all hosts in lock-step with + a given task (meaning no hosts move on to the next task until all hosts + are done with the current task). + ''' + + if not self._callbacks_loaded: + self.load_callbacks() + + all_vars = self._variable_manager.get_vars(play=play) + templar = Templar(loader=self._loader, variables=all_vars) + warn_if_reserved(all_vars, templar.environment.globals.keys()) + + new_play = play.copy() + new_play.post_validate(templar) + new_play.handlers = new_play.compile_roles_handlers() + new_play.handlers + + self.hostvars = HostVars( + inventory=self._inventory, + variable_manager=self._variable_manager, + loader=self._loader, + ) + + play_context = PlayContext(new_play, self.passwords, self._connection_lockfile.fileno()) + if (self._stdout_callback and + hasattr(self._stdout_callback, 'set_play_context')): + self._stdout_callback.set_play_context(play_context) + + for callback_plugin in self._callback_plugins: + if hasattr(callback_plugin, 'set_play_context'): + callback_plugin.set_play_context(play_context) + + self.send_callback('v2_playbook_on_play_start', new_play) + + # build the iterator + iterator = PlayIterator( + inventory=self._inventory, + play=new_play, + play_context=play_context, + variable_manager=self._variable_manager, + all_vars=all_vars, + start_at_done=self._start_at_done, + ) + + # adjust to # of workers to configured forks or size of batch, whatever is lower + self._initialize_processes(min(self._forks, iterator.batch_size)) + + # load the specified strategy (or the default linear one) + strategy = strategy_loader.get(new_play.strategy, self) + if strategy is None: + raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds) + + # Because the TQM may survive multiple play runs, we start by marking + # any hosts as failed in the iterator here which may have been marked + # as failed in previous runs. Then we clear the internal list of failed + # hosts so we know what failed this round. + for host_name in self._failed_hosts.keys(): + host = self._inventory.get_host(host_name) + iterator.mark_host_failed(host) + for host_name in self._unreachable_hosts.keys(): + iterator._play._removed_hosts.append(host_name) + + self.clear_failed_hosts() + + # during initialization, the PlayContext will clear the start_at_task + # field to signal that a matching task was found, so check that here + # and remember it so we don't try to skip tasks on future plays + if context.CLIARGS.get('start_at_task') is not None and play_context.start_at_task is None: + self._start_at_done = True + + # and run the play using the strategy and cleanup on way out + try: + play_return = strategy.run(iterator, play_context) + finally: + strategy.cleanup() + self._cleanup_processes() + + # now re-save the hosts that failed from the iterator to our internal list + for host_name in iterator.get_failed_hosts(): + self._failed_hosts[host_name] = True + + if iterator.end_play: + raise AnsibleEndPlay(play_return) + + return play_return + + def cleanup(self): + display.debug("RUNNING CLEANUP") + self.terminate() + self._final_q.close() + self._cleanup_processes() + # We no longer flush on every write in ``Display.display`` + # just ensure we've flushed during cleanup + sys.stdout.flush() + sys.stderr.flush() + + def _cleanup_processes(self): + if hasattr(self, '_workers'): + for attempts_remaining in range(C.WORKER_SHUTDOWN_POLL_COUNT - 1, -1, -1): + if not any(worker_prc and worker_prc.is_alive() for worker_prc in self._workers): + break + + if attempts_remaining: + time.sleep(C.WORKER_SHUTDOWN_POLL_DELAY) + else: + display.warning('One or more worker processes are still running and will be terminated.') + + for worker_prc in self._workers: + if worker_prc and worker_prc.is_alive(): + try: + worker_prc.terminate() + except AttributeError: + pass + + def clear_failed_hosts(self): + self._failed_hosts = dict() + + def get_inventory(self): + return self._inventory + + def get_variable_manager(self): + return self._variable_manager + + def get_loader(self): + return self._loader + + def get_workers(self): + return self._workers[:] + + def terminate(self): + self._terminated = True + + def has_dead_workers(self): + + # [<WorkerProcess(WorkerProcess-2, stopped[SIGKILL])>, + # <WorkerProcess(WorkerProcess-2, stopped[SIGTERM])> + + defunct = False + for x in self._workers: + if getattr(x, 'exitcode', None): + defunct = True + return defunct + + @lock_decorator(attr='_callback_lock') + def send_callback(self, method_name, *args, **kwargs): + for callback_plugin in [self._stdout_callback] + self._callback_plugins: + # a plugin that set self.disabled to True will not be called + # see osx_say.py example for such a plugin + if getattr(callback_plugin, 'disabled', False): + continue + + # a plugin can opt in to implicit tasks (such as meta). It does this + # by declaring self.wants_implicit_tasks = True. + wants_implicit_tasks = getattr(callback_plugin, 'wants_implicit_tasks', False) + + # try to find v2 method, fallback to v1 method, ignore callback if no method found + methods = [] + for possible in [method_name, 'v2_on_any']: + gotit = getattr(callback_plugin, possible, None) + if gotit is None: + gotit = getattr(callback_plugin, possible.removeprefix('v2_'), None) + if gotit is not None: + methods.append(gotit) + + # send clean copies + new_args = [] + + # If we end up being given an implicit task, we'll set this flag in + # the loop below. If the plugin doesn't care about those, then we + # check and continue to the next iteration of the outer loop. + is_implicit_task = False + + for arg in args: + # FIXME: add play/task cleaners + if isinstance(arg, TaskResult): + new_args.append(arg.clean_copy()) + # elif isinstance(arg, Play): + # elif isinstance(arg, Task): + else: + new_args.append(arg) + + if isinstance(arg, Task) and arg.implicit: + is_implicit_task = True + + if is_implicit_task and not wants_implicit_tasks: + continue + + for method in methods: + try: + method(*new_args, **kwargs) + except Exception as e: + # TODO: add config toggle to make this fatal or not? + display.warning(u"Failure using method (%s) in callback plugin (%s): %s" % (to_text(method_name), to_text(callback_plugin), to_text(e))) + from traceback import format_tb + from sys import exc_info + display.vvv('Callback Exception: \n' + ' '.join(format_tb(exc_info()[2]))) diff --git a/lib/ansible/executor/task_result.py b/lib/ansible/executor/task_result.py new file mode 100644 index 0000000..543b860 --- /dev/null +++ b/lib/ansible/executor/task_result.py @@ -0,0 +1,154 @@ +# Copyright: (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> + +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible import constants as C +from ansible.parsing.dataloader import DataLoader +from ansible.vars.clean import module_response_deepcopy, strip_internal_keys + +_IGNORE = ('failed', 'skipped') +_PRESERVE = ('attempts', 'changed', 'retries') +_SUB_PRESERVE = {'_ansible_delegated_vars': ('ansible_host', 'ansible_port', 'ansible_user', 'ansible_connection')} + +# stuff callbacks need +CLEAN_EXCEPTIONS = ( + '_ansible_verbose_always', # for debug and other actions, to always expand data (pretty jsonification) + '_ansible_item_label', # to know actual 'item' variable + '_ansible_no_log', # jic we didnt clean up well enough, DON'T LOG + '_ansible_verbose_override', # controls display of ansible_facts, gathering would be very noise with -v otherwise +) + + +class TaskResult: + ''' + This class is responsible for interpreting the resulting data + from an executed task, and provides helper methods for determining + the result of a given task. + ''' + + def __init__(self, host, task, return_data, task_fields=None): + self._host = host + self._task = task + + if isinstance(return_data, dict): + self._result = return_data.copy() + else: + self._result = DataLoader().load(return_data) + + if task_fields is None: + self._task_fields = dict() + else: + self._task_fields = task_fields + + @property + def task_name(self): + return self._task_fields.get('name', None) or self._task.get_name() + + def is_changed(self): + return self._check_key('changed') + + def is_skipped(self): + # loop results + if 'results' in self._result: + results = self._result['results'] + # Loop tasks are only considered skipped if all items were skipped. + # some squashed results (eg, yum) are not dicts and can't be skipped individually + if results and all(isinstance(res, dict) and res.get('skipped', False) for res in results): + return True + + # regular tasks and squashed non-dict results + return self._result.get('skipped', False) + + def is_failed(self): + if 'failed_when_result' in self._result or \ + 'results' in self._result and True in [True for x in self._result['results'] if 'failed_when_result' in x]: + return self._check_key('failed_when_result') + else: + return self._check_key('failed') + + def is_unreachable(self): + return self._check_key('unreachable') + + def needs_debugger(self, globally_enabled=False): + _debugger = self._task_fields.get('debugger') + _ignore_errors = C.TASK_DEBUGGER_IGNORE_ERRORS and self._task_fields.get('ignore_errors') + + ret = False + if globally_enabled and ((self.is_failed() and not _ignore_errors) or self.is_unreachable()): + ret = True + + if _debugger in ('always',): + ret = True + elif _debugger in ('never',): + ret = False + elif _debugger in ('on_failed',) and self.is_failed() and not _ignore_errors: + ret = True + elif _debugger in ('on_unreachable',) and self.is_unreachable(): + ret = True + elif _debugger in ('on_skipped',) and self.is_skipped(): + ret = True + + return ret + + def _check_key(self, key): + '''get a specific key from the result or its items''' + + if isinstance(self._result, dict) and key in self._result: + return self._result.get(key, False) + else: + flag = False + for res in self._result.get('results', []): + if isinstance(res, dict): + flag |= res.get(key, False) + return flag + + def clean_copy(self): + + ''' returns 'clean' taskresult object ''' + + # FIXME: clean task_fields, _task and _host copies + result = TaskResult(self._host, self._task, {}, self._task_fields) + + # statuses are already reflected on the event type + if result._task and result._task.action in C._ACTION_DEBUG: + # debug is verbose by default to display vars, no need to add invocation + ignore = _IGNORE + ('invocation',) + else: + ignore = _IGNORE + + subset = {} + # preserve subset for later + for sub in _SUB_PRESERVE: + if sub in self._result: + subset[sub] = {} + for key in _SUB_PRESERVE[sub]: + if key in self._result[sub]: + subset[sub][key] = self._result[sub][key] + + if isinstance(self._task.no_log, bool) and self._task.no_log or self._result.get('_ansible_no_log', False): + x = {"censored": "the output has been hidden due to the fact that 'no_log: true' was specified for this result"} + + # preserve full + for preserve in _PRESERVE: + if preserve in self._result: + x[preserve] = self._result[preserve] + + result._result = x + elif self._result: + result._result = module_response_deepcopy(self._result) + + # actualy remove + for remove_key in ignore: + if remove_key in result._result: + del result._result[remove_key] + + # remove almost ALL internal keys, keep ones relevant to callback + strip_internal_keys(result._result, exceptions=CLEAN_EXCEPTIONS) + + # keep subset + result._result.update(subset) + + return result |