diff options
Diffstat (limited to 'hacking/test-module.py')
-rwxr-xr-x | hacking/test-module.py | 292 |
1 files changed, 292 insertions, 0 deletions
diff --git a/hacking/test-module.py b/hacking/test-module.py new file mode 100755 index 0000000..54343e0 --- /dev/null +++ b/hacking/test-module.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python + +# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. +# + +# this script is for testing modules without running through the +# entire guts of ansible, and is very helpful for when developing +# modules +# +# example: +# ./hacking/test-module.py -m lib/ansible/modules/command.py -a "/bin/sleep 3" +# ./hacking/test-module.py -m lib/ansible/modules/command.py -a "/bin/sleep 3" --debugger /usr/bin/pdb +# ./hacking/test-module.py -m lib/ansible/modules/lineinfile.py -a "dest=/etc/exports line='/srv/home hostname1(rw,sync)'" --check +# ./hacking/test-module.py -m lib/ansible/modules/command.py -a "echo hello" -n -o "test_hello" + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import glob +import optparse +import os +import subprocess +import sys +import traceback +import shutil + +from ansible.release import __version__ +import ansible.utils.vars as utils_vars +from ansible.parsing.dataloader import DataLoader +from ansible.parsing.utils.jsonify import jsonify +from ansible.parsing.splitter import parse_kv +from ansible.executor import module_common +import ansible.constants as C +from ansible.module_utils._text import to_native, to_text +from ansible.template import Templar + +import json + + +def parse(): + """parse command line + + :return : (options, args)""" + parser = optparse.OptionParser() + + parser.usage = "%prog -[options] (-h for help)" + + parser.add_option('-m', '--module-path', dest='module_path', + help="REQUIRED: full path of module source to execute") + parser.add_option('-a', '--args', dest='module_args', default="", + help="module argument string") + parser.add_option('-D', '--debugger', dest='debugger', + help="path to python debugger (e.g. /usr/bin/pdb)") + parser.add_option('-I', '--interpreter', dest='interpreter', + help="path to interpreter to use for this module" + " (e.g. ansible_python_interpreter=/usr/bin/python)", + metavar='INTERPRETER_TYPE=INTERPRETER_PATH', + default="ansible_python_interpreter=%s" % + (sys.executable if sys.executable else '/usr/bin/python')) + parser.add_option('-c', '--check', dest='check', action='store_true', + help="run the module in check mode") + parser.add_option('-n', '--noexecute', dest='execute', action='store_false', + default=True, help="do not run the resulting module") + parser.add_option('-o', '--output', dest='filename', + help="Filename for resulting module", + default="~/.ansible_module_generated") + options, args = parser.parse_args() + if not options.module_path: + parser.print_help() + sys.exit(1) + else: + return options, args + + +def write_argsfile(argstring, json=False): + """ Write args to a file for old-style module's use. """ + argspath = os.path.expanduser("~/.ansible_test_module_arguments") + argsfile = open(argspath, 'w') + if json: + args = parse_kv(argstring) + argstring = jsonify(args) + argsfile.write(argstring) + argsfile.close() + return argspath + + +def get_interpreters(interpreter): + result = dict() + if interpreter: + if '=' not in interpreter: + print("interpreter must by in the form of ansible_python_interpreter=/usr/bin/python") + sys.exit(1) + interpreter_type, interpreter_path = interpreter.split('=') + if not interpreter_type.startswith('ansible_'): + interpreter_type = 'ansible_%s' % interpreter_type + if not interpreter_type.endswith('_interpreter'): + interpreter_type = '%s_interpreter' % interpreter_type + result[interpreter_type] = interpreter_path + return result + + +def boilerplate_module(modfile, args, interpreters, check, destfile): + """ simulate what ansible does with new style modules """ + + # module_fh = open(modfile) + # module_data = module_fh.read() + # module_fh.close() + + # replacer = module_common.ModuleReplacer() + loader = DataLoader() + + # included_boilerplate = module_data.find(module_common.REPLACER) != -1 or module_data.find("import ansible.module_utils") != -1 + + complex_args = {} + + # default selinux fs list is pass in as _ansible_selinux_special_fs arg + complex_args['_ansible_selinux_special_fs'] = C.DEFAULT_SELINUX_SPECIAL_FS + complex_args['_ansible_tmpdir'] = C.DEFAULT_LOCAL_TMP + complex_args['_ansible_keep_remote_files'] = C.DEFAULT_KEEP_REMOTE_FILES + complex_args['_ansible_version'] = __version__ + + if args.startswith("@"): + # Argument is a YAML file (JSON is a subset of YAML) + complex_args = utils_vars.combine_vars(complex_args, loader.load_from_file(args[1:])) + args = '' + elif args.startswith("{"): + # Argument is a YAML document (not a file) + complex_args = utils_vars.combine_vars(complex_args, loader.load(args)) + args = '' + + if args: + parsed_args = parse_kv(args) + complex_args = utils_vars.combine_vars(complex_args, parsed_args) + + task_vars = interpreters + + if check: + complex_args['_ansible_check_mode'] = True + + modname = os.path.basename(modfile) + modname = os.path.splitext(modname)[0] + (module_data, module_style, shebang) = module_common.modify_module( + modname, + modfile, + complex_args, + Templar(loader=loader), + task_vars=task_vars + ) + + if module_style == 'new' and '_ANSIBALLZ_WRAPPER = True' in to_native(module_data): + module_style = 'ansiballz' + + modfile2_path = os.path.expanduser(destfile) + print("* including generated source, if any, saving to: %s" % modfile2_path) + if module_style not in ('ansiballz', 'old'): + print("* this may offset any line numbers in tracebacks/debuggers!") + modfile2 = open(modfile2_path, 'wb') + modfile2.write(module_data) + modfile2.close() + modfile = modfile2_path + + return (modfile2_path, modname, module_style) + + +def ansiballz_setup(modfile, modname, interpreters): + os.system("chmod +x %s" % modfile) + + if 'ansible_python_interpreter' in interpreters: + command = [interpreters['ansible_python_interpreter']] + else: + command = [] + command.extend([modfile, 'explode']) + + cmd = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = cmd.communicate() + out, err = to_text(out, errors='surrogate_or_strict'), to_text(err) + lines = out.splitlines() + if len(lines) != 2 or 'Module expanded into' not in lines[0]: + print("*" * 35) + print("INVALID OUTPUT FROM ANSIBALLZ MODULE WRAPPER") + print(out) + sys.exit(err) + debug_dir = lines[1].strip() + + # All the directories in an AnsiBallZ that modules can live + core_dirs = glob.glob(os.path.join(debug_dir, 'ansible/modules')) + collection_dirs = glob.glob(os.path.join(debug_dir, 'ansible_collections/*/*/plugins/modules')) + + # There's only one module in an AnsiBallZ payload so look for the first module and then exit + for module_dir in core_dirs + collection_dirs: + for dirname, directories, filenames in os.walk(module_dir): + for filename in filenames: + if filename == modname + '.py': + modfile = os.path.join(dirname, filename) + break + + argsfile = os.path.join(debug_dir, 'args') + + print("* ansiballz module detected; extracted module source to: %s" % debug_dir) + return modfile, argsfile + + +def runtest(modfile, argspath, modname, module_style, interpreters): + """Test run a module, piping it's output for reporting.""" + invoke = "" + if module_style == 'ansiballz': + modfile, argspath = ansiballz_setup(modfile, modname, interpreters) + if 'ansible_python_interpreter' in interpreters: + invoke = "%s " % interpreters['ansible_python_interpreter'] + + os.system("chmod +x %s" % modfile) + + invoke = "%s%s" % (invoke, modfile) + if argspath is not None: + invoke = "%s %s" % (invoke, argspath) + + cmd = subprocess.Popen(invoke, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (out, err) = cmd.communicate() + out, err = to_text(out), to_text(err) + + try: + print("*" * 35) + print("RAW OUTPUT") + print(out) + print(err) + results = json.loads(out) + except Exception: + print("*" * 35) + print("INVALID OUTPUT FORMAT") + print(out) + traceback.print_exc() + sys.exit(1) + + print("*" * 35) + print("PARSED OUTPUT") + print(jsonify(results, format=True)) + + +def rundebug(debugger, modfile, argspath, modname, module_style, interpreters): + """Run interactively with console debugger.""" + + if module_style == 'ansiballz': + modfile, argspath = ansiballz_setup(modfile, modname, interpreters) + + if argspath is not None: + subprocess.call("%s %s %s" % (debugger, modfile, argspath), shell=True) + else: + subprocess.call("%s %s" % (debugger, modfile), shell=True) + + +def main(): + + options, args = parse() + interpreters = get_interpreters(options.interpreter) + (modfile, modname, module_style) = boilerplate_module(options.module_path, options.module_args, interpreters, options.check, options.filename) + + argspath = None + if module_style not in ('new', 'ansiballz'): + if module_style in ('non_native_want_json', 'binary'): + argspath = write_argsfile(options.module_args, json=True) + elif module_style == 'old': + argspath = write_argsfile(options.module_args, json=False) + else: + raise Exception("internal error, unexpected module style: %s" % module_style) + + if options.execute: + if options.debugger: + rundebug(options.debugger, modfile, argspath, modname, module_style, interpreters) + else: + runtest(modfile, argspath, modname, module_style, interpreters) + + +if __name__ == "__main__": + try: + main() + finally: + shutil.rmtree(C.DEFAULT_LOCAL_TMP, True) |