diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:04:41 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:04:41 +0000 |
commit | 975f66f2eebe9dadba04f275774d4ab83f74cf25 (patch) | |
tree | 89bd26a93aaae6a25749145b7e4bca4a1e75b2be /ansible_collections/community/general/plugins/callback/elastic.py | |
parent | Initial commit. (diff) | |
download | ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.tar.xz ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.zip |
Adding upstream version 7.7.0+dfsg.upstream/7.7.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/community/general/plugins/callback/elastic.py')
-rw-r--r-- | ansible_collections/community/general/plugins/callback/elastic.py | 424 |
1 files changed, 424 insertions, 0 deletions
diff --git a/ansible_collections/community/general/plugins/callback/elastic.py b/ansible_collections/community/general/plugins/callback/elastic.py new file mode 100644 index 000000000..37526c155 --- /dev/null +++ b/ansible_collections/community/general/plugins/callback/elastic.py @@ -0,0 +1,424 @@ +# Copyright (c) 2021, Victor Martinez <VictorMartinezRubio@gmail.com> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' + author: Victor Martinez (@v1v) <VictorMartinezRubio@gmail.com> + name: elastic + type: notification + short_description: Create distributed traces for each Ansible task in Elastic APM + version_added: 3.8.0 + description: + - This callback creates distributed traces for each Ansible task in Elastic APM. + - You can configure the plugin with environment variables. + - See U(https://www.elastic.co/guide/en/apm/agent/python/current/configuration.html). + options: + hide_task_arguments: + default: false + type: bool + description: + - Hide the arguments for a task. + env: + - name: ANSIBLE_OPENTELEMETRY_HIDE_TASK_ARGUMENTS + apm_service_name: + default: ansible + type: str + description: + - The service name resource attribute. + env: + - name: ELASTIC_APM_SERVICE_NAME + apm_server_url: + type: str + description: + - Use the APM server and its environment variables. + env: + - name: ELASTIC_APM_SERVER_URL + apm_secret_token: + type: str + description: + - Use the APM server token + env: + - name: ELASTIC_APM_SECRET_TOKEN + apm_api_key: + type: str + description: + - Use the APM API key + env: + - name: ELASTIC_APM_API_KEY + apm_verify_server_cert: + default: true + type: bool + description: + - Verifies the SSL certificate if an HTTPS connection. + env: + - name: ELASTIC_APM_VERIFY_SERVER_CERT + traceparent: + type: str + description: + - The L(W3C Trace Context header traceparent,https://www.w3.org/TR/trace-context-1/#traceparent-header). + env: + - name: TRACEPARENT + requirements: + - elastic-apm (Python library) +''' + + +EXAMPLES = ''' +examples: | + Enable the plugin in ansible.cfg: + [defaults] + callbacks_enabled = community.general.elastic + + Set the environment variable: + export ELASTIC_APM_SERVER_URL=<your APM server URL)> + export ELASTIC_APM_SERVICE_NAME=your_service_name + export ELASTIC_APM_API_KEY=your_APM_API_KEY +''' + +import getpass +import socket +import time +import uuid + +from collections import OrderedDict +from os.path import basename + +from ansible.errors import AnsibleError, AnsibleRuntimeError +from ansible.module_utils.six import raise_from +from ansible.plugins.callback import CallbackBase + +try: + from elasticapm import Client, capture_span, trace_parent_from_string, instrument, label +except ImportError as imp_exc: + ELASTIC_LIBRARY_IMPORT_ERROR = imp_exc +else: + ELASTIC_LIBRARY_IMPORT_ERROR = None + + +class TaskData: + """ + Data about an individual task. + """ + + def __init__(self, uuid, name, path, play, action, args): + self.uuid = uuid + self.name = name + self.path = path + self.play = play + self.host_data = OrderedDict() + self.start = time.time() + self.action = action + self.args = args + + def add_host(self, host): + if host.uuid in self.host_data: + if host.status == 'included': + # concatenate task include output from multiple items + host.result = '%s\n%s' % (self.host_data[host.uuid].result, host.result) + else: + return + + self.host_data[host.uuid] = host + + +class HostData: + """ + Data about an individual host. + """ + + def __init__(self, uuid, name, status, result): + self.uuid = uuid + self.name = name + self.status = status + self.result = result + self.finish = time.time() + + +class ElasticSource(object): + def __init__(self, display): + self.ansible_playbook = "" + self.ansible_version = None + self.session = str(uuid.uuid4()) + self.host = socket.gethostname() + try: + self.ip_address = socket.gethostbyname(socket.gethostname()) + except Exception as e: + self.ip_address = None + self.user = getpass.getuser() + + self._display = display + + def start_task(self, tasks_data, hide_task_arguments, play_name, task): + """ record the start of a task for one or more hosts """ + + uuid = task._uuid + + if uuid in tasks_data: + return + + name = task.get_name().strip() + path = task.get_path() + action = task.action + args = None + + if not task.no_log and not hide_task_arguments: + args = ', '.join(('%s=%s' % a for a in task.args.items())) + + tasks_data[uuid] = TaskData(uuid, name, path, play_name, action, args) + + def finish_task(self, tasks_data, status, result): + """ record the results of a task for a single host """ + + task_uuid = result._task._uuid + + if hasattr(result, '_host') and result._host is not None: + host_uuid = result._host._uuid + host_name = result._host.name + else: + host_uuid = 'include' + host_name = 'include' + + task = tasks_data[task_uuid] + + if self.ansible_version is None and result._task_fields['args'].get('_ansible_version'): + self.ansible_version = result._task_fields['args'].get('_ansible_version') + + task.add_host(HostData(host_uuid, host_name, status, result)) + + def generate_distributed_traces(self, tasks_data, status, end_time, traceparent, apm_service_name, + apm_server_url, apm_verify_server_cert, apm_secret_token, apm_api_key): + """ generate distributed traces from the collected TaskData and HostData """ + + tasks = [] + parent_start_time = None + for task_uuid, task in tasks_data.items(): + if parent_start_time is None: + parent_start_time = task.start + tasks.append(task) + + apm_cli = self.init_apm_client(apm_server_url, apm_service_name, apm_verify_server_cert, apm_secret_token, apm_api_key) + if apm_cli: + instrument() # Only call this once, as early as possible. + if traceparent: + parent = trace_parent_from_string(traceparent) + apm_cli.begin_transaction("Session", trace_parent=parent, start=parent_start_time) + else: + apm_cli.begin_transaction("Session", start=parent_start_time) + # Populate trace metadata attributes + if self.ansible_version is not None: + label(ansible_version=self.ansible_version) + label(ansible_session=self.session, ansible_host_name=self.host, ansible_host_user=self.user) + if self.ip_address is not None: + label(ansible_host_ip=self.ip_address) + + for task_data in tasks: + for host_uuid, host_data in task_data.host_data.items(): + self.create_span_data(apm_cli, task_data, host_data) + + apm_cli.end_transaction(name=__name__, result=status, duration=end_time - parent_start_time) + + def create_span_data(self, apm_cli, task_data, host_data): + """ create the span with the given TaskData and HostData """ + + name = '[%s] %s: %s' % (host_data.name, task_data.play, task_data.name) + + message = "success" + status = "success" + enriched_error_message = None + if host_data.status == 'included': + rc = 0 + else: + res = host_data.result._result + rc = res.get('rc', 0) + if host_data.status == 'failed': + message = self.get_error_message(res) + enriched_error_message = self.enrich_error_message(res) + status = "failure" + elif host_data.status == 'skipped': + if 'skip_reason' in res: + message = res['skip_reason'] + else: + message = 'skipped' + status = "unknown" + + with capture_span(task_data.name, + start=task_data.start, + span_type="ansible.task.run", + duration=host_data.finish - task_data.start, + labels={"ansible.task.args": task_data.args, + "ansible.task.message": message, + "ansible.task.module": task_data.action, + "ansible.task.name": name, + "ansible.task.result": rc, + "ansible.task.host.name": host_data.name, + "ansible.task.host.status": host_data.status}) as span: + span.outcome = status + if 'failure' in status: + exception = AnsibleRuntimeError(message="{0}: {1} failed with error message {2}".format(task_data.action, name, enriched_error_message)) + apm_cli.capture_exception(exc_info=(type(exception), exception, exception.__traceback__), handled=True) + + def init_apm_client(self, apm_server_url, apm_service_name, apm_verify_server_cert, apm_secret_token, apm_api_key): + if apm_server_url: + return Client(service_name=apm_service_name, + server_url=apm_server_url, + verify_server_cert=False, + secret_token=apm_secret_token, + api_key=apm_api_key, + use_elastic_traceparent_header=True, + debug=True) + + @staticmethod + def get_error_message(result): + if result.get('exception') is not None: + return ElasticSource._last_line(result['exception']) + return result.get('msg', 'failed') + + @staticmethod + def _last_line(text): + lines = text.strip().split('\n') + return lines[-1] + + @staticmethod + def enrich_error_message(result): + message = result.get('msg', 'failed') + exception = result.get('exception') + stderr = result.get('stderr') + return ('message: "{0}"\nexception: "{1}"\nstderr: "{2}"').format(message, exception, stderr) + + +class CallbackModule(CallbackBase): + """ + This callback creates distributed traces with Elastic APM. + """ + + CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = 'notification' + CALLBACK_NAME = 'community.general.elastic' + CALLBACK_NEEDS_ENABLED = True + + def __init__(self, display=None): + super(CallbackModule, self).__init__(display=display) + self.hide_task_arguments = None + self.apm_service_name = None + self.ansible_playbook = None + self.traceparent = False + self.play_name = None + self.tasks_data = None + self.errors = 0 + self.disabled = False + + if ELASTIC_LIBRARY_IMPORT_ERROR: + raise_from( + AnsibleError('The `elastic-apm` must be installed to use this plugin'), + ELASTIC_LIBRARY_IMPORT_ERROR) + + self.tasks_data = OrderedDict() + + self.elastic = ElasticSource(display=self._display) + + def set_options(self, task_keys=None, var_options=None, direct=None): + super(CallbackModule, self).set_options(task_keys=task_keys, + var_options=var_options, + direct=direct) + + self.hide_task_arguments = self.get_option('hide_task_arguments') + + self.apm_service_name = self.get_option('apm_service_name') + if not self.apm_service_name: + self.apm_service_name = 'ansible' + + self.apm_server_url = self.get_option('apm_server_url') + self.apm_secret_token = self.get_option('apm_secret_token') + self.apm_api_key = self.get_option('apm_api_key') + self.apm_verify_server_cert = self.get_option('apm_verify_server_cert') + self.traceparent = self.get_option('traceparent') + + def v2_playbook_on_start(self, playbook): + self.ansible_playbook = basename(playbook._file_name) + + def v2_playbook_on_play_start(self, play): + self.play_name = play.get_name() + + def v2_runner_on_no_hosts(self, task): + self.elastic.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_playbook_on_task_start(self, task, is_conditional): + self.elastic.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_playbook_on_cleanup_task_start(self, task): + self.elastic.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_playbook_on_handler_task_start(self, task): + self.elastic.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_runner_on_failed(self, result, ignore_errors=False): + self.errors += 1 + self.elastic.finish_task( + self.tasks_data, + 'failed', + result + ) + + def v2_runner_on_ok(self, result): + self.elastic.finish_task( + self.tasks_data, + 'ok', + result + ) + + def v2_runner_on_skipped(self, result): + self.elastic.finish_task( + self.tasks_data, + 'skipped', + result + ) + + def v2_playbook_on_include(self, included_file): + self.elastic.finish_task( + self.tasks_data, + 'included', + included_file + ) + + def v2_playbook_on_stats(self, stats): + if self.errors == 0: + status = "success" + else: + status = "failure" + self.elastic.generate_distributed_traces( + self.tasks_data, + status, + time.time(), + self.traceparent, + self.apm_service_name, + self.apm_server_url, + self.apm_verify_server_cert, + self.apm_secret_token, + self.apm_api_key + ) + + def v2_runner_on_async_failed(self, result, **kwargs): + self.errors += 1 |