summaryrefslogtreecommitdiffstats
path: root/lib/ansible/utils
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/utils')
-rw-r--r--lib/ansible/utils/_junit_xml.py2
-rw-r--r--lib/ansible/utils/cmd_functions.py2
-rw-r--r--lib/ansible/utils/collection_loader/_collection_finder.py167
-rw-r--r--lib/ansible/utils/display.py374
-rw-r--r--lib/ansible/utils/encrypt.py36
-rw-r--r--lib/ansible/utils/hashing.py2
-rw-r--r--lib/ansible/utils/jsonrpc.py2
-rw-r--r--lib/ansible/utils/path.py2
-rw-r--r--lib/ansible/utils/plugin_docs.py2
-rw-r--r--lib/ansible/utils/py3compat.py2
-rw-r--r--lib/ansible/utils/shlex.py12
-rw-r--r--lib/ansible/utils/ssh_functions.py9
-rw-r--r--lib/ansible/utils/unicode.py2
-rw-r--r--lib/ansible/utils/unsafe_proxy.py23
-rw-r--r--lib/ansible/utils/vars.py118
-rw-r--r--lib/ansible/utils/version.py2
16 files changed, 595 insertions, 162 deletions
diff --git a/lib/ansible/utils/_junit_xml.py b/lib/ansible/utils/_junit_xml.py
index 76c8878..8c4dba0 100644
--- a/lib/ansible/utils/_junit_xml.py
+++ b/lib/ansible/utils/_junit_xml.py
@@ -15,7 +15,7 @@ from xml.dom import minidom
from xml.etree import ElementTree as ET
-@dataclasses.dataclass # type: ignore[misc] # https://github.com/python/mypy/issues/5374
+@dataclasses.dataclass
class TestResult(metaclass=abc.ABCMeta):
"""Base class for the result of a test case."""
diff --git a/lib/ansible/utils/cmd_functions.py b/lib/ansible/utils/cmd_functions.py
index d4edb2f..436d955 100644
--- a/lib/ansible/utils/cmd_functions.py
+++ b/lib/ansible/utils/cmd_functions.py
@@ -24,7 +24,7 @@ import shlex
import subprocess
import sys
-from ansible.module_utils._text import to_bytes
+from ansible.module_utils.common.text.converters import to_bytes
def run_cmd(cmd, live=False, readsize=10):
diff --git a/lib/ansible/utils/collection_loader/_collection_finder.py b/lib/ansible/utils/collection_loader/_collection_finder.py
index d3a8765..16d0bcc 100644
--- a/lib/ansible/utils/collection_loader/_collection_finder.py
+++ b/lib/ansible/utils/collection_loader/_collection_finder.py
@@ -7,6 +7,7 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
+import itertools
import os
import os.path
import pkgutil
@@ -39,7 +40,23 @@ except ImportError:
reload_module = reload # type: ignore[name-defined] # pylint:disable=undefined-variable
try:
- from importlib.util import spec_from_loader
+ try:
+ # Available on Python >= 3.11
+ # We ignore the import error that will trigger when running mypy with
+ # older Python versions.
+ from importlib.resources.abc import TraversableResources # type: ignore[import]
+ except ImportError:
+ # Used with Python 3.9 and 3.10 only
+ # This member is still available as an alias up until Python 3.14 but
+ # is deprecated as of Python 3.12.
+ from importlib.abc import TraversableResources # deprecated: description='TraversableResources move' python_version='3.10'
+except ImportError:
+ # Python < 3.9
+ # deprecated: description='TraversableResources fallback' python_version='3.8'
+ TraversableResources = object # type: ignore[assignment,misc]
+
+try:
+ from importlib.util import find_spec, spec_from_loader
except ImportError:
pass
@@ -50,6 +67,11 @@ except ImportError:
else:
HAS_FILE_FINDER = True
+try:
+ import pathlib
+except ImportError:
+ pass
+
# NB: this supports import sanity test providing a different impl
try:
from ._collection_meta import _meta_yml_to_dict
@@ -78,6 +100,141 @@ except AttributeError: # Python 2
PB_EXTENSIONS = ('.yml', '.yaml')
+SYNTHETIC_PACKAGE_NAME = '<ansible_synthetic_collection_package>'
+
+
+class _AnsibleNSTraversable:
+ """Class that implements the ``importlib.resources.abc.Traversable``
+ interface for the following ``ansible_collections`` namespace packages::
+
+ * ``ansible_collections``
+ * ``ansible_collections.<namespace>``
+
+ These namespace packages operate differently from a normal Python
+ namespace package, in that the same namespace can be distributed across
+ multiple directories on the filesystem and still function as a single
+ namespace, such as::
+
+ * ``/usr/share/ansible/collections/ansible_collections/ansible/posix/``
+ * ``/home/user/.ansible/collections/ansible_collections/ansible/windows/``
+
+ This class will mimic the behavior of various ``pathlib.Path`` methods,
+ by combining the results of multiple root paths into the output.
+
+ This class does not do anything to remove duplicate collections from the
+ list, so when traversing either namespace patterns supported by this class,
+ it is possible to have the same collection located in multiple root paths,
+ but precedence rules only use one. When iterating or traversing these
+ package roots, there is the potential to see the same collection in
+ multiple places without indication of which would be used. In such a
+ circumstance, it is best to then call ``importlib.resources.files`` for an
+ individual collection package rather than continuing to traverse from the
+ namespace package.
+
+ Several methods will raise ``NotImplementedError`` as they do not make
+ sense for these namespace packages.
+ """
+ def __init__(self, *paths):
+ self._paths = [pathlib.Path(p) for p in paths]
+
+ def __repr__(self):
+ return "_AnsibleNSTraversable('%s')" % "', '".join(map(to_text, self._paths))
+
+ def iterdir(self):
+ return itertools.chain.from_iterable(p.iterdir() for p in self._paths if p.is_dir())
+
+ def is_dir(self):
+ return any(p.is_dir() for p in self._paths)
+
+ def is_file(self):
+ return False
+
+ def glob(self, pattern):
+ return itertools.chain.from_iterable(p.glob(pattern) for p in self._paths if p.is_dir())
+
+ def _not_implemented(self, *args, **kwargs):
+ raise NotImplementedError('not usable on namespaces')
+
+ joinpath = __truediv__ = read_bytes = read_text = _not_implemented
+
+
+class _AnsibleTraversableResources(TraversableResources):
+ """Implements ``importlib.resources.abc.TraversableResources`` for the
+ collection Python loaders.
+
+ The result of ``files`` will depend on whether a particular collection, or
+ a sub package of a collection was referenced, as opposed to
+ ``ansible_collections`` or a particular namespace. For a collection and
+ its subpackages, a ``pathlib.Path`` instance will be returned, whereas
+ for the higher level namespace packages, ``_AnsibleNSTraversable``
+ will be returned.
+ """
+ def __init__(self, package, loader):
+ self._package = package
+ self._loader = loader
+
+ def _get_name(self, package):
+ try:
+ # spec
+ return package.name
+ except AttributeError:
+ # module
+ return package.__name__
+
+ def _get_package(self, package):
+ try:
+ # spec
+ return package.__parent__
+ except AttributeError:
+ # module
+ return package.__package__
+
+ def _get_path(self, package):
+ try:
+ # spec
+ return package.origin
+ except AttributeError:
+ # module
+ return package.__file__
+
+ def _is_ansible_ns_package(self, package):
+ origin = getattr(package, 'origin', None)
+ if not origin:
+ return False
+
+ if origin == SYNTHETIC_PACKAGE_NAME:
+ return True
+
+ module_filename = os.path.basename(origin)
+ return module_filename in {'__synthetic__', '__init__.py'}
+
+ def _ensure_package(self, package):
+ if self._is_ansible_ns_package(package):
+ # Short circuit our loaders
+ return
+ if self._get_package(package) != package.__name__:
+ raise TypeError('%r is not a package' % package.__name__)
+
+ def files(self):
+ package = self._package
+ parts = package.split('.')
+ is_ns = parts[0] == 'ansible_collections' and len(parts) < 3
+
+ if isinstance(package, string_types):
+ if is_ns:
+ # Don't use ``spec_from_loader`` here, because that will point
+ # to exactly 1 location for a namespace. Use ``find_spec``
+ # to get a list of all locations for the namespace
+ package = find_spec(package)
+ else:
+ package = spec_from_loader(package, self._loader)
+ elif not isinstance(package, ModuleType):
+ raise TypeError('Expected string or module, got %r' % package.__class__.__name__)
+
+ self._ensure_package(package)
+ if is_ns:
+ return _AnsibleNSTraversable(*package.submodule_search_locations)
+ return pathlib.Path(self._get_path(package)).parent
class _AnsibleCollectionFinder:
@@ -423,6 +580,9 @@ class _AnsibleCollectionPkgLoaderBase:
return module_path, has_code, package_path
+ def get_resource_reader(self, fullname):
+ return _AnsibleTraversableResources(fullname, self)
+
def exec_module(self, module):
# short-circuit redirect; avoid reinitializing existing modules
if self._redirect_module:
@@ -509,7 +669,7 @@ class _AnsibleCollectionPkgLoaderBase:
return None
def _synthetic_filename(self, fullname):
- return '<ansible_synthetic_collection_package>'
+ return SYNTHETIC_PACKAGE_NAME
def get_filename(self, fullname):
if fullname != self._fullname:
@@ -748,6 +908,9 @@ class _AnsibleInternalRedirectLoader:
if not self._redirect:
raise ImportError('not redirected, go ask path_hook')
+ def get_resource_reader(self, fullname):
+ return _AnsibleTraversableResources(fullname, self)
+
def exec_module(self, module):
# should never see this
if not self._redirect:
diff --git a/lib/ansible/utils/display.py b/lib/ansible/utils/display.py
index 7d98ad4..3f331ad 100644
--- a/lib/ansible/utils/display.py
+++ b/lib/ansible/utils/display.py
@@ -15,34 +15,49 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
+from __future__ import annotations
+
+try:
+ import curses
+except ImportError:
+ HAS_CURSES = False
+else:
+ # this will be set to False if curses.setupterm() fails
+ HAS_CURSES = True
+
+import collections.abc as c
+import codecs
import ctypes.util
import fcntl
import getpass
+import io
import logging
import os
import random
import subprocess
import sys
+import termios
import textwrap
import threading
import time
+import tty
+import typing as t
+from functools import wraps
from struct import unpack, pack
-from termios import TIOCGWINSZ
from ansible import constants as C
-from ansible.errors import AnsibleError, AnsibleAssertionError
-from ansible.module_utils._text import to_bytes, to_text
+from ansible.errors import AnsibleError, AnsibleAssertionError, AnsiblePromptInterrupt, AnsiblePromptNoninteractive
+from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.module_utils.six import text_type
from ansible.utils.color import stringc
from ansible.utils.multiprocessing import context as multiprocessing_context
from ansible.utils.singleton import Singleton
from ansible.utils.unsafe_proxy import wrap_var
-from functools import wraps
+if t.TYPE_CHECKING:
+ # avoid circular import at runtime
+ from ansible.executor.task_queue_manager import FinalQueue
_LIBC = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
# Set argtypes, to avoid segfault if the wrong type is provided,
@@ -52,8 +67,11 @@ _LIBC.wcswidth.argtypes = (ctypes.c_wchar_p, ctypes.c_int)
# Max for c_int
_MAX_INT = 2 ** (ctypes.sizeof(ctypes.c_int) * 8 - 1) - 1
+MOVE_TO_BOL = b'\r'
+CLEAR_TO_EOL = b'\x1b[K'
+
-def get_text_width(text):
+def get_text_width(text: str) -> int:
"""Function that utilizes ``wcswidth`` or ``wcwidth`` to determine the
number of columns used to display a text string.
@@ -104,6 +122,20 @@ def get_text_width(text):
return width if width >= 0 else 0
+def proxy_display(method):
+
+ def proxyit(self, *args, **kwargs):
+ if self._final_q:
+ # If _final_q is set, that means we are in a WorkerProcess
+ # and instead of displaying messages directly from the fork
+ # we will proxy them through the queue
+ return self._final_q.send_display(method.__name__, *args, **kwargs)
+ else:
+ return method(self, *args, **kwargs)
+
+ return proxyit
+
+
class FilterBlackList(logging.Filter):
def __init__(self, blacklist):
self.blacklist = [logging.Filter(name) for name in blacklist]
@@ -164,7 +196,7 @@ b_COW_PATHS = (
)
-def _synchronize_textiowrapper(tio, lock):
+def _synchronize_textiowrapper(tio: t.TextIO, lock: threading.RLock):
# Ensure that a background thread can't hold the internal buffer lock on a file object
# during a fork, which causes forked children to hang. We're using display's existing lock for
# convenience (and entering the lock before a fork).
@@ -179,15 +211,70 @@ def _synchronize_textiowrapper(tio, lock):
buffer = tio.buffer
# monkeypatching the underlying file-like object isn't great, but likely safer than subclassing
- buffer.write = _wrap_with_lock(buffer.write, lock)
- buffer.flush = _wrap_with_lock(buffer.flush, lock)
+ buffer.write = _wrap_with_lock(buffer.write, lock) # type: ignore[method-assign]
+ buffer.flush = _wrap_with_lock(buffer.flush, lock) # type: ignore[method-assign]
+
+
+def setraw(fd: int, when: int = termios.TCSAFLUSH) -> None:
+ """Put terminal into a raw mode.
+
+ Copied from ``tty`` from CPython 3.11.0, and modified to not remove OPOST from OFLAG
+
+ OPOST is kept to prevent an issue with multi line prompts from being corrupted now that display
+ is proxied via the queue from forks. The problem is a race condition, in that we proxy the display
+ over the fork, but before it can be displayed, this plugin will have continued executing, potentially
+ setting stdout and stdin to raw which remove output post processing that commonly converts NL to CRLF
+ """
+ mode = termios.tcgetattr(fd)
+ mode[tty.IFLAG] = mode[tty.IFLAG] & ~(termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON)
+ mode[tty.OFLAG] = mode[tty.OFLAG] & ~(termios.OPOST)
+ mode[tty.CFLAG] = mode[tty.CFLAG] & ~(termios.CSIZE | termios.PARENB)
+ mode[tty.CFLAG] = mode[tty.CFLAG] | termios.CS8
+ mode[tty.LFLAG] = mode[tty.LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
+ mode[tty.CC][termios.VMIN] = 1
+ mode[tty.CC][termios.VTIME] = 0
+ termios.tcsetattr(fd, when, mode)
+
+
+def clear_line(stdout: t.BinaryIO) -> None:
+ stdout.write(b'\x1b[%s' % MOVE_TO_BOL)
+ stdout.write(b'\x1b[%s' % CLEAR_TO_EOL)
+
+
+def setup_prompt(stdin_fd: int, stdout_fd: int, seconds: int, echo: bool) -> None:
+ setraw(stdin_fd)
+
+ # Only set stdout to raw mode if it is a TTY. This is needed when redirecting
+ # stdout to a file since a file cannot be set to raw mode.
+ if os.isatty(stdout_fd):
+ setraw(stdout_fd)
+
+ if echo:
+ new_settings = termios.tcgetattr(stdin_fd)
+ new_settings[3] = new_settings[3] | termios.ECHO
+ termios.tcsetattr(stdin_fd, termios.TCSANOW, new_settings)
+
+
+def setupterm() -> None:
+ # Nest the try except since curses.error is not available if curses did not import
+ try:
+ curses.setupterm()
+ except (curses.error, TypeError, io.UnsupportedOperation):
+ global HAS_CURSES
+ HAS_CURSES = False
+ else:
+ global MOVE_TO_BOL
+ global CLEAR_TO_EOL
+ # curses.tigetstr() returns None in some circumstances
+ MOVE_TO_BOL = curses.tigetstr('cr') or MOVE_TO_BOL
+ CLEAR_TO_EOL = curses.tigetstr('el') or CLEAR_TO_EOL
class Display(metaclass=Singleton):
- def __init__(self, verbosity=0):
+ def __init__(self, verbosity: int = 0) -> None:
- self._final_q = None
+ self._final_q: FinalQueue | None = None
# NB: this lock is used to both prevent intermingled output between threads and to block writes during forks.
# Do not change the type of this lock or upgrade to a shared lock (eg multiprocessing.RLock).
@@ -197,11 +284,11 @@ class Display(metaclass=Singleton):
self.verbosity = verbosity
# list of all deprecation messages to prevent duplicate display
- self._deprecations = {}
- self._warns = {}
- self._errors = {}
+ self._deprecations: dict[str, int] = {}
+ self._warns: dict[str, int] = {}
+ self._errors: dict[str, int] = {}
- self.b_cowsay = None
+ self.b_cowsay: bytes | None = None
self.noncow = C.ANSIBLE_COW_SELECTION
self.set_cowsay_info()
@@ -212,12 +299,12 @@ class Display(metaclass=Singleton):
(out, err) = cmd.communicate()
if cmd.returncode:
raise Exception
- self.cows_available = {to_text(c) for c in out.split()} # set comprehension
+ self.cows_available: set[str] = {to_text(c) for c in out.split()}
if C.ANSIBLE_COW_ACCEPTLIST and any(C.ANSIBLE_COW_ACCEPTLIST):
self.cows_available = set(C.ANSIBLE_COW_ACCEPTLIST).intersection(self.cows_available)
except Exception:
# could not execute cowsay for some reason
- self.b_cowsay = False
+ self.b_cowsay = None
self._set_column_width()
@@ -228,13 +315,25 @@ class Display(metaclass=Singleton):
except Exception as ex:
self.warning(f"failed to patch stdout/stderr for fork-safety: {ex}")
+ codecs.register_error('_replacing_warning_handler', self._replacing_warning_handler)
try:
- sys.stdout.reconfigure(errors='replace')
- sys.stderr.reconfigure(errors='replace')
+ sys.stdout.reconfigure(errors='_replacing_warning_handler')
+ sys.stderr.reconfigure(errors='_replacing_warning_handler')
except Exception as ex:
- self.warning(f"failed to reconfigure stdout/stderr with the replace error handler: {ex}")
+ self.warning(f"failed to reconfigure stdout/stderr with custom encoding error handler: {ex}")
- def set_queue(self, queue):
+ self.setup_curses = False
+
+ def _replacing_warning_handler(self, exception: UnicodeError) -> tuple[str | bytes, int]:
+ # TODO: This should probably be deferred until after the current display is completed
+ # this will require some amount of new functionality
+ self.deprecated(
+ 'Non UTF-8 encoded data replaced with "?" while displaying text to stdout/stderr, this is temporary and will become an error',
+ version='2.18',
+ )
+ return '?', exception.end
+
+ def set_queue(self, queue: FinalQueue) -> None:
"""Set the _final_q on Display, so that we know to proxy display over the queue
instead of directly writing to stdout/stderr from forks
@@ -244,7 +343,7 @@ class Display(metaclass=Singleton):
raise RuntimeError('queue cannot be set in parent process')
self._final_q = queue
- def set_cowsay_info(self):
+ def set_cowsay_info(self) -> None:
if C.ANSIBLE_NOCOWS:
return
@@ -255,18 +354,23 @@ class Display(metaclass=Singleton):
if os.path.exists(b_cow_path):
self.b_cowsay = b_cow_path
- def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False, newline=True):
+ @proxy_display
+ def display(
+ self,
+ msg: str,
+ color: str | None = None,
+ stderr: bool = False,
+ screen_only: bool = False,
+ log_only: bool = False,
+ newline: bool = True,
+ ) -> None:
""" Display a message to the user
Note: msg *must* be a unicode string to prevent UnicodeError tracebacks.
"""
- if self._final_q:
- # If _final_q is set, that means we are in a WorkerProcess
- # and instead of displaying messages directly from the fork
- # we will proxy them through the queue
- return self._final_q.send_display(msg, color=color, stderr=stderr,
- screen_only=screen_only, log_only=log_only, newline=newline)
+ if not isinstance(msg, str):
+ raise TypeError(f'Display message must be str, not: {msg.__class__.__name__}')
nocolor = msg
@@ -321,32 +425,32 @@ class Display(metaclass=Singleton):
# actually log
logger.log(lvl, msg2)
- def v(self, msg, host=None):
+ def v(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=0)
- def vv(self, msg, host=None):
+ def vv(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=1)
- def vvv(self, msg, host=None):
+ def vvv(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=2)
- def vvvv(self, msg, host=None):
+ def vvvv(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=3)
- def vvvvv(self, msg, host=None):
+ def vvvvv(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=4)
- def vvvvvv(self, msg, host=None):
+ def vvvvvv(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=5)
- def debug(self, msg, host=None):
+ def debug(self, msg: str, host: str | None = None) -> None:
if C.DEFAULT_DEBUG:
if host is None:
self.display("%6d %0.5f: %s" % (os.getpid(), time.time(), msg), color=C.COLOR_DEBUG)
else:
self.display("%6d %0.5f [%s]: %s" % (os.getpid(), time.time(), host, msg), color=C.COLOR_DEBUG)
- def verbose(self, msg, host=None, caplevel=2):
+ def verbose(self, msg: str, host: str | None = None, caplevel: int = 2) -> None:
to_stderr = C.VERBOSE_TO_STDERR
if self.verbosity > caplevel:
@@ -355,7 +459,14 @@ class Display(metaclass=Singleton):
else:
self.display("<%s> %s" % (host, msg), color=C.COLOR_VERBOSE, stderr=to_stderr)
- def get_deprecation_message(self, msg, version=None, removed=False, date=None, collection_name=None):
+ def get_deprecation_message(
+ self,
+ msg: str,
+ version: str | None = None,
+ removed: bool = False,
+ date: str | None = None,
+ collection_name: str | None = None,
+ ) -> str:
''' used to print out a deprecation message.'''
msg = msg.strip()
if msg and msg[-1] not in ['!', '?', '.']:
@@ -390,7 +501,15 @@ class Display(metaclass=Singleton):
return message_text
- def deprecated(self, msg, version=None, removed=False, date=None, collection_name=None):
+ @proxy_display
+ def deprecated(
+ self,
+ msg: str,
+ version: str | None = None,
+ removed: bool = False,
+ date: str | None = None,
+ collection_name: str | None = None,
+ ) -> None:
if not removed and not C.DEPRECATION_WARNINGS:
return
@@ -406,7 +525,8 @@ class Display(metaclass=Singleton):
self.display(message_text.strip(), color=C.COLOR_DEPRECATE, stderr=True)
self._deprecations[message_text] = 1
- def warning(self, msg, formatted=False):
+ @proxy_display
+ def warning(self, msg: str, formatted: bool = False) -> None:
if not formatted:
new_msg = "[WARNING]: %s" % msg
@@ -419,11 +539,11 @@ class Display(metaclass=Singleton):
self.display(new_msg, color=C.COLOR_WARN, stderr=True)
self._warns[new_msg] = 1
- def system_warning(self, msg):
+ def system_warning(self, msg: str) -> None:
if C.SYSTEM_WARNINGS:
self.warning(msg)
- def banner(self, msg, color=None, cows=True):
+ def banner(self, msg: str, color: str | None = None, cows: bool = True) -> None:
'''
Prints a header-looking line with cowsay or stars with length depending on terminal width (3 minimum)
'''
@@ -446,7 +566,7 @@ class Display(metaclass=Singleton):
stars = u"*" * star_len
self.display(u"\n%s %s" % (msg, stars), color=color)
- def banner_cowsay(self, msg, color=None):
+ def banner_cowsay(self, msg: str, color: str | None = None) -> None:
if u": [" in msg:
msg = msg.replace(u"[", u"")
if msg.endswith(u"]"):
@@ -463,7 +583,7 @@ class Display(metaclass=Singleton):
(out, err) = cmd.communicate()
self.display(u"%s\n" % to_text(out), color=color)
- def error(self, msg, wrap_text=True):
+ def error(self, msg: str, wrap_text: bool = True) -> None:
if wrap_text:
new_msg = u"\n[ERROR]: %s" % msg
wrapped = textwrap.wrap(new_msg, self.columns)
@@ -475,14 +595,24 @@ class Display(metaclass=Singleton):
self._errors[new_msg] = 1
@staticmethod
- def prompt(msg, private=False):
+ def prompt(msg: str, private: bool = False) -> str:
if private:
return getpass.getpass(msg)
else:
return input(msg)
- def do_var_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None, unsafe=None):
-
+ def do_var_prompt(
+ self,
+ varname: str,
+ private: bool = True,
+ prompt: str | None = None,
+ encrypt: str | None = None,
+ confirm: bool = False,
+ salt_size: int | None = None,
+ salt: str | None = None,
+ default: str | None = None,
+ unsafe: bool = False,
+ ) -> str:
result = None
if sys.__stdin__.isatty():
@@ -515,7 +645,7 @@ class Display(metaclass=Singleton):
if encrypt:
# Circular import because encrypt needs a display class
from ansible.utils.encrypt import do_encrypt
- result = do_encrypt(result, encrypt, salt_size, salt)
+ result = do_encrypt(result, encrypt, salt_size=salt_size, salt=salt)
# handle utf-8 chars
result = to_text(result, errors='surrogate_or_strict')
@@ -524,9 +654,149 @@ class Display(metaclass=Singleton):
result = wrap_var(result)
return result
- def _set_column_width(self):
+ def _set_column_width(self) -> None:
if os.isatty(1):
- tty_size = unpack('HHHH', fcntl.ioctl(1, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1]
+ tty_size = unpack('HHHH', fcntl.ioctl(1, termios.TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1]
else:
tty_size = 0
self.columns = max(79, tty_size - 1)
+
+ def prompt_until(
+ self,
+ msg: str,
+ private: bool = False,
+ seconds: int | None = None,
+ interrupt_input: c.Container[bytes] | None = None,
+ complete_input: c.Container[bytes] | None = None,
+ ) -> bytes:
+ if self._final_q:
+ from ansible.executor.process.worker import current_worker
+ self._final_q.send_prompt(
+ worker_id=current_worker.worker_id, prompt=msg, private=private, seconds=seconds,
+ interrupt_input=interrupt_input, complete_input=complete_input
+ )
+ return current_worker.worker_queue.get()
+
+ if HAS_CURSES and not self.setup_curses:
+ setupterm()
+ self.setup_curses = True
+
+ if (
+ self._stdin_fd is None
+ or not os.isatty(self._stdin_fd)
+ # Compare the current process group to the process group associated
+ # with terminal of the given file descriptor to determine if the process
+ # is running in the background.
+ or os.getpgrp() != os.tcgetpgrp(self._stdin_fd)
+ ):
+ raise AnsiblePromptNoninteractive('stdin is not interactive')
+
+ # When seconds/interrupt_input/complete_input are all None, this does mostly the same thing as input/getpass,
+ # but self.prompt may raise a KeyboardInterrupt, which must be caught in the main thread.
+ # If the main thread handled this, it would also need to send a newline to the tty of any hanging pids.
+ # if seconds is None and interrupt_input is None and complete_input is None:
+ # try:
+ # return self.prompt(msg, private=private)
+ # except KeyboardInterrupt:
+ # # can't catch in the results_thread_main daemon thread
+ # raise AnsiblePromptInterrupt('user interrupt')
+
+ self.display(msg)
+ result = b''
+ with self._lock:
+ original_stdin_settings = termios.tcgetattr(self._stdin_fd)
+ try:
+ setup_prompt(self._stdin_fd, self._stdout_fd, seconds, not private)
+
+ # flush the buffer to make sure no previous key presses
+ # are read in below
+ termios.tcflush(self._stdin, termios.TCIFLUSH)
+
+ # read input 1 char at a time until the optional timeout or complete/interrupt condition is met
+ return self._read_non_blocking_stdin(echo=not private, seconds=seconds, interrupt_input=interrupt_input, complete_input=complete_input)
+ finally:
+ # restore the old settings for the duped stdin stdin_fd
+ termios.tcsetattr(self._stdin_fd, termios.TCSADRAIN, original_stdin_settings)
+
+ def _read_non_blocking_stdin(
+ self,
+ echo: bool = False,
+ seconds: int | None = None,
+ interrupt_input: c.Container[bytes] | None = None,
+ complete_input: c.Container[bytes] | None = None,
+ ) -> bytes:
+ if self._final_q:
+ raise NotImplementedError
+
+ if seconds is not None:
+ start = time.time()
+ if interrupt_input is None:
+ try:
+ interrupt = termios.tcgetattr(sys.stdin.buffer.fileno())[6][termios.VINTR]
+ except Exception:
+ interrupt = b'\x03' # value for Ctrl+C
+
+ try:
+ backspace_sequences = [termios.tcgetattr(self._stdin_fd)[6][termios.VERASE]]
+ except Exception:
+ # unsupported/not present, use default
+ backspace_sequences = [b'\x7f', b'\x08']
+
+ result_string = b''
+ while seconds is None or (time.time() - start < seconds):
+ key_pressed = None
+ try:
+ os.set_blocking(self._stdin_fd, False)
+ while key_pressed is None and (seconds is None or (time.time() - start < seconds)):
+ key_pressed = self._stdin.read(1)
+ # throttle to prevent excess CPU consumption
+ time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)
+ finally:
+ os.set_blocking(self._stdin_fd, True)
+ if key_pressed is None:
+ key_pressed = b''
+
+ if (interrupt_input is None and key_pressed == interrupt) or (interrupt_input is not None and key_pressed.lower() in interrupt_input):
+ clear_line(self._stdout)
+ raise AnsiblePromptInterrupt('user interrupt')
+ if (complete_input is None and key_pressed in (b'\r', b'\n')) or (complete_input is not None and key_pressed.lower() in complete_input):
+ clear_line(self._stdout)
+ break
+ elif key_pressed in backspace_sequences:
+ clear_line(self._stdout)
+ result_string = result_string[:-1]
+ if echo:
+ self._stdout.write(result_string)
+ self._stdout.flush()
+ else:
+ result_string += key_pressed
+ return result_string
+
+ @property
+ def _stdin(self) -> t.BinaryIO | None:
+ if self._final_q:
+ raise NotImplementedError
+ try:
+ return sys.stdin.buffer
+ except AttributeError:
+ return None
+
+ @property
+ def _stdin_fd(self) -> int | None:
+ try:
+ return self._stdin.fileno()
+ except (ValueError, AttributeError):
+ return None
+
+ @property
+ def _stdout(self) -> t.BinaryIO:
+ if self._final_q:
+ raise NotImplementedError
+ return sys.stdout.buffer
+
+ @property
+ def _stdout_fd(self) -> int | None:
+ try:
+ return self._stdout.fileno()
+ except (ValueError, AttributeError):
+ return None
diff --git a/lib/ansible/utils/encrypt.py b/lib/ansible/utils/encrypt.py
index 661fde3..541c5c8 100644
--- a/lib/ansible/utils/encrypt.py
+++ b/lib/ansible/utils/encrypt.py
@@ -4,7 +4,6 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
-import multiprocessing
import random
import re
import string
@@ -15,7 +14,7 @@ from collections import namedtuple
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleAssertionError
from ansible.module_utils.six import text_type
-from ansible.module_utils._text import to_text, to_bytes
+from ansible.module_utils.common.text.converters import to_text, to_bytes
from ansible.utils.display import Display
PASSLIB_E = CRYPT_E = None
@@ -43,8 +42,6 @@ display = Display()
__all__ = ['do_encrypt']
-_LOCK = multiprocessing.Lock()
-
DEFAULT_PASSWORD_LENGTH = 20
@@ -105,7 +102,7 @@ class CryptHash(BaseHash):
"Python crypt module is deprecated and will be removed from "
"Python 3.13. Install the passlib library for continued "
"encryption functionality.",
- version=2.17
+ version="2.17",
)
self.algo_data = self.algorithms[algorithm]
@@ -128,7 +125,10 @@ class CryptHash(BaseHash):
return ret
def _rounds(self, rounds):
- if rounds == self.algo_data.implicit_rounds:
+ if self.algorithm == 'bcrypt':
+ # crypt requires 2 digits for rounds
+ return rounds or self.algo_data.implicit_rounds
+ elif rounds == self.algo_data.implicit_rounds:
# Passlib does not include the rounds if it is the same as implicit_rounds.
# Make crypt lib behave the same, by not explicitly specifying the rounds in that case.
return None
@@ -148,12 +148,14 @@ class CryptHash(BaseHash):
saltstring = "$%s" % ident
if rounds:
- saltstring += "$rounds=%d" % rounds
+ if self.algorithm == 'bcrypt':
+ saltstring += "$%d" % rounds
+ else:
+ saltstring += "$rounds=%d" % rounds
saltstring += "$%s" % salt
- # crypt.crypt on Python < 3.9 returns None if it cannot parse saltstring
- # On Python >= 3.9, it throws OSError.
+ # crypt.crypt throws OSError on Python >= 3.9 if it cannot parse saltstring.
try:
result = crypt.crypt(secret, saltstring)
orig_exc = None
@@ -161,7 +163,7 @@ class CryptHash(BaseHash):
result = None
orig_exc = e
- # None as result would be interpreted by the some modules (user module)
+ # None as result would be interpreted by some modules (user module)
# as no password at all.
if not result:
raise AnsibleError(
@@ -178,6 +180,7 @@ class PasslibHash(BaseHash):
if not PASSLIB_AVAILABLE:
raise AnsibleError("passlib must be installed and usable to hash with '%s'" % algorithm, orig_exc=PASSLIB_E)
+ display.vv("Using passlib to hash input with '%s'" % algorithm)
try:
self.crypt_algo = getattr(passlib.hash, algorithm)
@@ -264,12 +267,13 @@ class PasslibHash(BaseHash):
def passlib_or_crypt(secret, algorithm, salt=None, salt_size=None, rounds=None, ident=None):
+ display.deprecated("passlib_or_crypt API is deprecated in favor of do_encrypt", version='2.20')
+ return do_encrypt(secret, algorithm, salt=salt, salt_size=salt_size, rounds=rounds, ident=ident)
+
+
+def do_encrypt(result, encrypt, salt_size=None, salt=None, ident=None, rounds=None):
if PASSLIB_AVAILABLE:
- return PasslibHash(algorithm).hash(secret, salt=salt, salt_size=salt_size, rounds=rounds, ident=ident)
+ return PasslibHash(encrypt).hash(result, salt=salt, salt_size=salt_size, rounds=rounds, ident=ident)
if HAS_CRYPT:
- return CryptHash(algorithm).hash(secret, salt=salt, salt_size=salt_size, rounds=rounds, ident=ident)
+ return CryptHash(encrypt).hash(result, salt=salt, salt_size=salt_size, rounds=rounds, ident=ident)
raise AnsibleError("Unable to encrypt nor hash, either crypt or passlib must be installed.", orig_exc=CRYPT_E)
-
-
-def do_encrypt(result, encrypt, salt_size=None, salt=None, ident=None):
- return passlib_or_crypt(result, encrypt, salt_size=salt_size, salt=salt, ident=ident)
diff --git a/lib/ansible/utils/hashing.py b/lib/ansible/utils/hashing.py
index 71300d6..97ea1dc 100644
--- a/lib/ansible/utils/hashing.py
+++ b/lib/ansible/utils/hashing.py
@@ -30,7 +30,7 @@ except ImportError:
_md5 = None
from ansible.errors import AnsibleError
-from ansible.module_utils._text import to_bytes
+from ansible.module_utils.common.text.converters import to_bytes
def secure_hash_s(data, hash_func=sha1):
diff --git a/lib/ansible/utils/jsonrpc.py b/lib/ansible/utils/jsonrpc.py
index 8d5b0f6..2af8bd3 100644
--- a/lib/ansible/utils/jsonrpc.py
+++ b/lib/ansible/utils/jsonrpc.py
@@ -8,7 +8,7 @@ import json
import pickle
import traceback
-from ansible.module_utils._text import to_text
+from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.connection import ConnectionError
from ansible.module_utils.six import binary_type, text_type
from ansible.utils.display import Display
diff --git a/lib/ansible/utils/path.py b/lib/ansible/utils/path.py
index f876add..e4e00ce 100644
--- a/lib/ansible/utils/path.py
+++ b/lib/ansible/utils/path.py
@@ -22,7 +22,7 @@ import shutil
from errno import EEXIST
from ansible.errors import AnsibleError
-from ansible.module_utils._text import to_bytes, to_native, to_text
+from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
__all__ = ['unfrackpath', 'makedirs_safe']
diff --git a/lib/ansible/utils/plugin_docs.py b/lib/ansible/utils/plugin_docs.py
index 3af2678..91b3722 100644
--- a/lib/ansible/utils/plugin_docs.py
+++ b/lib/ansible/utils/plugin_docs.py
@@ -11,7 +11,7 @@ from ansible import constants as C
from ansible.release import __version__ as ansible_version
from ansible.errors import AnsibleError, AnsibleParserError, AnsiblePluginNotFound
from ansible.module_utils.six import string_types
-from ansible.module_utils._text import to_native
+from ansible.module_utils.common.text.converters import to_native
from ansible.parsing.plugin_docs import read_docstring
from ansible.parsing.yaml.loader import AnsibleLoader
from ansible.utils.display import Display
diff --git a/lib/ansible/utils/py3compat.py b/lib/ansible/utils/py3compat.py
index 88d9fdf..5201132 100644
--- a/lib/ansible/utils/py3compat.py
+++ b/lib/ansible/utils/py3compat.py
@@ -17,7 +17,7 @@ import sys
from collections.abc import MutableMapping
from ansible.module_utils.six import PY3
-from ansible.module_utils._text import to_bytes, to_text
+from ansible.module_utils.common.text.converters import to_bytes, to_text
__all__ = ('environ',)
diff --git a/lib/ansible/utils/shlex.py b/lib/ansible/utils/shlex.py
index 5e82021..8f50ffd 100644
--- a/lib/ansible/utils/shlex.py
+++ b/lib/ansible/utils/shlex.py
@@ -20,15 +20,7 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import shlex
-from ansible.module_utils.six import PY3
-from ansible.module_utils._text import to_bytes, to_text
-if PY3:
- # shlex.split() wants Unicode (i.e. ``str``) input on Python 3
- shlex_split = shlex.split
-else:
- # shlex.split() wants bytes (i.e. ``str``) input on Python 2
- def shlex_split(s, comments=False, posix=True):
- return map(to_text, shlex.split(to_bytes(s), comments, posix))
- shlex_split.__doc__ = shlex.split.__doc__
+# shlex.split() wants Unicode (i.e. ``str``) input on Python 3
+shlex_split = shlex.split
diff --git a/lib/ansible/utils/ssh_functions.py b/lib/ansible/utils/ssh_functions.py
index a728889..594dbc0 100644
--- a/lib/ansible/utils/ssh_functions.py
+++ b/lib/ansible/utils/ssh_functions.py
@@ -23,8 +23,11 @@ __metaclass__ = type
import subprocess
from ansible import constants as C
-from ansible.module_utils._text import to_bytes
+from ansible.module_utils.common.text.converters import to_bytes
from ansible.module_utils.compat.paramiko import paramiko
+from ansible.utils.display import Display
+
+display = Display()
_HAS_CONTROLPERSIST = {} # type: dict[str, bool]
@@ -51,13 +54,11 @@ def check_for_controlpersist(ssh_executable):
return has_cp
-# TODO: move to 'smart' connection plugin that subclasses to ssh/paramiko as needed.
def set_default_transport():
# deal with 'smart' connection .. one time ..
if C.DEFAULT_TRANSPORT == 'smart':
- # TODO: check if we can deprecate this as ssh w/o control persist should
- # not be as common anymore.
+ display.deprecated("The 'smart' option for connections is deprecated. Set the connection plugin directly instead.", version='2.20')
# see if SSH can support ControlPersist if not use paramiko
if not check_for_controlpersist('ssh') and paramiko is not None:
diff --git a/lib/ansible/utils/unicode.py b/lib/ansible/utils/unicode.py
index 1218a6e..b5304ba 100644
--- a/lib/ansible/utils/unicode.py
+++ b/lib/ansible/utils/unicode.py
@@ -19,7 +19,7 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
-from ansible.module_utils._text import to_text
+from ansible.module_utils.common.text.converters import to_text
__all__ = ('unicode_wrap',)
diff --git a/lib/ansible/utils/unsafe_proxy.py b/lib/ansible/utils/unsafe_proxy.py
index 683f6e2..b3e7383 100644
--- a/lib/ansible/utils/unsafe_proxy.py
+++ b/lib/ansible/utils/unsafe_proxy.py
@@ -53,9 +53,13 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
+import sys
+import types
+import warnings
+from sys import intern as _sys_intern
from collections.abc import Mapping, Set
-from ansible.module_utils._text import to_bytes, to_text
+from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.module_utils.common.collections import is_sequence
from ansible.utils.native_jinja import NativeJinjaText
@@ -369,3 +373,20 @@ def to_unsafe_text(*args, **kwargs):
def _is_unsafe(obj):
return getattr(obj, '__UNSAFE__', False) is True
+
+
+def _intern(string):
+ """This is a monkey patch for ``sys.intern`` that will strip
+ the unsafe wrapper prior to interning the string.
+
+ This will not exist in future versions.
+ """
+ if isinstance(string, AnsibleUnsafeText):
+ string = string._strip_unsafe()
+ return _sys_intern(string)
+
+
+if isinstance(sys.intern, types.BuiltinFunctionType):
+ sys.intern = _intern
+else:
+ warnings.warn("skipped sys.intern patch; appears to have already been patched", RuntimeWarning)
diff --git a/lib/ansible/utils/vars.py b/lib/ansible/utils/vars.py
index a3224c8..5e21cb3 100644
--- a/lib/ansible/utils/vars.py
+++ b/lib/ansible/utils/vars.py
@@ -29,8 +29,8 @@ from json import dumps
from ansible import constants as C
from ansible import context
from ansible.errors import AnsibleError, AnsibleOptionsError
-from ansible.module_utils.six import string_types, PY3
-from ansible.module_utils._text import to_native, to_text
+from ansible.module_utils.six import string_types
+from ansible.module_utils.common.text.converters import to_native, to_text
from ansible.parsing.splitter import parse_kv
@@ -109,6 +109,8 @@ def merge_hash(x, y, recursive=True, list_merge='replace'):
# except performance)
if x == {} or x == y:
return y.copy()
+ if y == {}:
+ return x
# in the following we will copy elements from y to x, but
# we don't want to modify x, so we create a copy of it
@@ -181,66 +183,67 @@ def merge_hash(x, y, recursive=True, list_merge='replace'):
def load_extra_vars(loader):
- extra_vars = {}
- for extra_vars_opt in context.CLIARGS.get('extra_vars', tuple()):
- data = None
- extra_vars_opt = to_text(extra_vars_opt, errors='surrogate_or_strict')
- if extra_vars_opt is None or not extra_vars_opt:
- continue
- if extra_vars_opt.startswith(u"@"):
- # Argument is a YAML file (JSON is a subset of YAML)
- data = loader.load_from_file(extra_vars_opt[1:])
- elif extra_vars_opt[0] in [u'/', u'.']:
- raise AnsibleOptionsError("Please prepend extra_vars filename '%s' with '@'" % extra_vars_opt)
- elif extra_vars_opt[0] in [u'[', u'{']:
- # Arguments as YAML
- data = loader.load(extra_vars_opt)
- else:
- # Arguments as Key-value
- data = parse_kv(extra_vars_opt)
+ if not getattr(load_extra_vars, 'extra_vars', None):
+ extra_vars = {}
+ for extra_vars_opt in context.CLIARGS.get('extra_vars', tuple()):
+ data = None
+ extra_vars_opt = to_text(extra_vars_opt, errors='surrogate_or_strict')
+ if extra_vars_opt is None or not extra_vars_opt:
+ continue
+
+ if extra_vars_opt.startswith(u"@"):
+ # Argument is a YAML file (JSON is a subset of YAML)
+ data = loader.load_from_file(extra_vars_opt[1:])
+ elif extra_vars_opt[0] in [u'/', u'.']:
+ raise AnsibleOptionsError("Please prepend extra_vars filename '%s' with '@'" % extra_vars_opt)
+ elif extra_vars_opt[0] in [u'[', u'{']:
+ # Arguments as YAML
+ data = loader.load(extra_vars_opt)
+ else:
+ # Arguments as Key-value
+ data = parse_kv(extra_vars_opt)
+
+ if isinstance(data, MutableMapping):
+ extra_vars = combine_vars(extra_vars, data)
+ else:
+ raise AnsibleOptionsError("Invalid extra vars data supplied. '%s' could not be made into a dictionary" % extra_vars_opt)
- if isinstance(data, MutableMapping):
- extra_vars = combine_vars(extra_vars, data)
- else:
- raise AnsibleOptionsError("Invalid extra vars data supplied. '%s' could not be made into a dictionary" % extra_vars_opt)
+ setattr(load_extra_vars, 'extra_vars', extra_vars)
- return extra_vars
+ return load_extra_vars.extra_vars
def load_options_vars(version):
- if version is None:
- version = 'Unknown'
- options_vars = {'ansible_version': version}
- attrs = {'check': 'check_mode',
- 'diff': 'diff_mode',
- 'forks': 'forks',
- 'inventory': 'inventory_sources',
- 'skip_tags': 'skip_tags',
- 'subset': 'limit',
- 'tags': 'run_tags',
- 'verbosity': 'verbosity'}
+ if not getattr(load_options_vars, 'options_vars', None):
+ if version is None:
+ version = 'Unknown'
+ options_vars = {'ansible_version': version}
+ attrs = {'check': 'check_mode',
+ 'diff': 'diff_mode',
+ 'forks': 'forks',
+ 'inventory': 'inventory_sources',
+ 'skip_tags': 'skip_tags',
+ 'subset': 'limit',
+ 'tags': 'run_tags',
+ 'verbosity': 'verbosity'}
- for attr, alias in attrs.items():
- opt = context.CLIARGS.get(attr)
- if opt is not None:
- options_vars['ansible_%s' % alias] = opt
+ for attr, alias in attrs.items():
+ opt = context.CLIARGS.get(attr)
+ if opt is not None:
+ options_vars['ansible_%s' % alias] = opt
- return options_vars
+ setattr(load_options_vars, 'options_vars', options_vars)
+
+ return load_options_vars.options_vars
def _isidentifier_PY3(ident):
if not isinstance(ident, string_types):
return False
- # NOTE Python 3.7 offers str.isascii() so switch over to using it once
- # we stop supporting 3.5 and 3.6 on the controller
- try:
- # Python 2 does not allow non-ascii characters in identifiers so unify
- # the behavior for Python 3
- ident.encode('ascii')
- except UnicodeEncodeError:
+ if not ident.isascii():
return False
if not ident.isidentifier():
@@ -252,26 +255,7 @@ def _isidentifier_PY3(ident):
return True
-def _isidentifier_PY2(ident):
- if not isinstance(ident, string_types):
- return False
-
- if not ident:
- return False
-
- if C.INVALID_VARIABLE_NAMES.search(ident):
- return False
-
- if keyword.iskeyword(ident) or ident in ADDITIONAL_PY2_KEYWORDS:
- return False
-
- return True
-
-
-if PY3:
- isidentifier = _isidentifier_PY3
-else:
- isidentifier = _isidentifier_PY2
+isidentifier = _isidentifier_PY3
isidentifier.__doc__ = """Determine if string is valid identifier.
diff --git a/lib/ansible/utils/version.py b/lib/ansible/utils/version.py
index c045e7d..e7da9fd 100644
--- a/lib/ansible/utils/version.py
+++ b/lib/ansible/utils/version.py
@@ -9,8 +9,6 @@ import re
from ansible.module_utils.compat.version import LooseVersion, Version
-from ansible.module_utils.six import text_type
-
# Regular expression taken from
# https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string