summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/general/plugins/callback/elastic.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:04:41 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:04:41 +0000
commit975f66f2eebe9dadba04f275774d4ab83f74cf25 (patch)
tree89bd26a93aaae6a25749145b7e4bca4a1e75b2be /ansible_collections/community/general/plugins/callback/elastic.py
parentInitial commit. (diff)
downloadansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.tar.xz
ansible-975f66f2eebe9dadba04f275774d4ab83f74cf25.zip
Adding upstream version 7.7.0+dfsg.upstream/7.7.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/community/general/plugins/callback/elastic.py')
-rw-r--r--ansible_collections/community/general/plugins/callback/elastic.py424
1 files changed, 424 insertions, 0 deletions
diff --git a/ansible_collections/community/general/plugins/callback/elastic.py b/ansible_collections/community/general/plugins/callback/elastic.py
new file mode 100644
index 000000000..37526c155
--- /dev/null
+++ b/ansible_collections/community/general/plugins/callback/elastic.py
@@ -0,0 +1,424 @@
+# Copyright (c) 2021, Victor Martinez <VictorMartinezRubio@gmail.com>
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+DOCUMENTATION = '''
+ author: Victor Martinez (@v1v) <VictorMartinezRubio@gmail.com>
+ name: elastic
+ type: notification
+ short_description: Create distributed traces for each Ansible task in Elastic APM
+ version_added: 3.8.0
+ description:
+ - This callback creates distributed traces for each Ansible task in Elastic APM.
+ - You can configure the plugin with environment variables.
+ - See U(https://www.elastic.co/guide/en/apm/agent/python/current/configuration.html).
+ options:
+ hide_task_arguments:
+ default: false
+ type: bool
+ description:
+ - Hide the arguments for a task.
+ env:
+ - name: ANSIBLE_OPENTELEMETRY_HIDE_TASK_ARGUMENTS
+ apm_service_name:
+ default: ansible
+ type: str
+ description:
+ - The service name resource attribute.
+ env:
+ - name: ELASTIC_APM_SERVICE_NAME
+ apm_server_url:
+ type: str
+ description:
+ - Use the APM server and its environment variables.
+ env:
+ - name: ELASTIC_APM_SERVER_URL
+ apm_secret_token:
+ type: str
+ description:
+ - Use the APM server token
+ env:
+ - name: ELASTIC_APM_SECRET_TOKEN
+ apm_api_key:
+ type: str
+ description:
+ - Use the APM API key
+ env:
+ - name: ELASTIC_APM_API_KEY
+ apm_verify_server_cert:
+ default: true
+ type: bool
+ description:
+ - Verifies the SSL certificate if an HTTPS connection.
+ env:
+ - name: ELASTIC_APM_VERIFY_SERVER_CERT
+ traceparent:
+ type: str
+ description:
+ - The L(W3C Trace Context header traceparent,https://www.w3.org/TR/trace-context-1/#traceparent-header).
+ env:
+ - name: TRACEPARENT
+ requirements:
+ - elastic-apm (Python library)
+'''
+
+
+EXAMPLES = '''
+examples: |
+ Enable the plugin in ansible.cfg:
+ [defaults]
+ callbacks_enabled = community.general.elastic
+
+ Set the environment variable:
+ export ELASTIC_APM_SERVER_URL=<your APM server URL)>
+ export ELASTIC_APM_SERVICE_NAME=your_service_name
+ export ELASTIC_APM_API_KEY=your_APM_API_KEY
+'''
+
+import getpass
+import socket
+import time
+import uuid
+
+from collections import OrderedDict
+from os.path import basename
+
+from ansible.errors import AnsibleError, AnsibleRuntimeError
+from ansible.module_utils.six import raise_from
+from ansible.plugins.callback import CallbackBase
+
+try:
+ from elasticapm import Client, capture_span, trace_parent_from_string, instrument, label
+except ImportError as imp_exc:
+ ELASTIC_LIBRARY_IMPORT_ERROR = imp_exc
+else:
+ ELASTIC_LIBRARY_IMPORT_ERROR = None
+
+
+class TaskData:
+ """
+ Data about an individual task.
+ """
+
+ def __init__(self, uuid, name, path, play, action, args):
+ self.uuid = uuid
+ self.name = name
+ self.path = path
+ self.play = play
+ self.host_data = OrderedDict()
+ self.start = time.time()
+ self.action = action
+ self.args = args
+
+ def add_host(self, host):
+ if host.uuid in self.host_data:
+ if host.status == 'included':
+ # concatenate task include output from multiple items
+ host.result = '%s\n%s' % (self.host_data[host.uuid].result, host.result)
+ else:
+ return
+
+ self.host_data[host.uuid] = host
+
+
+class HostData:
+ """
+ Data about an individual host.
+ """
+
+ def __init__(self, uuid, name, status, result):
+ self.uuid = uuid
+ self.name = name
+ self.status = status
+ self.result = result
+ self.finish = time.time()
+
+
+class ElasticSource(object):
+ def __init__(self, display):
+ self.ansible_playbook = ""
+ self.ansible_version = None
+ self.session = str(uuid.uuid4())
+ self.host = socket.gethostname()
+ try:
+ self.ip_address = socket.gethostbyname(socket.gethostname())
+ except Exception as e:
+ self.ip_address = None
+ self.user = getpass.getuser()
+
+ self._display = display
+
+ def start_task(self, tasks_data, hide_task_arguments, play_name, task):
+ """ record the start of a task for one or more hosts """
+
+ uuid = task._uuid
+
+ if uuid in tasks_data:
+ return
+
+ name = task.get_name().strip()
+ path = task.get_path()
+ action = task.action
+ args = None
+
+ if not task.no_log and not hide_task_arguments:
+ args = ', '.join(('%s=%s' % a for a in task.args.items()))
+
+ tasks_data[uuid] = TaskData(uuid, name, path, play_name, action, args)
+
+ def finish_task(self, tasks_data, status, result):
+ """ record the results of a task for a single host """
+
+ task_uuid = result._task._uuid
+
+ if hasattr(result, '_host') and result._host is not None:
+ host_uuid = result._host._uuid
+ host_name = result._host.name
+ else:
+ host_uuid = 'include'
+ host_name = 'include'
+
+ task = tasks_data[task_uuid]
+
+ if self.ansible_version is None and result._task_fields['args'].get('_ansible_version'):
+ self.ansible_version = result._task_fields['args'].get('_ansible_version')
+
+ task.add_host(HostData(host_uuid, host_name, status, result))
+
+ def generate_distributed_traces(self, tasks_data, status, end_time, traceparent, apm_service_name,
+ apm_server_url, apm_verify_server_cert, apm_secret_token, apm_api_key):
+ """ generate distributed traces from the collected TaskData and HostData """
+
+ tasks = []
+ parent_start_time = None
+ for task_uuid, task in tasks_data.items():
+ if parent_start_time is None:
+ parent_start_time = task.start
+ tasks.append(task)
+
+ apm_cli = self.init_apm_client(apm_server_url, apm_service_name, apm_verify_server_cert, apm_secret_token, apm_api_key)
+ if apm_cli:
+ instrument() # Only call this once, as early as possible.
+ if traceparent:
+ parent = trace_parent_from_string(traceparent)
+ apm_cli.begin_transaction("Session", trace_parent=parent, start=parent_start_time)
+ else:
+ apm_cli.begin_transaction("Session", start=parent_start_time)
+ # Populate trace metadata attributes
+ if self.ansible_version is not None:
+ label(ansible_version=self.ansible_version)
+ label(ansible_session=self.session, ansible_host_name=self.host, ansible_host_user=self.user)
+ if self.ip_address is not None:
+ label(ansible_host_ip=self.ip_address)
+
+ for task_data in tasks:
+ for host_uuid, host_data in task_data.host_data.items():
+ self.create_span_data(apm_cli, task_data, host_data)
+
+ apm_cli.end_transaction(name=__name__, result=status, duration=end_time - parent_start_time)
+
+ def create_span_data(self, apm_cli, task_data, host_data):
+ """ create the span with the given TaskData and HostData """
+
+ name = '[%s] %s: %s' % (host_data.name, task_data.play, task_data.name)
+
+ message = "success"
+ status = "success"
+ enriched_error_message = None
+ if host_data.status == 'included':
+ rc = 0
+ else:
+ res = host_data.result._result
+ rc = res.get('rc', 0)
+ if host_data.status == 'failed':
+ message = self.get_error_message(res)
+ enriched_error_message = self.enrich_error_message(res)
+ status = "failure"
+ elif host_data.status == 'skipped':
+ if 'skip_reason' in res:
+ message = res['skip_reason']
+ else:
+ message = 'skipped'
+ status = "unknown"
+
+ with capture_span(task_data.name,
+ start=task_data.start,
+ span_type="ansible.task.run",
+ duration=host_data.finish - task_data.start,
+ labels={"ansible.task.args": task_data.args,
+ "ansible.task.message": message,
+ "ansible.task.module": task_data.action,
+ "ansible.task.name": name,
+ "ansible.task.result": rc,
+ "ansible.task.host.name": host_data.name,
+ "ansible.task.host.status": host_data.status}) as span:
+ span.outcome = status
+ if 'failure' in status:
+ exception = AnsibleRuntimeError(message="{0}: {1} failed with error message {2}".format(task_data.action, name, enriched_error_message))
+ apm_cli.capture_exception(exc_info=(type(exception), exception, exception.__traceback__), handled=True)
+
+ def init_apm_client(self, apm_server_url, apm_service_name, apm_verify_server_cert, apm_secret_token, apm_api_key):
+ if apm_server_url:
+ return Client(service_name=apm_service_name,
+ server_url=apm_server_url,
+ verify_server_cert=False,
+ secret_token=apm_secret_token,
+ api_key=apm_api_key,
+ use_elastic_traceparent_header=True,
+ debug=True)
+
+ @staticmethod
+ def get_error_message(result):
+ if result.get('exception') is not None:
+ return ElasticSource._last_line(result['exception'])
+ return result.get('msg', 'failed')
+
+ @staticmethod
+ def _last_line(text):
+ lines = text.strip().split('\n')
+ return lines[-1]
+
+ @staticmethod
+ def enrich_error_message(result):
+ message = result.get('msg', 'failed')
+ exception = result.get('exception')
+ stderr = result.get('stderr')
+ return ('message: "{0}"\nexception: "{1}"\nstderr: "{2}"').format(message, exception, stderr)
+
+
+class CallbackModule(CallbackBase):
+ """
+ This callback creates distributed traces with Elastic APM.
+ """
+
+ CALLBACK_VERSION = 2.0
+ CALLBACK_TYPE = 'notification'
+ CALLBACK_NAME = 'community.general.elastic'
+ CALLBACK_NEEDS_ENABLED = True
+
+ def __init__(self, display=None):
+ super(CallbackModule, self).__init__(display=display)
+ self.hide_task_arguments = None
+ self.apm_service_name = None
+ self.ansible_playbook = None
+ self.traceparent = False
+ self.play_name = None
+ self.tasks_data = None
+ self.errors = 0
+ self.disabled = False
+
+ if ELASTIC_LIBRARY_IMPORT_ERROR:
+ raise_from(
+ AnsibleError('The `elastic-apm` must be installed to use this plugin'),
+ ELASTIC_LIBRARY_IMPORT_ERROR)
+
+ self.tasks_data = OrderedDict()
+
+ self.elastic = ElasticSource(display=self._display)
+
+ def set_options(self, task_keys=None, var_options=None, direct=None):
+ super(CallbackModule, self).set_options(task_keys=task_keys,
+ var_options=var_options,
+ direct=direct)
+
+ self.hide_task_arguments = self.get_option('hide_task_arguments')
+
+ self.apm_service_name = self.get_option('apm_service_name')
+ if not self.apm_service_name:
+ self.apm_service_name = 'ansible'
+
+ self.apm_server_url = self.get_option('apm_server_url')
+ self.apm_secret_token = self.get_option('apm_secret_token')
+ self.apm_api_key = self.get_option('apm_api_key')
+ self.apm_verify_server_cert = self.get_option('apm_verify_server_cert')
+ self.traceparent = self.get_option('traceparent')
+
+ def v2_playbook_on_start(self, playbook):
+ self.ansible_playbook = basename(playbook._file_name)
+
+ def v2_playbook_on_play_start(self, play):
+ self.play_name = play.get_name()
+
+ def v2_runner_on_no_hosts(self, task):
+ self.elastic.start_task(
+ self.tasks_data,
+ self.hide_task_arguments,
+ self.play_name,
+ task
+ )
+
+ def v2_playbook_on_task_start(self, task, is_conditional):
+ self.elastic.start_task(
+ self.tasks_data,
+ self.hide_task_arguments,
+ self.play_name,
+ task
+ )
+
+ def v2_playbook_on_cleanup_task_start(self, task):
+ self.elastic.start_task(
+ self.tasks_data,
+ self.hide_task_arguments,
+ self.play_name,
+ task
+ )
+
+ def v2_playbook_on_handler_task_start(self, task):
+ self.elastic.start_task(
+ self.tasks_data,
+ self.hide_task_arguments,
+ self.play_name,
+ task
+ )
+
+ def v2_runner_on_failed(self, result, ignore_errors=False):
+ self.errors += 1
+ self.elastic.finish_task(
+ self.tasks_data,
+ 'failed',
+ result
+ )
+
+ def v2_runner_on_ok(self, result):
+ self.elastic.finish_task(
+ self.tasks_data,
+ 'ok',
+ result
+ )
+
+ def v2_runner_on_skipped(self, result):
+ self.elastic.finish_task(
+ self.tasks_data,
+ 'skipped',
+ result
+ )
+
+ def v2_playbook_on_include(self, included_file):
+ self.elastic.finish_task(
+ self.tasks_data,
+ 'included',
+ included_file
+ )
+
+ def v2_playbook_on_stats(self, stats):
+ if self.errors == 0:
+ status = "success"
+ else:
+ status = "failure"
+ self.elastic.generate_distributed_traces(
+ self.tasks_data,
+ status,
+ time.time(),
+ self.traceparent,
+ self.apm_service_name,
+ self.apm_server_url,
+ self.apm_verify_server_cert,
+ self.apm_secret_token,
+ self.apm_api_key
+ )
+
+ def v2_runner_on_async_failed(self, result, **kwargs):
+ self.errors += 1