From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/pybind/mgr/telemetry/module.py | 2074 ++++++++++++++++++++++++++++++++++++ 1 file changed, 2074 insertions(+) create mode 100644 src/pybind/mgr/telemetry/module.py (limited to 'src/pybind/mgr/telemetry/module.py') diff --git a/src/pybind/mgr/telemetry/module.py b/src/pybind/mgr/telemetry/module.py new file mode 100644 index 000000000..f729b9180 --- /dev/null +++ b/src/pybind/mgr/telemetry/module.py @@ -0,0 +1,2074 @@ +""" +Telemetry module for ceph-mgr + +Collect statistics from Ceph cluster and send this back to the Ceph project +when user has opted-in +""" +import logging +import numbers +import enum +import errno +import hashlib +import json +import rbd +import requests +import uuid +import time +from datetime import datetime, timedelta +from prettytable import PrettyTable +from threading import Event, Lock +from collections import defaultdict +from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union + +from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT + + +ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf'] + +LICENSE = 'sharing-1-0' +LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0' +LICENSE_URL = 'https://cdla.io/sharing-1-0/' +NO_SALT_CNT = 0 + +# Latest revision of the telemetry report. Bump this each time we make +# *any* change. +REVISION = 3 + +# History of revisions +# -------------------- +# +# Version 1: +# Mimic and/or nautilus are lumped together here, since +# we didn't track revisions yet. +# +# Version 2: +# - added revision tracking, nagging, etc. +# - added config option changes +# - added channels +# - added explicit license acknowledgement to the opt-in process +# +# Version 3: +# - added device health metrics (i.e., SMART data, minus serial number) +# - remove crush_rule +# - added CephFS metadata (how many MDSs, fs features, how many data pools, +# how much metadata is cached, rfiles, rbytes, rsnapshots) +# - added more pool metadata (rep vs ec, cache tiering mode, ec profile) +# - added host count, and counts for hosts with each of (mon, osd, mds, mgr) +# - whether an OSD cluster network is in use +# - rbd pool and image count, and rbd mirror mode (pool-level) +# - rgw daemons, zones, zonegroups; which rgw frontends +# - crush map stats + +class Collection(str, enum.Enum): + basic_base = 'basic_base' + device_base = 'device_base' + crash_base = 'crash_base' + ident_base = 'ident_base' + perf_perf = 'perf_perf' + basic_mds_metadata = 'basic_mds_metadata' + basic_pool_usage = 'basic_pool_usage' + basic_usage_by_class = 'basic_usage_by_class' + basic_rook_v01 = 'basic_rook_v01' + perf_memory_metrics = 'perf_memory_metrics' + basic_pool_options_bluestore = 'basic_pool_options_bluestore' + +MODULE_COLLECTION : List[Dict] = [ + { + "name": Collection.basic_base, + "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)", + "channel": "basic", + "nag": False + }, + { + "name": Collection.device_base, + "description": "Information about device health metrics", + "channel": "device", + "nag": False + }, + { + "name": Collection.crash_base, + "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)", + "channel": "crash", + "nag": False + }, + { + "name": Collection.ident_base, + "description": "User-provided identifying information about the cluster", + "channel": "ident", + "nag": False + }, + { + "name": Collection.perf_perf, + "description": "Information about performance counters of the cluster", + "channel": "perf", + "nag": True + }, + { + "name": Collection.basic_mds_metadata, + "description": "MDS metadata", + "channel": "basic", + "nag": False + }, + { + "name": Collection.basic_pool_usage, + "description": "Default pool application and usage statistics", + "channel": "basic", + "nag": False + }, + { + "name": Collection.basic_usage_by_class, + "description": "Default device class usage statistics", + "channel": "basic", + "nag": False + }, + { + "name": Collection.basic_rook_v01, + "description": "Basic Rook deployment data", + "channel": "basic", + "nag": True + }, + { + "name": Collection.perf_memory_metrics, + "description": "Heap stats and mempools for mon and mds", + "channel": "perf", + "nag": False + }, + { + "name": Collection.basic_pool_options_bluestore, + "description": "Per-pool bluestore config options", + "channel": "basic", + "nag": False + }, +] + +ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [ + # Note: a key cannot be both a node and a leaf, e.g. + # "rook/a/b" + # "rook/a/b/c" + ("rook/version", Collection.basic_rook_v01), + ("rook/kubernetes/version", Collection.basic_rook_v01), + ("rook/csi/version", Collection.basic_rook_v01), + ("rook/node/count/kubernetes-total", Collection.basic_rook_v01), + ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01), + ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01), + ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01), + ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01), + ("rook/usage/storage-class/count/total", Collection.basic_rook_v01), + ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01), + ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01), + ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01), + ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01), + ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01), + ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01), + ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01), + ("rook/cluster/mon/count", Collection.basic_rook_v01), + ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01), + ("rook/cluster/mon/max-id", Collection.basic_rook_v01), + ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01), + ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01), + ("rook/cluster/network/provider", Collection.basic_rook_v01), + ("rook/cluster/external-mode", Collection.basic_rook_v01), +] + +class Module(MgrModule): + metadata_keys = [ + "arch", + "ceph_version", + "os", + "cpu", + "kernel_description", + "kernel_version", + "distro_description", + "distro" + ] + + MODULE_OPTIONS = [ + Option(name='url', + type='str', + default='https://telemetry.ceph.com/report'), + Option(name='device_url', + type='str', + default='https://telemetry.ceph.com/device'), + Option(name='enabled', + type='bool', + default=False), + Option(name='last_opt_revision', + type='int', + default=1), + Option(name='leaderboard', + type='bool', + default=False), + Option(name='leaderboard_description', + type='str', + default=None), + Option(name='description', + type='str', + default=None), + Option(name='contact', + type='str', + default=None), + Option(name='organization', + type='str', + default=None), + Option(name='proxy', + type='str', + default=None), + Option(name='interval', + type='int', + default=24, + min=8), + Option(name='channel_basic', + type='bool', + default=True, + desc='Share basic cluster information (size, version)'), + Option(name='channel_ident', + type='bool', + default=False, + desc='Share a user-provided description and/or contact email for the cluster'), + Option(name='channel_crash', + type='bool', + default=True, + desc='Share metadata about Ceph daemon crashes (version, stack straces, etc)'), + Option(name='channel_device', + type='bool', + default=True, + desc=('Share device health metrics ' + '(e.g., SMART data, minus potentially identifying info like serial numbers)')), + Option(name='channel_perf', + type='bool', + default=False, + desc='Share various performance metrics of a cluster'), + ] + + @property + def config_keys(self) -> Dict[str, OptionValue]: + return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS) + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super(Module, self).__init__(*args, **kwargs) + self.event = Event() + self.run = False + self.db_collection: Optional[List[str]] = None + self.last_opted_in_ceph_version: Optional[int] = None + self.last_opted_out_ceph_version: Optional[int] = None + self.last_upload: Optional[int] = None + self.last_report: Dict[str, Any] = dict() + self.report_id: Optional[str] = None + self.salt: Optional[str] = None + self.get_report_lock = Lock() + self.config_update_module_option() + # for mypy which does not run the code + if TYPE_CHECKING: + self.url = '' + self.device_url = '' + self.enabled = False + self.last_opt_revision = 0 + self.leaderboard = '' + self.leaderboard_description = '' + self.interval = 0 + self.proxy = '' + self.channel_basic = True + self.channel_ident = False + self.channel_crash = True + self.channel_device = True + self.channel_perf = False + self.db_collection = ['basic_base', 'device_base'] + self.last_opted_in_ceph_version = 17 + self.last_opted_out_ceph_version = 0 + + def config_update_module_option(self) -> None: + for opt in self.MODULE_OPTIONS: + setattr(self, + opt['name'], + self.get_module_option(opt['name'])) + self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name'])) + + def config_notify(self) -> None: + self.config_update_module_option() + # wake up serve() thread + self.event.set() + + def load(self) -> None: + last_upload = self.get_store('last_upload', None) + if last_upload is None: + self.last_upload = None + else: + self.last_upload = int(last_upload) + + report_id = self.get_store('report_id', None) + if report_id is None: + self.report_id = str(uuid.uuid4()) + self.set_store('report_id', self.report_id) + else: + self.report_id = report_id + + salt = self.get_store('salt', None) + if salt is None: + self.salt = str(uuid.uuid4()) + self.set_store('salt', self.salt) + else: + self.salt = salt + + self.init_collection() + + last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None) + if last_opted_in_ceph_version is None: + self.last_opted_in_ceph_version = None + else: + self.last_opted_in_ceph_version = int(last_opted_in_ceph_version) + + last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None) + if last_opted_out_ceph_version is None: + self.last_opted_out_ceph_version = None + else: + self.last_opted_out_ceph_version = int(last_opted_out_ceph_version) + + def gather_osd_metadata(self, + osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]: + keys = ["osd_objectstore", "rotational"] + keys += self.metadata_keys + + metadata: Dict[str, Dict[str, int]] = dict() + for key in keys: + metadata[key] = defaultdict(int) + + for osd in osd_map['osds']: + res = self.get_metadata('osd', str(osd['osd'])) + if res is None: + self.log.debug('Could not get metadata for osd.%s' % str(osd['osd'])) + continue + for k, v in res.items(): + if k not in keys: + continue + + metadata[k][v] += 1 + + return metadata + + def gather_mon_metadata(self, + mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]: + keys = list() + keys += self.metadata_keys + + metadata: Dict[str, Dict[str, int]] = dict() + for key in keys: + metadata[key] = defaultdict(int) + + for mon in mon_map['mons']: + res = self.get_metadata('mon', mon['name']) + if res is None: + self.log.debug('Could not get metadata for mon.%s' % (mon['name'])) + continue + for k, v in res.items(): + if k not in keys: + continue + + metadata[k][v] += 1 + + return metadata + + def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]: + metadata: Dict[str, Dict[str, int]] = dict() + + res = self.get('mds_metadata') # metadata of *all* mds daemons + if res is None or not res: + self.log.debug('Could not get metadata for mds daemons') + return metadata + + keys = list() + keys += self.metadata_keys + + for key in keys: + metadata[key] = defaultdict(int) + + for mds in res.values(): + for k, v in mds.items(): + if k not in keys: + continue + + metadata[k][v] += 1 + + return metadata + + def gather_crush_info(self) -> Dict[str, Union[int, + bool, + List[int], + Dict[str, int], + Dict[int, int]]]: + osdmap = self.get_osdmap() + crush_raw = osdmap.get_crush() + crush = crush_raw.dump() + + BucketKeyT = TypeVar('BucketKeyT', int, str) + + def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None: + if k in d: + d[k] += 1 + else: + d[k] = 1 + + device_classes: Dict[str, int] = {} + for dev in crush['devices']: + inc(device_classes, dev.get('class', '')) + + bucket_algs: Dict[str, int] = {} + bucket_types: Dict[str, int] = {} + bucket_sizes: Dict[int, int] = {} + for bucket in crush['buckets']: + if '~' in bucket['name']: # ignore shadow buckets + continue + inc(bucket_algs, bucket['alg']) + inc(bucket_types, bucket['type_id']) + inc(bucket_sizes, len(bucket['items'])) + + return { + 'num_devices': len(crush['devices']), + 'num_types': len(crush['types']), + 'num_buckets': len(crush['buckets']), + 'num_rules': len(crush['rules']), + 'device_classes': list(device_classes.values()), + 'tunables': crush['tunables'], + 'compat_weight_set': '-1' in crush['choose_args'], + 'num_weight_sets': len(crush['choose_args']), + 'bucket_algs': bucket_algs, + 'bucket_sizes': bucket_sizes, + 'bucket_types': bucket_types, + } + + def gather_configs(self) -> Dict[str, List[str]]: + # cluster config options + cluster = set() + r, outb, outs = self.mon_command({ + 'prefix': 'config dump', + 'format': 'json' + }) + if r != 0: + return {} + try: + dump = json.loads(outb) + except json.decoder.JSONDecodeError: + return {} + for opt in dump: + name = opt.get('name') + if name: + cluster.add(name) + # daemon-reported options (which may include ceph.conf) + active = set() + ls = self.get("modified_config_options") + for opt in ls.get('options', {}): + active.add(opt) + return { + 'cluster_changed': sorted(list(cluster)), + 'active_changed': sorted(list(active)), + } + + def anonymize_entity_name(self, entity_name:str) -> str: + if '.' not in entity_name: + self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found") + return entity_name + + (etype, eid) = entity_name.split('.', 1) + m = hashlib.sha1() + salt = '' + if self.salt is not None: + salt = self.salt + # avoid asserting that salt exists + if not self.salt: + # do not set self.salt to a temp value + salt = f"no_salt_found_{NO_SALT_CNT}" + NO_SALT_CNT += 1 + self.log.debug(f"No salt found, created a temp one: {salt}") + m.update(salt.encode('utf-8')) + m.update(eid.encode('utf-8')) + m.update(salt.encode('utf-8')) + + return etype + '.' + m.hexdigest() + + def get_heap_stats(self) -> Dict[str, dict]: + result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) + anonymized_daemons = {} + osd_map = self.get('osd_map') + + # Combine available daemons + daemons = [] + for osd in osd_map['osds']: + daemons.append('osd'+'.'+str(osd['osd'])) + # perf_memory_metrics collection (1/2) + if self.is_enabled_collection(Collection.perf_memory_metrics): + mon_map = self.get('mon_map') + mds_metadata = self.get('mds_metadata') + for mon in mon_map['mons']: + daemons.append('mon'+'.'+mon['name']) + for mds in mds_metadata: + daemons.append('mds'+'.'+mds) + + # Grab output from the "daemon.x heap stats" command + for daemon in daemons: + daemon_type, daemon_id = daemon.split('.', 1) + heap_stats = self.parse_heap_stats(daemon_type, daemon_id) + if heap_stats: + if (daemon_type != 'osd'): + # Anonymize mon and mds + anonymized_daemons[daemon] = self.anonymize_entity_name(daemon) + daemon = anonymized_daemons[daemon] + result[daemon_type][daemon] = heap_stats + else: + continue + + if anonymized_daemons: + # for debugging purposes only, this data is never reported + self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons)) + return result + + def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]: + parsed_output = {} + + cmd_dict = { + 'prefix': 'heap', + 'heapcmd': 'stats' + } + r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict) + + if r != 0: + self.log.error("Invalid command dictionary: {}".format(cmd_dict)) + else: + if 'tcmalloc heap stats' in outb: + values = [int(i) for i in outb.split() if i.isdigit()] + # `categories` must be ordered this way for the correct output to be parsed + categories = ['use_by_application', + 'page_heap_freelist', + 'central_cache_freelist', + 'transfer_cache_freelist', + 'thread_cache_freelists', + 'malloc_metadata', + 'actual_memory_used', + 'released_to_os', + 'virtual_address_space_used', + 'spans_in_use', + 'thread_heaps_in_use', + 'tcmalloc_page_size'] + if len(values) != len(categories): + self.log.error('Received unexpected output from {}.{}; ' \ + 'number of values should match the number' \ + 'of expected categories:\n values: len={} {} '\ + '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs)) + else: + parsed_output = dict(zip(categories, values)) + else: + self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs)) + + return parsed_output + + def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]: + result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) + anonymized_daemons = {} + osd_map = self.get('osd_map') + + # Combine available daemons + daemons = [] + for osd in osd_map['osds']: + daemons.append('osd'+'.'+str(osd['osd'])) + # perf_memory_metrics collection (2/2) + if self.is_enabled_collection(Collection.perf_memory_metrics): + mon_map = self.get('mon_map') + mds_metadata = self.get('mds_metadata') + for mon in mon_map['mons']: + daemons.append('mon'+'.'+mon['name']) + for mds in mds_metadata: + daemons.append('mds'+'.'+mds) + + # Grab output from the "dump_mempools" command + for daemon in daemons: + daemon_type, daemon_id = daemon.split('.', 1) + cmd_dict = { + 'prefix': 'dump_mempools', + 'format': 'json' + } + r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict) + if r != 0: + self.log.error("Invalid command dictionary: {}".format(cmd_dict)) + continue + else: + try: + # This is where the mempool will land. + dump = json.loads(outb) + if mode == 'separated': + # Anonymize mon and mds + if daemon_type != 'osd': + anonymized_daemons[daemon] = self.anonymize_entity_name(daemon) + daemon = anonymized_daemons[daemon] + result[daemon_type][daemon] = dump['mempool']['by_pool'] + elif mode == 'aggregated': + for mem_type in dump['mempool']['by_pool']: + result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes'] + result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items'] + else: + self.log.error("Incorrect mode specified in get_mempool: {}".format(mode)) + except (json.decoder.JSONDecodeError, KeyError) as e: + self.log.exception("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e)) + continue + + if anonymized_daemons: + # for debugging purposes only, this data is never reported + self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons)) + + return result + + def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]: + # Initialize result dict + result: Dict[str, dict] = defaultdict(lambda: defaultdict( + lambda: defaultdict( + lambda: defaultdict( + lambda: defaultdict( + lambda: defaultdict(int)))))) + + # Get list of osd ids from the metadata + osd_metadata = self.get('osd_metadata') + + # Grab output from the "osd.x perf histogram dump" command + for osd_id in osd_metadata: + cmd_dict = { + 'prefix': 'perf histogram dump', + 'id': str(osd_id), + 'format': 'json' + } + r, outb, outs = self.osd_command(cmd_dict) + # Check for invalid calls + if r != 0: + self.log.error("Invalid command dictionary: {}".format(cmd_dict)) + continue + else: + try: + # This is where the histograms will land if there are any. + dump = json.loads(outb) + + for histogram in dump['osd']: + # Log axis information. There are two axes, each represented + # as a dictionary. Both dictionaries are contained inside a + # list called 'axes'. + axes = [] + for axis in dump['osd'][histogram]['axes']: + + # This is the dict that contains information for an individual + # axis. It will be appended to the 'axes' list at the end. + axis_dict: Dict[str, Any] = defaultdict() + + # Collecting information for buckets, min, name, etc. + axis_dict['buckets'] = axis['buckets'] + axis_dict['min'] = axis['min'] + axis_dict['name'] = axis['name'] + axis_dict['quant_size'] = axis['quant_size'] + axis_dict['scale_type'] = axis['scale_type'] + + # Collecting ranges; placing them in lists to + # improve readability later on. + ranges = [] + for _range in axis['ranges']: + _max, _min = None, None + if 'max' in _range: + _max = _range['max'] + if 'min' in _range: + _min = _range['min'] + ranges.append([_min, _max]) + axis_dict['ranges'] = ranges + + # Now that 'axis_dict' contains all the appropriate + # information for the current axis, append it to the 'axes' list. + # There will end up being two axes in the 'axes' list, since the + # histograms are 2D. + axes.append(axis_dict) + + # Add the 'axes' list, containing both axes, to result. + # At this point, you will see that the name of the key is the string + # form of our axes list (str(axes)). This is there so that histograms + # with different axis configs will not be combined. + # These key names are later dropped when only the values are returned. + result[str(axes)][histogram]['axes'] = axes + + # Collect current values and make sure they are in + # integer form. + values = [] + for value_list in dump['osd'][histogram]['values']: + values.append([int(v) for v in value_list]) + + if mode == 'separated': + if 'osds' not in result[str(axes)][histogram]: + result[str(axes)][histogram]['osds'] = [] + result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values}) + + elif mode == 'aggregated': + # Aggregate values. If 'values' have already been initialized, + # we can safely add. + if 'values' in result[str(axes)][histogram]: + for i in range (0, len(values)): + for j in range (0, len(values[i])): + values[i][j] += result[str(axes)][histogram]['values'][i][j] + + # Add the values to result. + result[str(axes)][histogram]['values'] = values + + # Update num_combined_osds + if 'num_combined_osds' not in result[str(axes)][histogram]: + result[str(axes)][histogram]['num_combined_osds'] = 1 + else: + result[str(axes)][histogram]['num_combined_osds'] += 1 + else: + self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode)) + return list() + + # Sometimes, json errors occur if you give it an empty string. + # I am also putting in a catch for a KeyError since it could + # happen where the code is assuming that a key exists in the + # schema when it doesn't. In either case, we'll handle that + # by continuing and collecting what we can from other osds. + except (json.decoder.JSONDecodeError, KeyError) as e: + self.log.exception("Error caught on osd.{}: {}".format(osd_id, e)) + continue + + return list(result.values()) + + def get_io_rate(self) -> dict: + return self.get('io_rate') + + def get_stats_per_pool(self) -> dict: + result = self.get('pg_dump')['pool_stats'] + + # collect application metadata from osd_map + osd_map = self.get('osd_map') + application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']} + + # add application to each pool from pg_dump + for pool in result: + pool['application'] = [] + # Only include default applications + for application in application_metadata[pool['poolid']]: + if application in ['cephfs', 'mgr', 'rbd', 'rgw']: + pool['application'].append(application) + + return result + + def get_stats_per_pg(self) -> dict: + return self.get('pg_dump')['pg_stats'] + + def get_rocksdb_stats(self) -> Dict[str, str]: + # Initalizers + result: Dict[str, str] = defaultdict() + version = self.get_rocksdb_version() + + # Update result + result['version'] = version + + return result + + def gather_crashinfo(self) -> List[Dict[str, str]]: + crashlist: List[Dict[str, str]] = list() + errno, crashids, err = self.remote('crash', 'ls') + if errno: + return crashlist + for crashid in crashids.split(): + errno, crashinfo, err = self.remote('crash', 'do_info', crashid) + if errno: + continue + c = json.loads(crashinfo) + + # redact hostname + del c['utsname_hostname'] + + # entity_name might have more than one '.', beware + (etype, eid) = c.get('entity_name', '').split('.', 1) + m = hashlib.sha1() + assert self.salt + m.update(self.salt.encode('utf-8')) + m.update(eid.encode('utf-8')) + m.update(self.salt.encode('utf-8')) + c['entity_name'] = etype + '.' + m.hexdigest() + + # redact final line of python tracebacks, as the exception + # payload may contain identifying information + if 'mgr_module' in c and 'backtrace' in c: + # backtrace might be empty + if len(c['backtrace']) > 0: + c['backtrace'][-1] = '' + + crashlist.append(c) + return crashlist + + def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]: + # Extract perf counter data with get_unlabeled_perf_counters(), a method + # from mgr/mgr_module.py. This method returns a nested dictionary that + # looks a lot like perf schema, except with some additional fields. + # + # Example of output, a snapshot of a mon daemon: + # "mon.b": { + # "bluestore.kv_flush_lat": { + # "count": 2431, + # "description": "Average kv_thread flush latency", + # "nick": "fl_l", + # "priority": 8, + # "type": 5, + # "units": 1, + # "value": 88814109 + # }, + # }, + perf_counters = self.get_unlabeled_perf_counters() + + # Initialize 'result' dict + result: Dict[str, dict] = defaultdict(lambda: defaultdict( + lambda: defaultdict(lambda: defaultdict(int)))) + + # 'separated' mode + anonymized_daemon_dict = {} + + for daemon, perf_counters_by_daemon in perf_counters.items(): + daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw' + + if mode == 'separated': + # anonymize individual daemon names except osds + if (daemon_type != 'osd'): + anonymized_daemon = self.anonymize_entity_name(daemon) + anonymized_daemon_dict[anonymized_daemon] = daemon + daemon = anonymized_daemon + + # Calculate num combined daemon types if in aggregated mode + if mode == 'aggregated': + if 'num_combined_daemons' not in result[daemon_type]: + result[daemon_type]['num_combined_daemons'] = 1 + else: + result[daemon_type]['num_combined_daemons'] += 1 + + for collection in perf_counters_by_daemon: + # Split the collection to avoid redundancy in final report; i.e.: + # bluestore.kv_flush_lat, bluestore.kv_final_lat --> + # bluestore: kv_flush_lat, kv_final_lat + col_0, col_1 = collection.split('.') + + # Debug log for empty keys. This initially was a problem for prioritycache + # perf counters, where the col_0 was empty for certain mon counters: + # + # "mon.a": { instead of "mon.a": { + # "": { "prioritycache": { + # "cache_bytes": {...}, "cache_bytes": {...}, + # + # This log is here to detect any future instances of a similar issue. + if (daemon == "") or (col_0 == "") or (col_1 == ""): + self.log.debug("Instance of an empty key: {}{}".format(daemon, collection)) + + if mode == 'separated': + # Add value to result + result[daemon][col_0][col_1]['value'] = \ + perf_counters_by_daemon[collection]['value'] + + # Check that 'count' exists, as not all counters have a count field. + if 'count' in perf_counters_by_daemon[collection]: + result[daemon][col_0][col_1]['count'] = \ + perf_counters_by_daemon[collection]['count'] + elif mode == 'aggregated': + # Not every rgw daemon has the same schema. Specifically, each rgw daemon + # has a uniquely-named collection that starts off identically (i.e. + # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw"). + # This bit of code combines these unique counters all under one rgw instance. + # Without this check, the schema would remain separeted out in the final report. + if col_0[0:11] == "objecter-0x": + col_0 = "objecter-0x" + + # Check that the value can be incremented. In some cases, + # the files are of type 'pair' (real-integer-pair, integer-integer pair). + # In those cases, the value is a dictionary, and not a number. + # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"] + if isinstance(perf_counters_by_daemon[collection]['value'], numbers.Number): + result[daemon_type][col_0][col_1]['value'] += \ + perf_counters_by_daemon[collection]['value'] + + # Check that 'count' exists, as not all counters have a count field. + if 'count' in perf_counters_by_daemon[collection]: + result[daemon_type][col_0][col_1]['count'] += \ + perf_counters_by_daemon[collection]['count'] + else: + self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode)) + return {} + + if mode == 'separated': + # for debugging purposes only, this data is never reported + self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict)) + + return result + + def get_active_channels(self) -> List[str]: + r = [] + if self.channel_basic: + r.append('basic') + if self.channel_crash: + r.append('crash') + if self.channel_device: + r.append('device') + if self.channel_ident: + r.append('ident') + if self.channel_perf: + r.append('perf') + return r + + def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]: + try: + time_format = self.remote('devicehealth', 'get_time_format') + except Exception as e: + self.log.debug('Unable to format time: {}'.format(e)) + return {} + cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2) + min_sample = cutoff.strftime(time_format) + + devices = self.get('devices')['devices'] + if not devices: + self.log.debug('Unable to get device info from the mgr.') + return {} + + # anon-host-id -> anon-devid -> { timestamp -> record } + res: Dict[str, Dict[str, Dict[str, str]]] = {} + for d in devices: + devid = d['devid'] + try: + # this is a map of stamp -> {device info} + m = self.remote('devicehealth', 'get_recent_device_metrics', + devid, min_sample) + except Exception as e: + self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e)) + continue + + # anonymize host id + try: + host = d['location'][0]['host'] + except (KeyError, IndexError) as e: + self.log.exception('Unable to get host from device with id "{}": {}'.format(devid, e)) + continue + anon_host = self.get_store('host-id/%s' % host) + if not anon_host: + anon_host = str(uuid.uuid1()) + self.set_store('host-id/%s' % host, anon_host) + serial = None + for dev, rep in m.items(): + rep['host_id'] = anon_host + if serial is None and 'serial_number' in rep: + serial = rep['serial_number'] + + # anonymize device id + anon_devid = self.get_store('devid-id/%s' % devid) + if not anon_devid: + # ideally devid is 'vendor_model_serial', + # but can also be 'model_serial', 'serial' + if '_' in devid: + anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}" + else: + anon_devid = str(uuid.uuid1()) + self.set_store('devid-id/%s' % devid, anon_devid) + self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid, + host, anon_host)) + + # anonymize the smartctl report itself + if serial: + m_str = json.dumps(m) + m = json.loads(m_str.replace(serial, 'deleted')) + + if anon_host not in res: + res[anon_host] = {} + res[anon_host][anon_devid] = m + return res + + def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int: + data = self.get_counter(daemon_type, daemon_name, stat)[stat] + if data: + return data[-1][1] + else: + return 0 + + def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]: + if not channels: + channels = self.get_active_channels() + report = { + 'leaderboard': self.leaderboard, + 'leaderboard_description': self.leaderboard_description, + 'report_version': 1, + 'report_timestamp': datetime.utcnow().isoformat(), + 'report_id': self.report_id, + 'channels': channels, + 'channels_available': ALL_CHANNELS, + 'license': LICENSE, + 'collections_available': [c['name'].name for c in MODULE_COLLECTION], + 'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])], + } + + if 'ident' in channels: + for option in ['description', 'contact', 'organization']: + report[option] = getattr(self, option) + + if 'basic' in channels: + mon_map = self.get('mon_map') + osd_map = self.get('osd_map') + service_map = self.get('service_map') + fs_map = self.get('fs_map') + df = self.get('df') + df_pools = {pool['id']: pool for pool in df['pools']} + + report['created'] = mon_map['created'] + + # mons + v1_mons = 0 + v2_mons = 0 + ipv4_mons = 0 + ipv6_mons = 0 + for mon in mon_map['mons']: + for a in mon['public_addrs']['addrvec']: + if a['type'] == 'v2': + v2_mons += 1 + elif a['type'] == 'v1': + v1_mons += 1 + if a['addr'].startswith('['): + ipv6_mons += 1 + else: + ipv4_mons += 1 + report['mon'] = { + 'count': len(mon_map['mons']), + 'features': mon_map['features'], + 'min_mon_release': mon_map['min_mon_release'], + 'v1_addr_mons': v1_mons, + 'v2_addr_mons': v2_mons, + 'ipv4_addr_mons': ipv4_mons, + 'ipv6_addr_mons': ipv6_mons, + } + + report['config'] = self.gather_configs() + + # pools + + rbd_num_pools = 0 + rbd_num_images_by_pool = [] + rbd_mirroring_by_pool = [] + num_pg = 0 + report['pools'] = list() + for pool in osd_map['pools']: + num_pg += pool['pg_num'] + ec_profile = {} + if pool['erasure_code_profile']: + orig = osd_map['erasure_code_profiles'].get( + pool['erasure_code_profile'], {}) + ec_profile = { + k: orig[k] for k in orig.keys() + if k in ['k', 'm', 'plugin', 'technique', + 'crush-failure-domain', 'l'] + } + pool_data = { + 'pool': pool['pool'], + 'pg_num': pool['pg_num'], + 'pgp_num': pool['pg_placement_num'], + 'size': pool['size'], + 'min_size': pool['min_size'], + 'pg_autoscale_mode': pool['pg_autoscale_mode'], + 'target_max_bytes': pool['target_max_bytes'], + 'target_max_objects': pool['target_max_objects'], + 'type': ['', 'replicated', '', 'erasure'][pool['type']], + 'erasure_code_profile': ec_profile, + 'cache_mode': pool['cache_mode'], + } + + # basic_pool_usage collection + if self.is_enabled_collection(Collection.basic_pool_usage): + pool_data['application'] = [] + for application in pool['application_metadata']: + # Only include default applications + if application in ['cephfs', 'mgr', 'rbd', 'rgw']: + pool_data['application'].append(application) + pool_stats = df_pools[pool['pool']]['stats'] + pool_data['stats'] = { # filter out kb_used + 'avail_raw': pool_stats['avail_raw'], + 'bytes_used': pool_stats['bytes_used'], + 'compress_bytes_used': pool_stats['compress_bytes_used'], + 'compress_under_bytes': pool_stats['compress_under_bytes'], + 'data_bytes_used': pool_stats['data_bytes_used'], + 'dirty': pool_stats['dirty'], + 'max_avail': pool_stats['max_avail'], + 'objects': pool_stats['objects'], + 'omap_bytes_used': pool_stats['omap_bytes_used'], + 'percent_used': pool_stats['percent_used'], + 'quota_bytes': pool_stats['quota_bytes'], + 'quota_objects': pool_stats['quota_objects'], + 'rd': pool_stats['rd'], + 'rd_bytes': pool_stats['rd_bytes'], + 'stored': pool_stats['stored'], + 'stored_data': pool_stats['stored_data'], + 'stored_omap': pool_stats['stored_omap'], + 'stored_raw': pool_stats['stored_raw'], + 'wr': pool_stats['wr'], + 'wr_bytes': pool_stats['wr_bytes'] + } + pool_data['options'] = {} + # basic_pool_options_bluestore collection + if self.is_enabled_collection(Collection.basic_pool_options_bluestore): + bluestore_options = ['compression_algorithm', + 'compression_mode', + 'compression_required_ratio', + 'compression_min_blob_size', + 'compression_max_blob_size'] + for option in bluestore_options: + if option in pool['options']: + pool_data['options'][option] = pool['options'][option] + cast(List[Dict[str, Any]], report['pools']).append(pool_data) + if 'rbd' in pool['application_metadata']: + rbd_num_pools += 1 + ioctx = self.rados.open_ioctx(pool['pool_name']) + rbd_num_images_by_pool.append( + sum(1 for _ in rbd.RBD().list2(ioctx))) + rbd_mirroring_by_pool.append( + rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED) + report['rbd'] = { + 'num_pools': rbd_num_pools, + 'num_images_by_pool': rbd_num_images_by_pool, + 'mirroring_by_pool': rbd_mirroring_by_pool} + + # osds + cluster_network = False + for osd in osd_map['osds']: + if osd['up'] and not cluster_network: + front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0] + back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0] + if front_ip != back_ip: + cluster_network = True + report['osd'] = { + 'count': len(osd_map['osds']), + 'require_osd_release': osd_map['require_osd_release'], + 'require_min_compat_client': osd_map['require_min_compat_client'], + 'cluster_network': cluster_network, + } + + # crush + report['crush'] = self.gather_crush_info() + + # cephfs + report['fs'] = { + 'count': len(fs_map['filesystems']), + 'feature_flags': fs_map['feature_flags'], + 'num_standby_mds': len(fs_map['standbys']), + 'filesystems': [], + } + num_mds = len(fs_map['standbys']) + for fsm in fs_map['filesystems']: + fs = fsm['mdsmap'] + num_sessions = 0 + cached_ino = 0 + cached_dn = 0 + cached_cap = 0 + subtrees = 0 + rfiles = 0 + rbytes = 0 + rsnaps = 0 + for gid, mds in fs['info'].items(): + num_sessions += self.get_latest('mds', mds['name'], + 'mds_sessions.session_count') + cached_ino += self.get_latest('mds', mds['name'], + 'mds_mem.ino') + cached_dn += self.get_latest('mds', mds['name'], + 'mds_mem.dn') + cached_cap += self.get_latest('mds', mds['name'], + 'mds_mem.cap') + subtrees += self.get_latest('mds', mds['name'], + 'mds.subtrees') + if mds['rank'] == 0: + rfiles = self.get_latest('mds', mds['name'], + 'mds.root_rfiles') + rbytes = self.get_latest('mds', mds['name'], + 'mds.root_rbytes') + rsnaps = self.get_latest('mds', mds['name'], + 'mds.root_rsnaps') + report['fs']['filesystems'].append({ # type: ignore + 'max_mds': fs['max_mds'], + 'ever_allowed_features': fs['ever_allowed_features'], + 'explicitly_allowed_features': fs['explicitly_allowed_features'], + 'num_in': len(fs['in']), + 'num_up': len(fs['up']), + 'num_standby_replay': len( + [mds for gid, mds in fs['info'].items() + if mds['state'] == 'up:standby-replay']), + 'num_mds': len(fs['info']), + 'num_sessions': num_sessions, + 'cached_inos': cached_ino, + 'cached_dns': cached_dn, + 'cached_caps': cached_cap, + 'cached_subtrees': subtrees, + 'balancer_enabled': len(fs['balancer']) > 0, + 'num_data_pools': len(fs['data_pools']), + 'standby_count_wanted': fs['standby_count_wanted'], + 'approx_ctime': fs['created'][0:7], + 'files': rfiles, + 'bytes': rbytes, + 'snaps': rsnaps, + }) + num_mds += len(fs['info']) + report['fs']['total_num_mds'] = num_mds # type: ignore + + # daemons + report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map), + mon=self.gather_mon_metadata(mon_map)) + + if self.is_enabled_collection(Collection.basic_mds_metadata): + report['metadata']['mds'] = self.gather_mds_metadata() # type: ignore + + # host counts + servers = self.list_servers() + self.log.debug('servers %s' % servers) + hosts = { + 'num': len([h for h in servers if h['hostname']]), + } + for t in ['mon', 'mds', 'osd', 'mgr']: + nr_services = sum(1 for host in servers if + any(service for service in cast(List[ServiceInfoT], + host['services']) + if service['type'] == t)) + hosts['num_with_' + t] = nr_services + report['hosts'] = hosts + + report['usage'] = { + 'pools': len(df['pools']), + 'pg_num': num_pg, + 'total_used_bytes': df['stats']['total_used_bytes'], + 'total_bytes': df['stats']['total_bytes'], + 'total_avail_bytes': df['stats']['total_avail_bytes'] + } + # basic_usage_by_class collection + if self.is_enabled_collection(Collection.basic_usage_by_class): + report['usage']['stats_by_class'] = {} # type: ignore + for device_class in df['stats_by_class']: + if device_class in ['hdd', 'ssd', 'nvme']: + report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class] # type: ignore + + services: DefaultDict[str, int] = defaultdict(int) + for key, value in service_map['services'].items(): + services[key] += 1 + if key == 'rgw': + rgw = {} + zones = set() + zonegroups = set() + frontends = set() + count = 0 + d = value.get('daemons', dict()) + for k, v in d.items(): + if k == 'summary' and v: + rgw[k] = v + elif isinstance(v, dict) and 'metadata' in v: + count += 1 + zones.add(v['metadata']['zone_id']) + zonegroups.add(v['metadata']['zonegroup_id']) + frontends.add(v['metadata']['frontend_type#0']) + + # we could actually iterate over all the keys of + # the dict and check for how many frontends there + # are, but it is unlikely that one would be running + # more than 2 supported ones + f2 = v['metadata'].get('frontend_type#1', None) + if f2: + frontends.add(f2) + + rgw['count'] = count + rgw['zones'] = len(zones) + rgw['zonegroups'] = len(zonegroups) + rgw['frontends'] = list(frontends) # sets aren't json-serializable + report['rgw'] = rgw + report['services'] = services + + try: + report['balancer'] = self.remote('balancer', 'gather_telemetry') + except ImportError: + report['balancer'] = { + 'active': False + } + + # Rook + self.get_rook_data(report) + + if 'crash' in channels: + report['crashes'] = self.gather_crashinfo() + + if 'perf' in channels: + if self.is_enabled_collection(Collection.perf_perf): + report['perf_counters'] = self.gather_perf_counters('separated') + report['stats_per_pool'] = self.get_stats_per_pool() + report['stats_per_pg'] = self.get_stats_per_pg() + report['io_rate'] = self.get_io_rate() + report['osd_perf_histograms'] = self.get_osd_histograms('separated') + report['mempool'] = self.get_mempool('separated') + report['heap_stats'] = self.get_heap_stats() + report['rocksdb_stats'] = self.get_rocksdb_stats() + + # NOTE: We do not include the 'device' channel in this report; it is + # sent to a different endpoint. + + return report + + def get_rook_data(self, report: Dict[str, object]) -> None: + r, outb, outs = self.mon_command({ + 'prefix': 'config-key dump', + 'format': 'json' + }) + if r != 0: + return + try: + config_kv_dump = json.loads(outb) + except json.decoder.JSONDecodeError: + return + + for elem in ROOK_KEYS_BY_COLLECTION: + # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin") + # elem[1] is the Collection this key belongs to + if self.is_enabled_collection(elem[1]): + self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0])) + + def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None: + last_node = key_path.split('/')[-1] + for node in key_path.split('/')[0:-1]: + if node not in report: + report[node] = {} + report = report[node] # type: ignore + + # sanity check of keys correctness + if not isinstance(report, dict): + self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}") + return + + if last_node in report: + self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point") + return + + report[last_node] = value + + def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]: + self.log.info('Sending %s to: %s' % (what, url)) + proxies = dict() + if self.proxy: + self.log.info('Send using HTTP(S) proxy: %s', self.proxy) + proxies['http'] = self.proxy + proxies['https'] = self.proxy + try: + resp = requests.put(url=url, json=report, proxies=proxies) + resp.raise_for_status() + except Exception as e: + fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e)) + self.log.error(fail_reason) + return fail_reason + return None + + class EndPoint(enum.Enum): + ceph = 'ceph' + device = 'device' + + def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]: + ''' + Find collections that are available in the module, but are not in the db + ''' + if self.db_collection is None: + return None + + if not channels: + channels = ALL_CHANNELS + else: + for ch in channels: + if ch not in ALL_CHANNELS: + self.log.debug(f"invalid channel name: {ch}") + return None + + new_collection : List[Collection] = [] + + for c in MODULE_COLLECTION: + if c['name'].name not in self.db_collection: + if c['channel'] in channels: + new_collection.append(c['name']) + + return new_collection + + def is_major_upgrade(self) -> bool: + ''' + Returns True only if the user last opted-in to an older major + ''' + if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0: + # we do not know what Ceph version was when the user last opted-in, + # thus we do not wish to nag in case of a major upgrade + return False + + mon_map = self.get('mon_map') + mon_min = mon_map.get("min_mon_release", 0) + + if mon_min - self.last_opted_in_ceph_version > 0: + self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}") + return True + + return False + + def is_opted_in(self) -> bool: + # If len is 0 it means that the user is either opted-out (never + # opted-in, or invoked `telemetry off`), or they upgraded from a + # telemetry revision 1 or 2, which required to re-opt in to revision 3, + # regardless, hence is considered as opted-out + if self.db_collection is None: + return False + return len(self.db_collection) > 0 + + def should_nag(self) -> bool: + # Find delta between opted-in collections and module collections; + # nag only if module has a collection which is not in db, and nag == True. + + # We currently do not nag if the user is opted-out (or never opted-in). + # If we wish to do this in the future, we need to have a tri-mode state + # (opted in, opted out, no action yet), and it needs to be guarded by a + # config optionĀ (so that nagging can be turned off via config). + # We also need to add a last_opted_out_ceph_version variable, for the + # major upgrade check. + + # check if there are collections the user is not opt-in to + # that we should nag about + if self.db_collection is not None: + for c in MODULE_COLLECTION: + if c['name'].name not in self.db_collection: + if c['nag'] == True: + self.log.debug(f"The collection: {c['name']} is not reported") + return True + + # user might be opted-in to the most recent collection, or there is no + # new collection which requires nagging about; thus nag in case it's a + # major upgrade and there are new collections + # (which their own nag == False): + new_collections = False + col_delta = self.collection_delta() + if col_delta is not None and len(col_delta) > 0: + new_collections = True + + return self.is_major_upgrade() and new_collections + + def init_collection(self) -> None: + # We fetch from db the collections the user had already opted-in to. + # During the transition the results will be empty, but the user might + # be opted-in to an older version (e.g. revision = 3) + + collection = self.get_store('collection') + + if collection is not None: + self.db_collection = json.loads(collection) + + if self.db_collection is None: + # happens once on upgrade + if not self.enabled: + # user is not opted-in + self.set_store('collection', json.dumps([])) + self.log.debug("user is not opted-in") + else: + # user is opted-in, verify the revision: + if self.last_opt_revision == REVISION: + self.log.debug(f"telemetry revision is {REVISION}") + base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name] + self.set_store('collection', json.dumps(base_collection)) + else: + # user is opted-in to an older version, meaning they need + # to re-opt in regardless + self.set_store('collection', json.dumps([])) + self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in") + + # reload collection after setting + collection = self.get_store('collection') + if collection is not None: + self.db_collection = json.loads(collection) + else: + raise RuntimeError('collection is None after initial setting') + else: + # user has already upgraded + self.log.debug(f"user has upgraded already: collection: {self.db_collection}") + + def is_enabled_collection(self, collection: Collection) -> bool: + if self.db_collection is None: + return False + return collection.name in self.db_collection + + def opt_in_all_collections(self) -> None: + """ + Opt-in to all collections; Update db with the currently available collections in the module + """ + if self.db_collection is None: + raise RuntimeError('db_collection is None after initial setting') + + for c in MODULE_COLLECTION: + if c['name'].name not in self.db_collection: + self.db_collection.append(c['name']) + + self.set_store('collection', json.dumps(self.db_collection)) + + def send(self, + report: Dict[str, Dict[str, str]], + endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]: + if not endpoint: + endpoint = [self.EndPoint.ceph, self.EndPoint.device] + failed = [] + success = [] + self.log.debug('Send endpoints %s' % endpoint) + for e in endpoint: + if e == self.EndPoint.ceph: + fail_reason = self._try_post('ceph report', self.url, report) + if fail_reason: + failed.append(fail_reason) + else: + now = int(time.time()) + self.last_upload = now + self.set_store('last_upload', str(now)) + success.append('Ceph report sent to {0}'.format(self.url)) + self.log.info('Sent report to {0}'.format(self.url)) + elif e == self.EndPoint.device: + if 'device' in self.get_active_channels(): + devices = self.gather_device_report() + if devices: + num_devs = 0 + num_hosts = 0 + for host, ls in devices.items(): + self.log.debug('host %s devices %s' % (host, ls)) + if not len(ls): + continue + fail_reason = self._try_post('devices', self.device_url, + ls) + if fail_reason: + failed.append(fail_reason) + else: + num_devs += len(ls) + num_hosts += 1 + if num_devs: + success.append('Reported %d devices from %d hosts across a total of %d hosts' % ( + num_devs, num_hosts, len(devices))) + else: + fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.' + failed.append(fail_reason) + self.log.error(fail_reason) + if failed: + return 1, '', '\n'.join(success + failed) + return 0, '', '\n'.join(success) + + def format_perf_histogram(self, report: Dict[str, Any]) -> None: + # Formatting the perf histograms so they are human-readable. This will change the + # ranges and values, which are currently in list form, into strings so that + # they are displayed horizontally instead of vertically. + if 'report' in report: + report = report['report'] + try: + # Formatting ranges and values in osd_perf_histograms + mode = 'osd_perf_histograms' + for config in report[mode]: + for histogram in config: + # Adjust ranges by converting lists into strings + for axis in config[histogram]['axes']: + for i in range(0, len(axis['ranges'])): + axis['ranges'][i] = str(axis['ranges'][i]) + + for osd in config[histogram]['osds']: + for i in range(0, len(osd['values'])): + osd['values'][i] = str(osd['values'][i]) + except KeyError: + # If the perf channel is not enabled, there should be a KeyError since + # 'osd_perf_histograms' would not be present in the report. In that case, + # the show function should pass as usual without trying to format the + # histograms. + pass + + def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: + ''' + Enable or disable a list of channels + ''' + if not self.enabled: + # telemetry should be on for channels to be toggled + msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ + 'Preview sample reports with `ceph telemetry preview`.' + return 0, msg, '' + + if channels is None: + msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.' + return 0, msg, '' + + state = action == 'enable' + msg = '' + for c in channels: + if c not in ALL_CHANNELS: + msg = f"{msg}{c} is not a valid channel name. "\ + f"Available channels: {ALL_CHANNELS}.\n" + else: + self.set_module_option(f"channel_{c}", state) + setattr(self, + f"channel_{c}", + state) + msg = f"{msg}channel_{c} is {action}d\n" + + return 0, msg, '' + + @CLIReadCommand('telemetry status') + def status(self) -> Tuple[int, str, str]: + ''' + Show current configuration + ''' + r = {} + for opt in self.MODULE_OPTIONS: + r[opt['name']] = getattr(self, opt['name']) + r['last_upload'] = (time.ctime(self.last_upload) + if self.last_upload else self.last_upload) + return 0, json.dumps(r, indent=4, sort_keys=True), '' + + @CLIReadCommand('telemetry diff') + def diff(self) -> Tuple[int, str, str]: + ''' + Show the diff between opted-in collection and available collection + ''' + diff = [] + keys = ['nag'] + + for c in MODULE_COLLECTION: + if not self.is_enabled_collection(c['name']): + diff.append({key: val for key, val in c.items() if key not in keys}) + + r = None + if diff == []: + r = "Telemetry is up to date" + else: + r = json.dumps(diff, indent=4, sort_keys=True) + + return 0, r, '' + + @CLICommand('telemetry on') + def on(self, license: Optional[str] = None) -> Tuple[int, str, str]: + ''' + Enable telemetry reports from this cluster + ''' + if license != LICENSE: + return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}). +To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.''' + else: + self.set_module_option('enabled', True) + self.enabled = True + self.opt_in_all_collections() + + # for major releases upgrade nagging + mon_map = self.get('mon_map') + mon_min = mon_map.get("min_mon_release", 0) + self.set_store('last_opted_in_ceph_version', str(mon_min)) + self.last_opted_in_ceph_version = mon_min + + msg = 'Telemetry is on.' + disabled_channels = '' + active_channels = self.get_active_channels() + for c in ALL_CHANNELS: + if c not in active_channels and c != 'ident': + disabled_channels = f"{disabled_channels} {c}" + + if len(disabled_channels) > 0: + msg = f"{msg}\nSome channels are disabled, please enable with:\n"\ + f"`ceph telemetry enable channel{disabled_channels}`" + + # wake up serve() to reset health warning + self.event.set() + + return 0, msg, '' + + @CLICommand('telemetry off') + def off(self) -> Tuple[int, str, str]: + ''' + Disable telemetry reports from this cluster + ''' + if not self.enabled: + # telemetry is already off + msg = 'Telemetry is currently not enabled, nothing to turn off. '\ + 'Please consider opting-in with `ceph telemetry on`.\n' \ + 'Preview sample reports with `ceph telemetry preview`.' + return 0, msg, '' + + self.set_module_option('enabled', False) + self.enabled = False + self.set_store('collection', json.dumps([])) + self.db_collection = [] + + # we might need this info in the future, in case + # of nagging when user is opted-out + mon_map = self.get('mon_map') + mon_min = mon_map.get("min_mon_release", 0) + self.set_store('last_opted_out_ceph_version', str(mon_min)) + self.last_opted_out_ceph_version = mon_min + + msg = 'Telemetry is now disabled.' + return 0, msg, '' + + @CLIReadCommand('telemetry enable channel all') + def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]: + ''' + Enable all channels + ''' + return self.toggle_channel('enable', channels) + + @CLIReadCommand('telemetry enable channel') + def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: + ''' + Enable a list of channels + ''' + return self.toggle_channel('enable', channels) + + @CLIReadCommand('telemetry disable channel all') + def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]: + ''' + Disable all channels + ''' + return self.toggle_channel('disable', channels) + + @CLIReadCommand('telemetry disable channel') + def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: + ''' + Disable a list of channels + ''' + return self.toggle_channel('disable', channels) + + @CLIReadCommand('telemetry channel ls') + def channel_ls(self) -> Tuple[int, str, str]: + ''' + List all channels + ''' + table = PrettyTable( + [ + 'NAME', 'ENABLED', 'DEFAULT', 'DESC', + ], + border=False) + table.align['NAME'] = 'l' + table.align['ENABLED'] = 'l' + table.align['DEFAULT'] = 'l' + table.align['DESC'] = 'l' + table.left_padding_width = 0 + table.right_padding_width = 4 + + for c in ALL_CHANNELS: + enabled = "ON" if getattr(self, f"channel_{c}") else "OFF" + for o in self.MODULE_OPTIONS: + if o['name'] == f"channel_{c}": + default = "ON" if o.get('default', None) else "OFF" + desc = o.get('desc', None) + + table.add_row(( + c, + enabled, + default, + desc, + )) + + return 0, table.get_string(sortby="NAME"), '' + + @CLIReadCommand('telemetry collection ls') + def collection_ls(self) -> Tuple[int, str, str]: + ''' + List all collections + ''' + col_delta = self.collection_delta() + msg = '' + if col_delta is not None and len(col_delta) > 0: + msg = f"New collections are available:\n" \ + f"{sorted([c.name for c in col_delta])}\n" \ + f"Run `ceph telemetry on` to opt-in to these collections.\n" + + table = PrettyTable( + [ + 'NAME', 'STATUS', 'DESC', + ], + border=False) + table.align['NAME'] = 'l' + table.align['STATUS'] = 'l' + table.align['DESC'] = 'l' + table.left_padding_width = 0 + table.right_padding_width = 4 + + for c in MODULE_COLLECTION: + name = c['name'] + opted_in = self.is_enabled_collection(name) + channel_enabled = getattr(self, f"channel_{c['channel']}") + + status = '' + if channel_enabled and opted_in: + status = "REPORTING" + else: + why = '' + delimiter = '' + + if not opted_in: + why += "NOT OPTED-IN" + delimiter = ', ' + if not channel_enabled: + why += f"{delimiter}CHANNEL {c['channel']} IS OFF" + + status = f"NOT REPORTING: {why}" + + desc = c['description'] + + table.add_row(( + name, + status, + desc, + )) + + if len(msg): + # add a new line between message and table output + msg = f"{msg} \n" + + return 0, f'{msg}{table.get_string(sortby="NAME")}', '' + + @CLICommand('telemetry send') + def do_send(self, + endpoint: Optional[List[EndPoint]] = None, + license: Optional[str] = None) -> Tuple[int, str, str]: + ''' + Send a sample report + ''' + if not self.is_opted_in() and license != LICENSE: + self.log.debug(('A telemetry send attempt while opted-out. ' + 'Asking for license agreement')) + return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}). +To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command. +Please consider enabling the telemetry module with 'ceph telemetry on'.''' + else: + self.last_report = self.compile_report() + return self.send(self.last_report, endpoint) + + @CLIReadCommand('telemetry show') + def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: + ''' + Show a sample report of opted-in collections (except for 'device') + ''' + if not self.enabled: + # if telemetry is off, no report is being sent, hence nothing to show + msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ + 'Preview sample reports with `ceph telemetry preview`.' + return 0, msg, '' + + report = self.get_report_locked(channels=channels) + self.format_perf_histogram(report) + report = json.dumps(report, indent=4, sort_keys=True) + + if self.channel_device: + report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.''' + + return 0, report, '' + + @CLIReadCommand('telemetry preview') + def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: + ''' + Preview a sample report of the most recent collections available (except for 'device') + ''' + report = {} + + # We use a lock to prevent a scenario where the user wishes to preview + # the report, and at the same time the module hits the interval of + # sending a report with the opted-in collection, which has less data + # than in the preview report. + col_delta = self.collection_delta() + with self.get_report_lock: + if col_delta is not None and len(col_delta) == 0: + # user is already opted-in to the most recent collection + msg = 'Telemetry is up to date, see report with `ceph telemetry show`.' + return 0, msg, '' + else: + # there are collections the user is not opted-in to + next_collection = [] + + for c in MODULE_COLLECTION: + next_collection.append(c['name'].name) + + opted_in_collection = self.db_collection + self.db_collection = next_collection + report = self.get_report(channels=channels) + self.db_collection = opted_in_collection + + self.format_perf_histogram(report) + report = json.dumps(report, indent=4, sort_keys=True) + + if self.channel_device: + report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.''' + + return 0, report, '' + + @CLIReadCommand('telemetry show-device') + def show_device(self) -> Tuple[int, str, str]: + ''' + Show a sample device report + ''' + if not self.enabled: + # if telemetry is off, no report is being sent, hence nothing to show + msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ + 'Preview sample device reports with `ceph telemetry preview-device`.' + return 0, msg, '' + + if not self.channel_device: + # if device channel is off, device report is not being sent, hence nothing to show + msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \ + 'Preview sample device reports with `ceph telemetry preview-device`.' + return 0, msg, '' + + return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), '' + + @CLIReadCommand('telemetry preview-device') + def preview_device(self) -> Tuple[int, str, str]: + ''' + Preview a sample device report of the most recent device collection + ''' + report = {} + + device_col_delta = self.collection_delta(['device']) + with self.get_report_lock: + if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device: + # user is already opted-in to the most recent device collection, + # and device channel is on, thus `show-device` should be called + msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.' + return 0, msg, '' + + # either the user is not opted-in at all, or there are collections + # they are not opted-in to + next_collection = [] + + for c in MODULE_COLLECTION: + next_collection.append(c['name'].name) + + opted_in_collection = self.db_collection + self.db_collection = next_collection + report = self.get_report('device') + self.db_collection = opted_in_collection + + report = json.dumps(report, indent=4, sort_keys=True) + return 0, report, '' + + @CLIReadCommand('telemetry show-all') + def show_all(self) -> Tuple[int, str, str]: + ''' + Show a sample report of all enabled channels (including 'device' channel) + ''' + if not self.enabled: + # if telemetry is off, no report is being sent, hence nothing to show + msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ + 'Preview sample reports with `ceph telemetry preview`.' + return 0, msg, '' + + if not self.channel_device: + # device channel is off, no need to display its report + report = self.get_report_locked('default') + else: + # telemetry is on and device channel is enabled, show both + report = self.get_report_locked('all') + + self.format_perf_histogram(report) + return 0, json.dumps(report, indent=4, sort_keys=True), '' + + @CLIReadCommand('telemetry preview-all') + def preview_all(self) -> Tuple[int, str, str]: + ''' + Preview a sample report of the most recent collections available of all channels (including 'device') + ''' + report = {} + + col_delta = self.collection_delta() + with self.get_report_lock: + if col_delta is not None and len(col_delta) == 0: + # user is already opted-in to the most recent collection + msg = 'Telemetry is up to date, see report with `ceph telemetry show`.' + return 0, msg, '' + + # there are collections the user is not opted-in to + next_collection = [] + + for c in MODULE_COLLECTION: + next_collection.append(c['name'].name) + + opted_in_collection = self.db_collection + self.db_collection = next_collection + report = self.get_report('all') + self.db_collection = opted_in_collection + + self.format_perf_histogram(report) + report = json.dumps(report, indent=4, sort_keys=True) + + return 0, report, '' + + def get_report_locked(self, + report_type: str = 'default', + channels: Optional[List[str]] = None) -> Dict[str, Any]: + ''' + A wrapper around get_report to allow for compiling a report of the most recent module collections + ''' + with self.get_report_lock: + return self.get_report(report_type, channels) + + def get_report(self, + report_type: str = 'default', + channels: Optional[List[str]] = None) -> Dict[str, Any]: + if report_type == 'default': + return self.compile_report(channels=channels) + elif report_type == 'device': + return self.gather_device_report() + elif report_type == 'all': + return {'report': self.compile_report(channels=channels), + 'device_report': self.gather_device_report()} + return {} + + def self_test(self) -> None: + self.opt_in_all_collections() + report = self.compile_report(channels=ALL_CHANNELS) + if len(report) == 0: + raise RuntimeError('Report is empty') + + if 'report_id' not in report: + raise RuntimeError('report_id not found in report') + + def shutdown(self) -> None: + self.run = False + self.event.set() + + def refresh_health_checks(self) -> None: + health_checks = {} + # TODO do we want to nag also in case the user is not opted-in? + if self.enabled and self.should_nag(): + health_checks['TELEMETRY_CHANGED'] = { + 'severity': 'warning', + 'summary': 'Telemetry requires re-opt-in', + 'detail': [ + 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`' + ] + } + self.set_health_checks(health_checks) + + def serve(self) -> None: + self.load() + self.run = True + + self.log.debug('Waiting for mgr to warm up') + time.sleep(10) + + while self.run: + self.event.clear() + + self.refresh_health_checks() + + if not self.is_opted_in(): + self.log.debug('Not sending report until user re-opts-in') + self.event.wait(1800) + continue + if not self.enabled: + self.log.debug('Not sending report until configured to do so') + self.event.wait(1800) + continue + + now = int(time.time()) + if not self.last_upload or \ + (now - self.last_upload) > self.interval * 3600: + self.log.info('Compiling and sending report to %s', + self.url) + + try: + self.last_report = self.compile_report() + except Exception: + self.log.exception('Exception while compiling report:') + + self.send(self.last_report) + else: + self.log.debug('Interval for sending new report has not expired') + + sleep = 3600 + self.log.debug('Sleeping for %d seconds', sleep) + self.event.wait(sleep) + + @staticmethod + def can_run() -> Tuple[bool, str]: + return True, '' -- cgit v1.2.3