summaryrefslogtreecommitdiffstats
path: root/lib/ansible/cli/console.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:05:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:05:48 +0000
commitab76d0c3dcea928a1f252ce827027aca834213cd (patch)
tree7e3797bdd2403982f4a351608d9633c910aadc12 /lib/ansible/cli/console.py
parentInitial commit. (diff)
downloadansible-core-ab76d0c3dcea928a1f252ce827027aca834213cd.tar.xz
ansible-core-ab76d0c3dcea928a1f252ce827027aca834213cd.zip
Adding upstream version 2.14.13.upstream/2.14.13
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/cli/console.py')
-rwxr-xr-xlib/ansible/cli/console.py604
1 files changed, 604 insertions, 0 deletions
diff --git a/lib/ansible/cli/console.py b/lib/ansible/cli/console.py
new file mode 100755
index 0000000..3125cc4
--- /dev/null
+++ b/lib/ansible/cli/console.py
@@ -0,0 +1,604 @@
+#!/usr/bin/env python
+# Copyright: (c) 2014, Nandor Sivok <dominis@haxor.hu>
+# Copyright: (c) 2016, Redhat Inc
+# Copyright: (c) 2018, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+# PYTHON_ARGCOMPLETE_OK
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+# ansible.cli needs to be imported first, to ensure the source bin/* scripts run that code first
+from ansible.cli import CLI
+
+import atexit
+import cmd
+import getpass
+import readline
+import os
+import sys
+
+from ansible import constants as C
+from ansible import context
+from ansible.cli.arguments import option_helpers as opt_help
+from ansible.executor.task_queue_manager import TaskQueueManager
+from ansible.module_utils._text import to_native, to_text
+from ansible.module_utils.parsing.convert_bool import boolean
+from ansible.parsing.splitter import parse_kv
+from ansible.playbook.play import Play
+from ansible.plugins.list import list_plugins
+from ansible.plugins.loader import module_loader, fragment_loader
+from ansible.utils import plugin_docs
+from ansible.utils.color import stringc
+from ansible.utils.display import Display
+
+display = Display()
+
+
+class ConsoleCLI(CLI, cmd.Cmd):
+ '''
+ A REPL that allows for running ad-hoc tasks against a chosen inventory
+ from a nice shell with built-in tab completion (based on dominis'
+ ansible-shell).
+
+ It supports several commands, and you can modify its configuration at
+ runtime:
+
+ - `cd [pattern]`: change host/group (you can use host patterns eg.: app*.dc*:!app01*)
+ - `list`: list available hosts in the current path
+ - `list groups`: list groups included in the current path
+ - `become`: toggle the become flag
+ - `!`: forces shell module instead of the ansible module (!yum update -y)
+ - `verbosity [num]`: set the verbosity level
+ - `forks [num]`: set the number of forks
+ - `become_user [user]`: set the become_user
+ - `remote_user [user]`: set the remote_user
+ - `become_method [method]`: set the privilege escalation method
+ - `check [bool]`: toggle check mode
+ - `diff [bool]`: toggle diff mode
+ - `timeout [integer]`: set the timeout of tasks in seconds (0 to disable)
+ - `help [command/module]`: display documentation for the command or module
+ - `exit`: exit ansible-console
+ '''
+
+ name = 'ansible-console'
+ modules = [] # type: list[str] | None
+ ARGUMENTS = {'host-pattern': 'A name of a group in the inventory, a shell-like glob '
+ 'selecting hosts in inventory or any combination of the two separated by commas.'}
+
+ # use specific to console, but fallback to highlight for backwards compatibility
+ NORMAL_PROMPT = C.COLOR_CONSOLE_PROMPT or C.COLOR_HIGHLIGHT
+
+ def __init__(self, args):
+
+ super(ConsoleCLI, self).__init__(args)
+
+ self.intro = 'Welcome to the ansible console. Type help or ? to list commands.\n'
+
+ self.groups = []
+ self.hosts = []
+ self.pattern = None
+ self.variable_manager = None
+ self.loader = None
+ self.passwords = dict()
+
+ self.cwd = '*'
+
+ # Defaults for these are set from the CLI in run()
+ self.remote_user = None
+ self.become = None
+ self.become_user = None
+ self.become_method = None
+ self.check_mode = None
+ self.diff = None
+ self.forks = None
+ self.task_timeout = None
+ self.collections = None
+
+ cmd.Cmd.__init__(self)
+
+ def init_parser(self):
+ super(ConsoleCLI, self).init_parser(
+ desc="REPL console for executing Ansible tasks.",
+ epilog="This is not a live session/connection: each task is executed in the background and returns its results."
+ )
+ opt_help.add_runas_options(self.parser)
+ opt_help.add_inventory_options(self.parser)
+ opt_help.add_connect_options(self.parser)
+ opt_help.add_check_options(self.parser)
+ opt_help.add_vault_options(self.parser)
+ opt_help.add_fork_options(self.parser)
+ opt_help.add_module_options(self.parser)
+ opt_help.add_basedir_options(self.parser)
+ opt_help.add_runtask_options(self.parser)
+ opt_help.add_tasknoplay_options(self.parser)
+
+ # options unique to shell
+ self.parser.add_argument('pattern', help='host pattern', metavar='pattern', default='all', nargs='?')
+ self.parser.add_argument('--step', dest='step', action='store_true',
+ help="one-step-at-a-time: confirm each task before running")
+
+ def post_process_args(self, options):
+ options = super(ConsoleCLI, self).post_process_args(options)
+ display.verbosity = options.verbosity
+ self.validate_conflicts(options, runas_opts=True, fork_opts=True)
+ return options
+
+ def get_names(self):
+ return dir(self)
+
+ def cmdloop(self):
+ try:
+ cmd.Cmd.cmdloop(self)
+
+ except KeyboardInterrupt:
+ self.cmdloop()
+
+ except EOFError:
+ self.display("[Ansible-console was exited]")
+ self.do_exit(self)
+
+ def set_prompt(self):
+ login_user = self.remote_user or getpass.getuser()
+ self.selected = self.inventory.list_hosts(self.cwd)
+ prompt = "%s@%s (%d)[f:%s]" % (login_user, self.cwd, len(self.selected), self.forks)
+ if self.become and self.become_user in [None, 'root']:
+ prompt += "# "
+ color = C.COLOR_ERROR
+ else:
+ prompt += "$ "
+ color = self.NORMAL_PROMPT
+ self.prompt = stringc(prompt, color, wrap_nonvisible_chars=True)
+
+ def list_modules(self):
+ return list_plugins('module', self.collections)
+
+ def default(self, line, forceshell=False):
+ """ actually runs modules """
+ if line.startswith("#"):
+ return False
+
+ if not self.cwd:
+ display.error("No host found")
+ return False
+
+ # defaults
+ module = 'shell'
+ module_args = line
+
+ if forceshell is not True:
+ possible_module, *possible_args = line.split()
+ if module_loader.find_plugin(possible_module):
+ # we found module!
+ module = possible_module
+ if possible_args:
+ module_args = ' '.join(possible_args)
+ else:
+ module_args = ''
+
+ if self.callback:
+ cb = self.callback
+ elif C.DEFAULT_LOAD_CALLBACK_PLUGINS and C.DEFAULT_STDOUT_CALLBACK != 'default':
+ cb = C.DEFAULT_STDOUT_CALLBACK
+ else:
+ cb = 'minimal'
+
+ result = None
+ try:
+ check_raw = module in C._ACTION_ALLOWS_RAW_ARGS
+ task = dict(action=dict(module=module, args=parse_kv(module_args, check_raw=check_raw)), timeout=self.task_timeout)
+ play_ds = dict(
+ name="Ansible Shell",
+ hosts=self.cwd,
+ gather_facts='no',
+ tasks=[task],
+ remote_user=self.remote_user,
+ become=self.become,
+ become_user=self.become_user,
+ become_method=self.become_method,
+ check_mode=self.check_mode,
+ diff=self.diff,
+ collections=self.collections,
+ )
+ play = Play().load(play_ds, variable_manager=self.variable_manager, loader=self.loader)
+ except Exception as e:
+ display.error(u"Unable to build command: %s" % to_text(e))
+ return False
+
+ try:
+ # now create a task queue manager to execute the play
+ self._tqm = None
+ try:
+ self._tqm = TaskQueueManager(
+ inventory=self.inventory,
+ variable_manager=self.variable_manager,
+ loader=self.loader,
+ passwords=self.passwords,
+ stdout_callback=cb,
+ run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS,
+ run_tree=False,
+ forks=self.forks,
+ )
+
+ result = self._tqm.run(play)
+ display.debug(result)
+ finally:
+ if self._tqm:
+ self._tqm.cleanup()
+ if self.loader:
+ self.loader.cleanup_all_tmp_files()
+
+ if result is None:
+ display.error("No hosts found")
+ return False
+ except KeyboardInterrupt:
+ display.error('User interrupted execution')
+ return False
+ except Exception as e:
+ if self.verbosity >= 3:
+ import traceback
+ display.v(traceback.format_exc())
+ display.error(to_text(e))
+ return False
+
+ def emptyline(self):
+ return
+
+ def do_shell(self, arg):
+ """
+ You can run shell commands through the shell module.
+
+ eg.:
+ shell ps uax | grep java | wc -l
+ shell killall python
+ shell halt -n
+
+ You can use the ! to force the shell module. eg.:
+ !ps aux | grep java | wc -l
+ """
+ self.default(arg, True)
+
+ def help_shell(self):
+ display.display("You can run shell commands through the shell module.")
+
+ def do_forks(self, arg):
+ """Set the number of forks"""
+ if arg:
+ try:
+ forks = int(arg)
+ except TypeError:
+ display.error('Invalid argument for "forks"')
+ self.usage_forks()
+
+ if forks > 0:
+ self.forks = forks
+ self.set_prompt()
+
+ else:
+ display.display('forks must be greater than or equal to 1')
+ else:
+ self.usage_forks()
+
+ def help_forks(self):
+ display.display("Set the number of forks to use per task")
+ self.usage_forks()
+
+ def usage_forks(self):
+ display.display('Usage: forks <number>')
+
+ do_serial = do_forks
+ help_serial = help_forks
+
+ def do_collections(self, arg):
+ """Set list of collections for 'short name' usage"""
+ if arg in ('', 'none'):
+ self.collections = None
+ elif not arg:
+ self.usage_collections()
+ else:
+ collections = arg.split(',')
+ for collection in collections:
+ if self.collections is None:
+ self.collections = []
+ self.collections.append(collection.strip())
+
+ if self.collections:
+ display.v('Collections name search is set to: %s' % ', '.join(self.collections))
+ else:
+ display.v('Collections name search is using defaults')
+
+ def help_collections(self):
+ display.display("Set the collection name search path when using short names for plugins")
+ self.usage_collections()
+
+ def usage_collections(self):
+ display.display('Usage: collections <collection1>[, <collection2> ...]\n Use empty quotes or "none" to reset to default.\n')
+
+ def do_verbosity(self, arg):
+ """Set verbosity level"""
+ if not arg:
+ display.display('Usage: verbosity <number>')
+ else:
+ try:
+ display.verbosity = int(arg)
+ display.v('verbosity level set to %s' % arg)
+ except (TypeError, ValueError) as e:
+ display.error('The verbosity must be a valid integer: %s' % to_text(e))
+
+ def help_verbosity(self):
+ display.display("Set the verbosity level, equivalent to -v for 1 and -vvvv for 4.")
+
+ def do_cd(self, arg):
+ """
+ Change active host/group. You can use hosts patterns as well eg.:
+ cd webservers
+ cd webservers:dbservers
+ cd webservers:!phoenix
+ cd webservers:&staging
+ cd webservers:dbservers:&staging:!phoenix
+ """
+ if not arg:
+ self.cwd = '*'
+ elif arg in '/*':
+ self.cwd = 'all'
+ elif self.inventory.get_hosts(arg):
+ self.cwd = arg
+ else:
+ display.display("no host matched")
+
+ self.set_prompt()
+
+ def help_cd(self):
+ display.display("Change active host/group. ")
+ self.usage_cd()
+
+ def usage_cd(self):
+ display.display("Usage: cd <group>|<host>|<host pattern>")
+
+ def do_list(self, arg):
+ """List the hosts in the current group"""
+ if not arg:
+ for host in self.selected:
+ display.display(host.name)
+ elif arg == 'groups':
+ for group in self.groups:
+ display.display(group)
+ else:
+ display.error('Invalid option passed to "list"')
+ self.help_list()
+
+ def help_list(self):
+ display.display("List the hosts in the current group or a list of groups if you add 'groups'.")
+
+ def do_become(self, arg):
+ """Toggle whether plays run with become"""
+ if arg:
+ self.become = boolean(arg, strict=False)
+ display.v("become changed to %s" % self.become)
+ self.set_prompt()
+ else:
+ display.display("Please specify become value, e.g. `become yes`")
+
+ def help_become(self):
+ display.display("Toggle whether the tasks are run with become")
+
+ def do_remote_user(self, arg):
+ """Given a username, set the remote user plays are run by"""
+ if arg:
+ self.remote_user = arg
+ self.set_prompt()
+ else:
+ display.display("Please specify a remote user, e.g. `remote_user root`")
+
+ def help_remote_user(self):
+ display.display("Set the user for use as login to the remote target")
+
+ def do_become_user(self, arg):
+ """Given a username, set the user that plays are run by when using become"""
+ if arg:
+ self.become_user = arg
+ else:
+ display.display("Please specify a user, e.g. `become_user jenkins`")
+ display.v("Current user is %s" % self.become_user)
+ self.set_prompt()
+
+ def help_become_user(self):
+ display.display("Set the user for use with privilege escalation (which remote user attempts to 'become' when become is enabled)")
+
+ def do_become_method(self, arg):
+ """Given a become_method, set the privilege escalation method when using become"""
+ if arg:
+ self.become_method = arg
+ display.v("become_method changed to %s" % self.become_method)
+ else:
+ display.display("Please specify a become_method, e.g. `become_method su`")
+ display.v("Current become_method is %s" % self.become_method)
+
+ def help_become_method(self):
+ display.display("Set the privilege escalation plugin to use when become is enabled")
+
+ def do_check(self, arg):
+ """Toggle whether plays run with check mode"""
+ if arg:
+ self.check_mode = boolean(arg, strict=False)
+ display.display("check mode changed to %s" % self.check_mode)
+ else:
+ display.display("Please specify check mode value, e.g. `check yes`")
+ display.v("check mode is currently %s." % self.check_mode)
+
+ def help_check(self):
+ display.display("Toggle check_mode for the tasks")
+
+ def do_diff(self, arg):
+ """Toggle whether plays run with diff"""
+ if arg:
+ self.diff = boolean(arg, strict=False)
+ display.display("diff mode changed to %s" % self.diff)
+ else:
+ display.display("Please specify a diff value , e.g. `diff yes`")
+ display.v("diff mode is currently %s" % self.diff)
+
+ def help_diff(self):
+ display.display("Toggle diff output for the tasks")
+
+ def do_timeout(self, arg):
+ """Set the timeout"""
+ if arg:
+ try:
+ timeout = int(arg)
+ if timeout < 0:
+ display.error('The timeout must be greater than or equal to 1, use 0 to disable')
+ else:
+ self.task_timeout = timeout
+ except (TypeError, ValueError) as e:
+ display.error('The timeout must be a valid positive integer, or 0 to disable: %s' % to_text(e))
+ else:
+ self.usage_timeout()
+
+ def help_timeout(self):
+ display.display("Set task timeout in seconds")
+ self.usage_timeout()
+
+ def usage_timeout(self):
+ display.display('Usage: timeout <seconds>')
+
+ def do_exit(self, args):
+ """Exits from the console"""
+ sys.stdout.write('\nAnsible-console was exited.\n')
+ return -1
+
+ def help_exit(self):
+ display.display("LEAVE!")
+
+ do_EOF = do_exit
+ help_EOF = help_exit
+
+ def helpdefault(self, module_name):
+ if module_name:
+ in_path = module_loader.find_plugin(module_name)
+ if in_path:
+ oc, a, _dummy1, _dummy2 = plugin_docs.get_docstring(in_path, fragment_loader)
+ if oc:
+ display.display(oc['short_description'])
+ display.display('Parameters:')
+ for opt in oc['options'].keys():
+ display.display(' ' + stringc(opt, self.NORMAL_PROMPT) + ' ' + oc['options'][opt]['description'][0])
+ else:
+ display.error('No documentation found for %s.' % module_name)
+ else:
+ display.error('%s is not a valid command, use ? to list all valid commands.' % module_name)
+
+ def help_help(self):
+ display.warning("Don't be redundant!")
+
+ def complete_cd(self, text, line, begidx, endidx):
+ mline = line.partition(' ')[2]
+ offs = len(mline) - len(text)
+
+ if self.cwd in ('all', '*', '\\'):
+ completions = self.hosts + self.groups
+ else:
+ completions = [x.name for x in self.inventory.list_hosts(self.cwd)]
+
+ return [to_native(s)[offs:] for s in completions if to_native(s).startswith(to_native(mline))]
+
+ def completedefault(self, text, line, begidx, endidx):
+ if line.split()[0] in self.list_modules():
+ mline = line.split(' ')[-1]
+ offs = len(mline) - len(text)
+ completions = self.module_args(line.split()[0])
+
+ return [s[offs:] + '=' for s in completions if s.startswith(mline)]
+
+ def module_args(self, module_name):
+ in_path = module_loader.find_plugin(module_name)
+ oc, a, _dummy1, _dummy2 = plugin_docs.get_docstring(in_path, fragment_loader, is_module=True)
+ return list(oc['options'].keys())
+
+ def run(self):
+
+ super(ConsoleCLI, self).run()
+
+ sshpass = None
+ becomepass = None
+
+ # hosts
+ self.pattern = context.CLIARGS['pattern']
+ self.cwd = self.pattern
+
+ # Defaults from the command line
+ self.remote_user = context.CLIARGS['remote_user']
+ self.become = context.CLIARGS['become']
+ self.become_user = context.CLIARGS['become_user']
+ self.become_method = context.CLIARGS['become_method']
+ self.check_mode = context.CLIARGS['check']
+ self.diff = context.CLIARGS['diff']
+ self.forks = context.CLIARGS['forks']
+ self.task_timeout = context.CLIARGS['task_timeout']
+
+ # set module path if needed
+ if context.CLIARGS['module_path']:
+ for path in context.CLIARGS['module_path']:
+ if path:
+ module_loader.add_directory(path)
+
+ # dynamically add 'cannonical' modules as commands, aliases coudld be used and dynamically loaded
+ self.modules = self.list_modules()
+ for module in self.modules:
+ setattr(self, 'do_' + module, lambda arg, module=module: self.default(module + ' ' + arg))
+ setattr(self, 'help_' + module, lambda module=module: self.helpdefault(module))
+
+ (sshpass, becomepass) = self.ask_passwords()
+ self.passwords = {'conn_pass': sshpass, 'become_pass': becomepass}
+
+ self.loader, self.inventory, self.variable_manager = self._play_prereqs()
+
+ hosts = self.get_host_list(self.inventory, context.CLIARGS['subset'], self.pattern)
+
+ self.groups = self.inventory.list_groups()
+ self.hosts = [x.name for x in hosts]
+
+ # This hack is to work around readline issues on a mac:
+ # http://stackoverflow.com/a/7116997/541202
+ if 'libedit' in readline.__doc__:
+ readline.parse_and_bind("bind ^I rl_complete")
+ else:
+ readline.parse_and_bind("tab: complete")
+
+ histfile = os.path.join(os.path.expanduser("~"), ".ansible-console_history")
+ try:
+ readline.read_history_file(histfile)
+ except IOError:
+ pass
+
+ atexit.register(readline.write_history_file, histfile)
+ self.set_prompt()
+ self.cmdloop()
+
+ def __getattr__(self, name):
+ ''' handle not found to populate dynamically a module function if module matching name exists '''
+ attr = None
+
+ if name.startswith('do_'):
+ module = name.replace('do_', '')
+ if module_loader.find_plugin(module):
+ setattr(self, name, lambda arg, module=module: self.default(module + ' ' + arg))
+ attr = object.__getattr__(self, name)
+ elif name.startswith('help_'):
+ module = name.replace('help_', '')
+ if module_loader.find_plugin(module):
+ setattr(self, name, lambda module=module: self.helpdefault(module))
+ attr = object.__getattr__(self, name)
+
+ if attr is None:
+ raise AttributeError(f"{self.__class__} does not have a {name} attribute")
+
+ return attr
+
+
+def main(args=None):
+ ConsoleCLI.cli_executor(args)
+
+
+if __name__ == '__main__':
+ main()