diff options
Diffstat (limited to 'ansible_collections/cloud/common/plugins/module_utils')
5 files changed, 755 insertions, 0 deletions
diff --git a/ansible_collections/cloud/common/plugins/module_utils/turbo/common.py b/ansible_collections/cloud/common/plugins/module_utils/turbo/common.py new file mode 100644 index 00000000..e5ad1938 --- /dev/null +++ b/ansible_collections/cloud/common/plugins/module_utils/turbo/common.py @@ -0,0 +1,125 @@ +# Copyright (c) 2021 Red Hat +# +# This code is part of Ansible, but is an independent component. +# This particular file snippet, and this file snippet only, is BSD licensed. +# Modules you write using this snippet, which is embedded dynamically by Ansible +# still belong to the author of the module, and may assign their own license +# to the complete work. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +import os +import socket +import sys +import time +import subprocess +import pickle +from contextlib import contextmanager +import json + +from .exceptions import ( + EmbeddedModuleUnexpectedFailure, +) + + +class AnsibleTurboSocket: + def __init__(self, socket_path, ttl=None, plugin="module"): + self._socket_path = socket_path + self._ttl = ttl + self._plugin = plugin + self._socket = None + + def bind(self): + running = False + self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + for attempt in range(100, -1, -1): + try: + self._socket.connect(self._socket_path) + return True + except (ConnectionRefusedError, FileNotFoundError): + if not running: + running = self.start_server() + if attempt == 0: + raise + time.sleep(0.01) + + def start_server(self): + env = os.environ + parameters = [ + "--fork", + "--socket-path", + self._socket_path, + ] + + if self._ttl: + parameters += ["--ttl", str(self._ttl)] + + command = [sys.executable] + if self._plugin == "module": + ansiblez_path = sys.path[0] + env.update({"PYTHONPATH": ansiblez_path}) + command += [ + "-m", + "ansible_collections.cloud.common.plugins.module_utils.turbo.server", + ] + else: + parent_dir = os.path.dirname(__file__) + server_path = os.path.join(parent_dir, "server.py") + command += [server_path] + p = subprocess.Popen( + command + parameters, + env=env, + close_fds=True, + ) + p.communicate() + return p.pid + + def communicate(self, data, wait_sleep=0.01): + encoded_data = pickle.dumps((self._plugin, data)) + self._socket.sendall(encoded_data) + self._socket.shutdown(socket.SHUT_WR) + raw_answer = b"" + while True: + b = self._socket.recv((1024 * 1024)) + if not b: + break + raw_answer += b + time.sleep(wait_sleep) + try: + result = json.loads(raw_answer.decode()) + return result + except json.decoder.JSONDecodeError: + raise EmbeddedModuleUnexpectedFailure( + "Cannot decode plugin answer: {0}".format(raw_answer) + ) + + def close(self): + if self._socket: + self._socket.close() + + +@contextmanager +def connect(socket_path, ttl=None, plugin="module"): + turbo_socket = AnsibleTurboSocket(socket_path=socket_path, ttl=ttl, plugin=plugin) + try: + turbo_socket.bind() + yield turbo_socket + finally: + turbo_socket.close() diff --git a/ansible_collections/cloud/common/plugins/module_utils/turbo/exceptions.py b/ansible_collections/cloud/common/plugins/module_utils/turbo/exceptions.py new file mode 100644 index 00000000..acad2cba --- /dev/null +++ b/ansible_collections/cloud/common/plugins/module_utils/turbo/exceptions.py @@ -0,0 +1,65 @@ +# Copyright (c) 2021 Red Hat +# +# This code is part of Ansible, but is an independent component. +# This particular file snippet, and this file snippet only, is BSD licensed. +# Modules you write using this snippet, which is embedded dynamically by Ansible +# still belong to the author of the module, and may assign their own license +# to the complete work. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + + +class EmbeddedModuleFailure(Exception): + def __init__(self, msg, **kwargs): + self._message = msg + self._kwargs = kwargs + + def get_message(self): + return self._message + + @property + def kwargs(self): + return self._kwargs + + def __repr__(self): + return repr(self.get_message()) + + def __str__(self): + return str(self.get_message()) + + +class EmbeddedModuleUnexpectedFailure(Exception): + def __init__(self, msg): + self._message = msg + + def get_message(self): + return self._message + + def __repr__(self): + return repr(self.get_message()) + + def __str__(self): + return str(self.get_message()) + + +class EmbeddedModuleSuccess(Exception): + def __init__(self, **kwargs): + self.kwargs = kwargs diff --git a/ansible_collections/cloud/common/plugins/module_utils/turbo/module.py b/ansible_collections/cloud/common/plugins/module_utils/turbo/module.py new file mode 100644 index 00000000..c2f9d667 --- /dev/null +++ b/ansible_collections/cloud/common/plugins/module_utils/turbo/module.py @@ -0,0 +1,169 @@ +# Copyright (c) 2021 Red Hat +# +# This code is part of Ansible, but is an independent component. +# This particular file snippet, and this file snippet only, is BSD licensed. +# Modules you write using this snippet, which is embedded dynamically by Ansible +# still belong to the author of the module, and may assign their own license +# to the complete work. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +import json +import os +import os.path +import sys +import tempfile + +import ansible.module_utils.basic +from .exceptions import ( + EmbeddedModuleSuccess, + EmbeddedModuleFailure, +) +import ansible_collections.cloud.common.plugins.module_utils.turbo.common + +if False: # pylint: disable=using-constant-test + from .server import please_include_me + + # This is a trick to be sure server.py is embedded in the Ansiblez + # zip archive.🥷 + please_include_me + + +def get_collection_name_from_path(): + module_path = ansible.module_utils.basic.get_module_path() + + ansiblez = module_path.split("/")[-3] + if ansiblez.startswith("ansible_") and ansiblez.endswith(".zip"): + return ".".join(ansiblez[8:].split(".")[:2]) + + +def expand_argument_specs_aliases(argument_spec): + """Returns a dict of accepted argument that includes the aliases""" + expanded_argument_specs = {} + for k, v in argument_spec.items(): + for alias in [k] + v.get("aliases", []): + expanded_argument_specs[alias] = v + return expanded_argument_specs + + +def prepare_args(argument_specs, params): + """Take argument_spec and the user params and prepare the final argument structure.""" + + def _keep_value(v, argument_specs, key, subkey=None): + if v is None: # cannot be a valide parameter + return False + if key not in argument_specs: # should never happen + return + if not subkey: # level 1 parameter + return v != argument_specs[key].get("default") + elif subkey not in argument_specs[key]: # Freeform + return True + elif isinstance(argument_specs[key][subkey], dict): + return v != argument_specs[key][subkey].get("default") + else: # should never happen + return True + + def _is_an_alias(k): + aliases = argument_specs[k].get("aliases") + return aliases and k in aliases + + new_params = {} + for k, v in params.items(): + if not _keep_value(v, argument_specs, k): + continue + + if _is_an_alias(k): + continue + + if isinstance(v, dict): + new_params[k] = { + i: j for i, j in v.items() if _keep_value(j, argument_specs, k, i) + } + else: + new_params[k] = v + args = {"ANSIBLE_MODULE_ARGS": new_params} + return args + + +class AnsibleTurboModule(ansible.module_utils.basic.AnsibleModule): + embedded_in_server = False + collection_name = None + + def __init__(self, *args, **kwargs): + self.embedded_in_server = sys.argv[0].endswith("/server.py") + self.collection_name = ( + AnsibleTurboModule.collection_name or get_collection_name_from_path() + ) + ansible.module_utils.basic.AnsibleModule.__init__( + self, *args, bypass_checks=not self.embedded_in_server, **kwargs + ) + self._running = None + if not self.embedded_in_server: + self.run_on_daemon() + + def socket_path(self): + if self._remote_tmp is None: + abs_remote_tmp = tempfile.gettempdir() + else: + abs_remote_tmp = os.path.expanduser(os.path.expandvars(self._remote_tmp)) + return os.path.join(abs_remote_tmp, f"turbo_mode.{self.collection_name}.socket") + + def init_args(self): + argument_specs = expand_argument_specs_aliases(self.argument_spec) + args = prepare_args(argument_specs, self.params) + for k in ansible.module_utils.basic.PASS_VARS: + attribute = ansible.module_utils.basic.PASS_VARS[k][0] + if not hasattr(self, attribute): + continue + v = getattr(self, attribute) + if isinstance(v, int) or isinstance(v, bool) or isinstance(v, str): + args["ANSIBLE_MODULE_ARGS"][f"_ansible_{k}"] = v + return args + + def run_on_daemon(self): + result = dict(changed=False, original_message="", message="") + ttl = os.environ.get("ANSIBLE_TURBO_LOOKUP_TTL", None) + with ansible_collections.cloud.common.plugins.module_utils.turbo.common.connect( + socket_path=self.socket_path(), ttl=ttl + ) as turbo_socket: + ansiblez_path = sys.path[0] + args = self.init_args() + data = [ + ansiblez_path, + json.dumps(args), + dict(os.environ), + ] + content = json.dumps(data).encode() + result = turbo_socket.communicate(content) + self.exit_json(**result) + + def exit_json(self, **kwargs): + if not self.embedded_in_server: + super().exit_json(**kwargs) + else: + self.do_cleanup_files() + raise EmbeddedModuleSuccess(**kwargs) + + def fail_json(self, *args, **kwargs): + if not self.embedded_in_server: + super().fail_json(**kwargs) + else: + self.do_cleanup_files() + raise EmbeddedModuleFailure(*args, **kwargs) diff --git a/ansible_collections/cloud/common/plugins/module_utils/turbo/server.py b/ansible_collections/cloud/common/plugins/module_utils/turbo/server.py new file mode 100644 index 00000000..028110c2 --- /dev/null +++ b/ansible_collections/cloud/common/plugins/module_utils/turbo/server.py @@ -0,0 +1,395 @@ +# Copyright (c) 2021 Red Hat +# +# This code is part of Ansible, but is an independent component. +# This particular file snippet, and this file snippet only, is BSD licensed. +# Modules you write using this snippet, which is embedded dynamically by Ansible +# still belong to the author of the module, and may assign their own license +# to the complete work. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +import argparse +import asyncio +from datetime import datetime +import importlib + +# py38 only, See: https://github.com/PyCQA/pylint/issues/2976 +import inspect # pylint: disable=syntax-error +import io +import json + +# py38 only, See: https://github.com/PyCQA/pylint/issues/2976 +import collections # pylint: disable=syntax-error +import os +import signal +import sys +import traceback +import zipfile +from zipimport import zipimporter +import pickle +import uuid + +sys_path_lock = None +env_lock = None + +import ansible.module_utils.basic + +please_include_me = "bar" + + +def fork_process(): + """ + This function performs the double fork process to detach from the + parent process and execute. + """ + pid = os.fork() + + if pid == 0: + fd = os.open(os.devnull, os.O_RDWR) + + # clone stdin/out/err + for num in range(3): + if fd != num: + os.dup2(fd, num) + + if fd not in range(3): + os.close(fd) + + pid = os.fork() + if pid > 0: + os._exit(0) + + # get new process session and detach + sid = os.setsid() + if sid == -1: + raise Exception("Unable to detach session while daemonizing") + + # avoid possible problems with cwd being removed + os.chdir("/") + + pid = os.fork() + if pid > 0: + sys.exit(0) # pylint: disable=ansible-bad-function + else: + sys.exit(0) # pylint: disable=ansible-bad-function + return pid + + +class EmbeddedModule: + def __init__(self, ansiblez_path, params): + self.ansiblez_path = ansiblez_path + self.collection_name, self.module_name = self.find_module_name() + self.params = params + self.module_class = None + self.debug_mode = False + self.module_path = ( + "ansible_collections.{collection_name}." "plugins.modules.{module_name}" + ).format(collection_name=self.collection_name, module_name=self.module_name) + + def find_module_name(self): + with zipfile.ZipFile(self.ansiblez_path) as zip: + for path in zip.namelist(): + if not path.startswith("ansible_collections"): + continue + if not path.endswith(".py"): + continue + if path.endswith("__init__.py"): + continue + splitted = path.split("/") + if len(splitted) != 6: + continue + if splitted[-3:-1] != ["plugins", "modules"]: + continue + collection = ".".join(splitted[1:3]) + name = splitted[-1][:-3] + return collection, name + raise Exception("Cannot find module name") + + async def load(self): + async with sys_path_lock: + # Add the Ansiblez_path in sys.path + sys.path.insert(0, self.ansiblez_path) + + # resettle the loaded modules that were associated + # with a different Ansiblez. + for path, module in sorted(tuple(sys.modules.items())): + if path and module and path.startswith("ansible_collections"): + try: + prefix = sys.modules[path].__loader__.prefix + except AttributeError: + # Not from a zipimporter loader, skipping + continue + # Reload package modules only, to pick up new modules from + # packages that have been previously loaded. + if hasattr(sys.modules[path], "__path__"): + py_path = self.ansiblez_path + os.sep + prefix + my_loader = zipimporter(py_path) + sys.modules[path].__loader__ = my_loader + try: + importlib.reload(sys.modules[path]) + except ModuleNotFoundError: + pass + # Finally, load the plugin class. + self.module_class = importlib.import_module(self.module_path) + + async def unload(self): + async with sys_path_lock: + sys.path = [i for i in sys.path if i != self.ansiblez_path] + + def create_profiler(self): + if self.debug_mode: + import cProfile + + pr = cProfile.Profile() + pr.enable() + return pr + + def print_profiling_info(self, pr): + if self.debug_mode: + import pstats + + sortby = pstats.SortKey.CUMULATIVE + ps = pstats.Stats(pr).sort_stats(sortby) + ps.print_stats(20) + + def print_backtrace(self, backtrace): + if self.debug_mode: + print(backtrace) # pylint: disable=ansible-bad-function + + async def run(self): + class FakeStdin: + buffer = None + + from .exceptions import ( + EmbeddedModuleFailure, + EmbeddedModuleUnexpectedFailure, + EmbeddedModuleSuccess, + ) + + # monkeypatching to pass the argument to the module, this is not + # really safe, and in the future, this will prevent us to run several + # modules in parallel. We can maybe use a scoped monkeypatch instead + _fake_stdin = FakeStdin() + _fake_stdin.buffer = io.BytesIO(self.params.encode()) + sys.stdin = _fake_stdin + # Trick to be sure ansible.module_utils.basic._load_params() won't + # try to build the module parameters from the daemon arguments + sys.argv = sys.argv[:1] + ansible.module_utils.basic._ANSIBLE_ARGS = None + pr = self.create_profiler() + if not hasattr(self.module_class, "main"): + raise EmbeddedModuleFailure("No main() found!") + try: + if inspect.iscoroutinefunction(self.module_class.main): + await self.module_class.main() + elif pr: + pr.runcall(self.module_class.main) + else: + self.module_class.main() + except EmbeddedModuleSuccess as e: + self.print_profiling_info(pr) + return e.kwargs + except EmbeddedModuleFailure as e: + backtrace = traceback.format_exc() + self.print_backtrace(backtrace) + raise + except Exception as e: + backtrace = traceback.format_exc() + self.print_backtrace(backtrace) + raise EmbeddedModuleUnexpectedFailure(str(backtrace)) + else: + raise EmbeddedModuleUnexpectedFailure( + "Likely a bug: exit_json() or fail_json() should be called during the module excution" + ) + + +async def run_as_lookup_plugin(data): + errors = None + try: + import ansible.plugins.loader as plugin_loader + from ansible.parsing.dataloader import DataLoader + from ansible.template import Templar + from ansible.module_utils._text import to_native + + ( + lookup_name, + terms, + variables, + kwargs, + ) = data + + # load lookup plugin + templar = Templar(loader=DataLoader(), variables=None) + ansible_collections = "ansible_collections." + if lookup_name.startswith(ansible_collections): + lookup_name = lookup_name.replace(ansible_collections, "", 1) + ansible_plugins_lookup = ".plugins.lookup." + if ansible_plugins_lookup in lookup_name: + lookup_name = lookup_name.replace(ansible_plugins_lookup, ".", 1) + + instance = plugin_loader.lookup_loader.get( + name=lookup_name, loader=templar._loader, templar=templar + ) + + if not hasattr(instance, "_run"): + return [None, "No _run() found"] + if inspect.iscoroutinefunction(instance._run): + result = await instance._run(terms, variables=variables, **kwargs) + else: + result = instance._run(terms, variables=variables, **kwargs) + except Exception as e: + errors = to_native(e) + return [result, errors] + + +async def run_as_module(content, debug_mode): + from ansible_collections.cloud.common.plugins.module_utils.turbo.exceptions import ( + EmbeddedModuleFailure, + ) + + try: + ( + ansiblez_path, + params, + env, + ) = json.loads(content) + if debug_mode: + print( # pylint: disable=ansible-bad-function + f"-----\nrunning {ansiblez_path} with params: ¨{params}¨" + ) + + embedded_module = EmbeddedModule(ansiblez_path, params) + if debug_mode: + embedded_module.debug_mode = True + + await embedded_module.load() + try: + async with env_lock: + os.environ.clear() + os.environ.update(env) + result = await embedded_module.run() + except SystemExit: + backtrace = traceback.format_exc() + result = {"msg": str(backtrace), "failed": True} + except EmbeddedModuleFailure as e: + result = {"msg": str(e), "failed": True} + if e.kwargs: + result.update(e.kwargs) + except Exception as e: + result = { + "msg": traceback.format_stack() + [str(e)], + "failed": True, + } + await embedded_module.unload() + except Exception as e: + result = {"msg": traceback.format_stack() + [str(e)], "failed": True} + return result + + +class AnsibleVMwareTurboMode: + def __init__(self): + self.sessions = collections.defaultdict(dict) + self.socket_path = None + self.ttl = None + self.debug_mode = None + self.jobs_ongoing = {} + + async def ghost_killer(self): + while True: + await asyncio.sleep(self.ttl) + running_jobs = { + job_id: start_date + for job_id, start_date in self.jobs_ongoing.items() + if (datetime.now() - start_date).total_seconds() < 3600 + } + if running_jobs: + continue + self.stop() + + async def handle(self, reader, writer): + self._watcher.cancel() + self._watcher = self.loop.create_task(self.ghost_killer()) + job_id = str(uuid.uuid4()) + self.jobs_ongoing[job_id] = datetime.now() + raw_data = await reader.read() + if not raw_data: + return + + (plugin_type, content) = pickle.loads(raw_data) + + def _terminate(result): + writer.write(json.dumps(result).encode()) + writer.close() + + if plugin_type == "module": + result = await run_as_module(content, debug_mode=self.debug_mode) + elif plugin_type == "lookup": + result = await run_as_lookup_plugin(content) + _terminate(result) + del self.jobs_ongoing[job_id] + + def handle_exception(self, loop, context): + e = context.get("exception") + traceback.print_exception(type(e), e, e.__traceback__) + self.stop() + + def start(self): + self.loop = asyncio.get_event_loop() + self.loop.add_signal_handler(signal.SIGTERM, self.stop) + self.loop.set_exception_handler(self.handle_exception) + self._watcher = self.loop.create_task(self.ghost_killer()) + + import sys + + if sys.hexversion >= 0x30A00B1: + # py3.10 drops the loop argument of create_task. + self.loop.create_task( + asyncio.start_unix_server(self.handle, path=self.socket_path) + ) + else: + self.loop.create_task( + asyncio.start_unix_server( + self.handle, path=self.socket_path, loop=self.loop + ) + ) + self.loop.run_forever() + + def stop(self): + os.unlink(self.socket_path) + self.loop.stop() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Start a background daemon.") + parser.add_argument("--socket-path") + parser.add_argument("--ttl", default=15, type=int) + parser.add_argument("--fork", action="store_true") + + args = parser.parse_args() + if args.fork: + fork_process() + sys_path_lock = asyncio.Lock() + env_lock = asyncio.Lock() + + server = AnsibleVMwareTurboMode() + server.socket_path = args.socket_path + server.ttl = args.ttl + server.debug_mode = not args.fork + server.start() diff --git a/ansible_collections/cloud/common/plugins/module_utils/turbo_demo.py b/ansible_collections/cloud/common/plugins/module_utils/turbo_demo.py new file mode 100644 index 00000000..1a14f075 --- /dev/null +++ b/ansible_collections/cloud/common/plugins/module_utils/turbo_demo.py @@ -0,0 +1 @@ +# This module is part of the test suite to check the import logic of turbo mode |