diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:04:21 +0000 |
commit | 8a754e0858d922e955e71b253c139e071ecec432 (patch) | |
tree | 527d16e74bfd1840c85efd675fdecad056c54107 /lib/ansible/plugins/become/__init__.py | |
parent | Initial commit. (diff) | |
download | ansible-core-upstream.tar.xz ansible-core-upstream.zip |
Adding upstream version 2.14.3.upstream/2.14.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/plugins/become/__init__.py')
-rw-r--r-- | lib/ansible/plugins/become/__init__.py | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/lib/ansible/plugins/become/__init__.py b/lib/ansible/plugins/become/__init__.py new file mode 100644 index 0000000..9dacf22 --- /dev/null +++ b/lib/ansible/plugins/become/__init__.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +# Copyright: (c) 2018, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import shlex + +from abc import abstractmethod +from random import choice +from string import ascii_lowercase +from gettext import dgettext + +from ansible.errors import AnsibleError +from ansible.module_utils._text import to_bytes +from ansible.plugins import AnsiblePlugin + + +def _gen_id(length=32): + ''' return random string used to identify the current privilege escalation ''' + return ''.join(choice(ascii_lowercase) for x in range(length)) + + +class BecomeBase(AnsiblePlugin): + + name = None # type: str | None + + # messages for detecting prompted password issues + fail = tuple() # type: tuple[str, ...] + missing = tuple() # type: tuple[str, ...] + + # many connection plugins cannot provide tty, set to True if your become + # plugin requires a tty, i.e su + require_tty = False + + # prompt to match + prompt = '' + + def __init__(self): + super(BecomeBase, self).__init__() + self._id = '' + self.success = '' + + def get_option(self, option, hostvars=None, playcontext=None): + """ Overrides the base get_option to provide a fallback to playcontext vars in case a 3rd party plugin did not + implement the base become options required in Ansible. """ + # TODO: add deprecation warning for ValueError in devel that removes the playcontext fallback + try: + return super(BecomeBase, self).get_option(option, hostvars=hostvars) + except KeyError: + pc_fallback = ['become_user', 'become_pass', 'become_flags', 'become_exe'] + if option not in pc_fallback: + raise + + return getattr(playcontext, option, None) + + def expect_prompt(self): + """This function assists connection plugins in determining if they need to wait for + a prompt. Both a prompt and a password are required. + """ + return self.prompt and self.get_option('become_pass') + + def _build_success_command(self, cmd, shell, noexe=False): + if not all((cmd, shell, self.success)): + return cmd + + try: + cmd = shlex.quote('%s %s %s %s' % (shell.ECHO, self.success, shell.COMMAND_SEP, cmd)) + except AttributeError: + # TODO: This should probably become some more robust functionality used to detect incompat + raise AnsibleError('The %s shell family is incompatible with the %s become plugin' % (shell.SHELL_FAMILY, self.name)) + exe = getattr(shell, 'executable', None) + if exe and not noexe: + cmd = '%s -c %s' % (exe, cmd) + return cmd + + @abstractmethod + def build_become_command(self, cmd, shell): + self._id = _gen_id() + self.success = 'BECOME-SUCCESS-%s' % self._id + + def check_success(self, b_output): + b_success = to_bytes(self.success) + return any(b_success in l.rstrip() for l in b_output.splitlines(True)) + + def check_password_prompt(self, b_output): + ''' checks if the expected password prompt exists in b_output ''' + if self.prompt: + b_prompt = to_bytes(self.prompt).strip() + return any(l.strip().startswith(b_prompt) for l in b_output.splitlines()) + return False + + def _check_password_error(self, b_out, msg): + ''' returns True/False if domain specific i18n version of msg is found in b_out ''' + b_fail = to_bytes(dgettext(self.name, msg)) + return b_fail and b_fail in b_out + + def check_incorrect_password(self, b_output): + for errstring in self.fail: + if self._check_password_error(b_output, errstring): + return True + return False + + def check_missing_password(self, b_output): + for errstring in self.missing: + if self._check_password_error(b_output, errstring): + return True + return False |