diff options
Diffstat (limited to 'ansible_collections/community/general/plugins/callback/opentelemetry.py')
-rw-r--r-- | ansible_collections/community/general/plugins/callback/opentelemetry.py | 569 |
1 files changed, 569 insertions, 0 deletions
diff --git a/ansible_collections/community/general/plugins/callback/opentelemetry.py b/ansible_collections/community/general/plugins/callback/opentelemetry.py new file mode 100644 index 000000000..e00e1d71a --- /dev/null +++ b/ansible_collections/community/general/plugins/callback/opentelemetry.py @@ -0,0 +1,569 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2021, Victor Martinez <VictorMartinezRubio@gmail.com> +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' + author: Victor Martinez (@v1v) <VictorMartinezRubio@gmail.com> + name: opentelemetry + type: notification + short_description: Create distributed traces with OpenTelemetry + version_added: 3.7.0 + description: + - This callback creates distributed traces for each Ansible task with OpenTelemetry. + - You can configure the OpenTelemetry exporter and SDK with environment variables. + - See U(https://opentelemetry-python.readthedocs.io/en/latest/exporter/otlp/otlp.html). + - See U(https://opentelemetry-python.readthedocs.io/en/latest/sdk/environment_variables.html#opentelemetry-sdk-environment-variables). + options: + hide_task_arguments: + default: false + type: bool + description: + - Hide the arguments for a task. + env: + - name: ANSIBLE_OPENTELEMETRY_HIDE_TASK_ARGUMENTS + ini: + - section: callback_opentelemetry + key: hide_task_arguments + version_added: 5.3.0 + enable_from_environment: + type: str + description: + - Whether to enable this callback only if the given environment variable exists and it is set to C(true). + - This is handy when you use Configuration as Code and want to send distributed traces + if running in the CI rather when running Ansible locally. + - For such, it evaluates the given I(enable_from_environment) value as environment variable + and if set to true this plugin will be enabled. + env: + - name: ANSIBLE_OPENTELEMETRY_ENABLE_FROM_ENVIRONMENT + ini: + - section: callback_opentelemetry + key: enable_from_environment + version_added: 5.3.0 + version_added: 3.8.0 + otel_service_name: + default: ansible + type: str + description: + - The service name resource attribute. + env: + - name: OTEL_SERVICE_NAME + ini: + - section: callback_opentelemetry + key: otel_service_name + version_added: 5.3.0 + traceparent: + default: None + type: str + description: + - The L(W3C Trace Context header traceparent,https://www.w3.org/TR/trace-context-1/#traceparent-header). + env: + - name: TRACEPARENT + disable_logs: + default: false + type: bool + description: + - Disable sending logs. + env: + - name: ANSIBLE_OPENTELEMETRY_DISABLE_LOGS + ini: + - section: callback_opentelemetry + key: disable_logs + version_added: 5.8.0 + requirements: + - opentelemetry-api (Python library) + - opentelemetry-exporter-otlp (Python library) + - opentelemetry-sdk (Python library) +''' + + +EXAMPLES = ''' +examples: | + Enable the plugin in ansible.cfg: + [defaults] + callbacks_enabled = community.general.opentelemetry + [callback_opentelemetry] + enable_from_environment = ANSIBLE_OPENTELEMETRY_ENABLED + + Set the environment variable: + export OTEL_EXPORTER_OTLP_ENDPOINT=<your endpoint (OTLP/HTTP)> + export OTEL_EXPORTER_OTLP_HEADERS="authorization=Bearer your_otel_token" + export OTEL_SERVICE_NAME=your_service_name + export ANSIBLE_OPENTELEMETRY_ENABLED=true +''' + +import getpass +import os +import socket +import sys +import time +import uuid + +from collections import OrderedDict +from os.path import basename + +from ansible.errors import AnsibleError +from ansible.module_utils.six import raise_from +from ansible.module_utils.six.moves.urllib.parse import urlparse +from ansible.plugins.callback import CallbackBase + +try: + from opentelemetry import trace + from opentelemetry.trace import SpanKind + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + from opentelemetry.sdk.resources import SERVICE_NAME, Resource + from opentelemetry.trace.status import Status, StatusCode + from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor + ) + + # Support for opentelemetry-api <= 1.12 + try: + from opentelemetry.util._time import _time_ns + except ImportError as imp_exc: + OTEL_LIBRARY_TIME_NS_ERROR = imp_exc + else: + OTEL_LIBRARY_TIME_NS_ERROR = None + +except ImportError as imp_exc: + OTEL_LIBRARY_IMPORT_ERROR = imp_exc + OTEL_LIBRARY_TIME_NS_ERROR = imp_exc +else: + OTEL_LIBRARY_IMPORT_ERROR = None + + +if sys.version_info >= (3, 7): + time_ns = time.time_ns +elif not OTEL_LIBRARY_TIME_NS_ERROR: + time_ns = _time_ns +else: + def time_ns(): + # Support versions older than 3.7 with opentelemetry-api > 1.12 + return int(time.time() * 1e9) + + +class TaskData: + """ + Data about an individual task. + """ + + def __init__(self, uuid, name, path, play, action, args): + self.uuid = uuid + self.name = name + self.path = path + self.play = play + self.host_data = OrderedDict() + self.start = time_ns() + self.action = action + self.args = args + self.dump = None + + def add_host(self, host): + if host.uuid in self.host_data: + if host.status == 'included': + # concatenate task include output from multiple items + host.result = '%s\n%s' % (self.host_data[host.uuid].result, host.result) + else: + return + + self.host_data[host.uuid] = host + + +class HostData: + """ + Data about an individual host. + """ + + def __init__(self, uuid, name, status, result): + self.uuid = uuid + self.name = name + self.status = status + self.result = result + self.finish = time_ns() + + +class OpenTelemetrySource(object): + def __init__(self, display): + self.ansible_playbook = "" + self.ansible_version = None + self.session = str(uuid.uuid4()) + self.host = socket.gethostname() + try: + self.ip_address = socket.gethostbyname(socket.gethostname()) + except Exception as e: + self.ip_address = None + self.user = getpass.getuser() + + self._display = display + + def traceparent_context(self, traceparent): + carrier = dict() + carrier['traceparent'] = traceparent + return TraceContextTextMapPropagator().extract(carrier=carrier) + + def start_task(self, tasks_data, hide_task_arguments, play_name, task): + """ record the start of a task for one or more hosts """ + + uuid = task._uuid + + if uuid in tasks_data: + return + + name = task.get_name().strip() + path = task.get_path() + action = task.action + args = None + + if not task.no_log and not hide_task_arguments: + args = task.args + + tasks_data[uuid] = TaskData(uuid, name, path, play_name, action, args) + + def finish_task(self, tasks_data, status, result, dump): + """ record the results of a task for a single host """ + + task_uuid = result._task._uuid + + if hasattr(result, '_host') and result._host is not None: + host_uuid = result._host._uuid + host_name = result._host.name + else: + host_uuid = 'include' + host_name = 'include' + + task = tasks_data[task_uuid] + + if self.ansible_version is None and hasattr(result, '_task_fields') and result._task_fields['args'].get('_ansible_version'): + self.ansible_version = result._task_fields['args'].get('_ansible_version') + + task.dump = dump + task.add_host(HostData(host_uuid, host_name, status, result)) + + def generate_distributed_traces(self, otel_service_name, ansible_playbook, tasks_data, status, traceparent, disable_logs): + """ generate distributed traces from the collected TaskData and HostData """ + + tasks = [] + parent_start_time = None + for task_uuid, task in tasks_data.items(): + if parent_start_time is None: + parent_start_time = task.start + tasks.append(task) + + trace.set_tracer_provider( + TracerProvider( + resource=Resource.create({SERVICE_NAME: otel_service_name}) + ) + ) + + processor = BatchSpanProcessor(OTLPSpanExporter()) + + trace.get_tracer_provider().add_span_processor(processor) + + tracer = trace.get_tracer(__name__) + + with tracer.start_as_current_span(ansible_playbook, context=self.traceparent_context(traceparent), + start_time=parent_start_time, kind=SpanKind.SERVER) as parent: + parent.set_status(status) + # Populate trace metadata attributes + if self.ansible_version is not None: + parent.set_attribute("ansible.version", self.ansible_version) + parent.set_attribute("ansible.session", self.session) + parent.set_attribute("ansible.host.name", self.host) + if self.ip_address is not None: + parent.set_attribute("ansible.host.ip", self.ip_address) + parent.set_attribute("ansible.host.user", self.user) + for task in tasks: + for host_uuid, host_data in task.host_data.items(): + with tracer.start_as_current_span(task.name, start_time=task.start, end_on_exit=False) as span: + self.update_span_data(task, host_data, span, disable_logs) + + def update_span_data(self, task_data, host_data, span, disable_logs): + """ update the span with the given TaskData and HostData """ + + name = '[%s] %s: %s' % (host_data.name, task_data.play, task_data.name) + + message = 'success' + res = {} + rc = 0 + status = Status(status_code=StatusCode.OK) + if host_data.status != 'included': + # Support loops + if 'results' in host_data.result._result: + if host_data.status == 'failed': + message = self.get_error_message_from_results(host_data.result._result['results'], task_data.action) + enriched_error_message = self.enrich_error_message_from_results(host_data.result._result['results'], task_data.action) + else: + res = host_data.result._result + rc = res.get('rc', 0) + if host_data.status == 'failed': + message = self.get_error_message(res) + enriched_error_message = self.enrich_error_message(res) + + if host_data.status == 'failed': + status = Status(status_code=StatusCode.ERROR, description=message) + # Record an exception with the task message + span.record_exception(BaseException(enriched_error_message)) + elif host_data.status == 'skipped': + message = res['skip_reason'] if 'skip_reason' in res else 'skipped' + status = Status(status_code=StatusCode.UNSET) + elif host_data.status == 'ignored': + status = Status(status_code=StatusCode.UNSET) + + span.set_status(status) + if isinstance(task_data.args, dict) and "gather_facts" not in task_data.action: + names = tuple(self.transform_ansible_unicode_to_str(k) for k in task_data.args.keys()) + values = tuple(self.transform_ansible_unicode_to_str(k) for k in task_data.args.values()) + self.set_span_attribute(span, ("ansible.task.args.name"), names) + self.set_span_attribute(span, ("ansible.task.args.value"), values) + self.set_span_attribute(span, "ansible.task.module", task_data.action) + self.set_span_attribute(span, "ansible.task.message", message) + self.set_span_attribute(span, "ansible.task.name", name) + self.set_span_attribute(span, "ansible.task.result", rc) + self.set_span_attribute(span, "ansible.task.host.name", host_data.name) + self.set_span_attribute(span, "ansible.task.host.status", host_data.status) + # This will allow to enrich the service map + self.add_attributes_for_service_map_if_possible(span, task_data) + # Send logs + if not disable_logs: + span.add_event(task_data.dump) + span.end(end_time=host_data.finish) + + def set_span_attribute(self, span, attributeName, attributeValue): + """ update the span attribute with the given attribute and value if not None """ + + if span is None and self._display is not None: + self._display.warning('span object is None. Please double check if that is expected.') + else: + if attributeValue is not None: + span.set_attribute(attributeName, attributeValue) + + def add_attributes_for_service_map_if_possible(self, span, task_data): + """Update the span attributes with the service that the task interacted with, if possible.""" + + redacted_url = self.parse_and_redact_url_if_possible(task_data.args) + if redacted_url: + self.set_span_attribute(span, "http.url", redacted_url.geturl()) + + @staticmethod + def parse_and_redact_url_if_possible(args): + """Parse and redact the url, if possible.""" + + try: + parsed_url = urlparse(OpenTelemetrySource.url_from_args(args)) + except ValueError: + return None + + if OpenTelemetrySource.is_valid_url(parsed_url): + return OpenTelemetrySource.redact_user_password(parsed_url) + return None + + @staticmethod + def url_from_args(args): + # the order matters + url_args = ("url", "api_url", "baseurl", "repo", "server_url", "chart_repo_url", "registry_url", "endpoint", "uri", "updates_url") + for arg in url_args: + if args is not None and args.get(arg): + return args.get(arg) + return "" + + @staticmethod + def redact_user_password(url): + return url._replace(netloc=url.hostname) if url.password else url + + @staticmethod + def is_valid_url(url): + if all([url.scheme, url.netloc, url.hostname]): + return "{{" not in url.hostname + return False + + @staticmethod + def transform_ansible_unicode_to_str(value): + parsed_url = urlparse(str(value)) + if OpenTelemetrySource.is_valid_url(parsed_url): + return OpenTelemetrySource.redact_user_password(parsed_url).geturl() + return str(value) + + @staticmethod + def get_error_message(result): + if result.get('exception') is not None: + return OpenTelemetrySource._last_line(result['exception']) + return result.get('msg', 'failed') + + @staticmethod + def get_error_message_from_results(results, action): + for result in results: + if result.get('failed', False): + return ('{0}({1}) - {2}').format(action, result.get('item', 'none'), OpenTelemetrySource.get_error_message(result)) + + @staticmethod + def _last_line(text): + lines = text.strip().split('\n') + return lines[-1] + + @staticmethod + def enrich_error_message(result): + message = result.get('msg', 'failed') + exception = result.get('exception') + stderr = result.get('stderr') + return ('message: "{0}"\nexception: "{1}"\nstderr: "{2}"').format(message, exception, stderr) + + @staticmethod + def enrich_error_message_from_results(results, action): + message = "" + for result in results: + if result.get('failed', False): + message = ('{0}({1}) - {2}\n{3}').format(action, result.get('item', 'none'), OpenTelemetrySource.enrich_error_message(result), message) + return message + + +class CallbackModule(CallbackBase): + """ + This callback creates distributed traces. + """ + + CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = 'notification' + CALLBACK_NAME = 'community.general.opentelemetry' + CALLBACK_NEEDS_ENABLED = True + + def __init__(self, display=None): + super(CallbackModule, self).__init__(display=display) + self.hide_task_arguments = None + self.disable_logs = None + self.otel_service_name = None + self.ansible_playbook = None + self.play_name = None + self.tasks_data = None + self.errors = 0 + self.disabled = False + self.traceparent = False + + if OTEL_LIBRARY_IMPORT_ERROR: + raise_from( + AnsibleError('The `opentelemetry-api`, `opentelemetry-exporter-otlp` or `opentelemetry-sdk` must be installed to use this plugin'), + OTEL_LIBRARY_IMPORT_ERROR) + + self.tasks_data = OrderedDict() + + self.opentelemetry = OpenTelemetrySource(display=self._display) + + def set_options(self, task_keys=None, var_options=None, direct=None): + super(CallbackModule, self).set_options(task_keys=task_keys, + var_options=var_options, + direct=direct) + + environment_variable = self.get_option('enable_from_environment') + if environment_variable is not None and os.environ.get(environment_variable, 'false').lower() != 'true': + self.disabled = True + self._display.warning("The `enable_from_environment` option has been set and {0} is not enabled. " + "Disabling the `opentelemetry` callback plugin.".format(environment_variable)) + + self.hide_task_arguments = self.get_option('hide_task_arguments') + + self.disable_logs = self.get_option('disable_logs') + + self.otel_service_name = self.get_option('otel_service_name') + + if not self.otel_service_name: + self.otel_service_name = 'ansible' + + # See https://github.com/open-telemetry/opentelemetry-specification/issues/740 + self.traceparent = self.get_option('traceparent') + + def v2_playbook_on_start(self, playbook): + self.ansible_playbook = basename(playbook._file_name) + + def v2_playbook_on_play_start(self, play): + self.play_name = play.get_name() + + def v2_runner_on_no_hosts(self, task): + self.opentelemetry.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_playbook_on_task_start(self, task, is_conditional): + self.opentelemetry.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_playbook_on_cleanup_task_start(self, task): + self.opentelemetry.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_playbook_on_handler_task_start(self, task): + self.opentelemetry.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_runner_on_failed(self, result, ignore_errors=False): + if ignore_errors: + status = 'ignored' + else: + status = 'failed' + self.errors += 1 + + self.opentelemetry.finish_task( + self.tasks_data, + status, + result, + self._dump_results(result._result) + ) + + def v2_runner_on_ok(self, result): + self.opentelemetry.finish_task( + self.tasks_data, + 'ok', + result, + self._dump_results(result._result) + ) + + def v2_runner_on_skipped(self, result): + self.opentelemetry.finish_task( + self.tasks_data, + 'skipped', + result, + self._dump_results(result._result) + ) + + def v2_playbook_on_include(self, included_file): + self.opentelemetry.finish_task( + self.tasks_data, + 'included', + included_file, + "" + ) + + def v2_playbook_on_stats(self, stats): + if self.errors == 0: + status = Status(status_code=StatusCode.OK) + else: + status = Status(status_code=StatusCode.ERROR) + self.opentelemetry.generate_distributed_traces( + self.otel_service_name, + self.ansible_playbook, + self.tasks_data, + status, + self.traceparent, + self.disable_logs + ) + + def v2_runner_on_async_failed(self, result, **kwargs): + self.errors += 1 |