Diffstat (limited to 'src/pybind/mgr/telemetry/module.py')
-rw-r--r--  src/pybind/mgr/telemetry/module.py  2074
1 file changed, 2074 insertions(+), 0 deletions(-)
diff --git a/src/pybind/mgr/telemetry/module.py b/src/pybind/mgr/telemetry/module.py
new file mode 100644
index 000000000..f729b9180
--- /dev/null
+++ b/src/pybind/mgr/telemetry/module.py
@@ -0,0 +1,2074 @@
+"""
+Telemetry module for ceph-mgr
+
+Collect statistics from the Ceph cluster and send them back to the Ceph
+project when the user has opted in.
+"""
+import logging
+import numbers
+import enum
+import errno
+import hashlib
+import json
+import rbd
+import requests
+import uuid
+import time
+from datetime import datetime, timedelta
+from prettytable import PrettyTable
+from threading import Event, Lock
+from collections import defaultdict
+from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
+
+from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, ServiceInfoT
+
+
+ALL_CHANNELS = ['basic', 'ident', 'crash', 'device', 'perf']
+
+LICENSE = 'sharing-1-0'
+LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
+LICENSE_URL = 'https://cdla.io/sharing-1-0/'
+NO_SALT_CNT = 0
+
+# Latest revision of the telemetry report. Bump this each time we make
+# *any* change.
+REVISION = 3
+
+# History of revisions
+# --------------------
+#
+# Version 1:
+#   Mimic and/or Nautilus are lumped together here, since
+#   we didn't track revisions yet.
+#
+# Version 2:
+# - added revision tracking, nagging, etc.
+# - added config option changes
+# - added channels
+# - added explicit license acknowledgement to the opt-in process
+#
+# Version 3:
+# - added device health metrics (i.e., SMART data, minus serial number)
+# - removed crush_rule
+# - added CephFS metadata (how many MDSs, fs features, how many data pools,
+# how much metadata is cached, rfiles, rbytes, rsnapshots)
+# - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
+# - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
+# - whether an OSD cluster network is in use
+# - rbd pool and image count, and rbd mirror mode (pool-level)
+# - rgw daemons, zones, zonegroups; which rgw frontends
+# - crush map stats
+
+class Collection(str, enum.Enum):
+ basic_base = 'basic_base'
+ device_base = 'device_base'
+ crash_base = 'crash_base'
+ ident_base = 'ident_base'
+ perf_perf = 'perf_perf'
+ basic_mds_metadata = 'basic_mds_metadata'
+ basic_pool_usage = 'basic_pool_usage'
+ basic_usage_by_class = 'basic_usage_by_class'
+ basic_rook_v01 = 'basic_rook_v01'
+ perf_memory_metrics = 'perf_memory_metrics'
+ basic_pool_options_bluestore = 'basic_pool_options_bluestore'
+
+MODULE_COLLECTION : List[Dict] = [
+ {
+ "name": Collection.basic_base,
+ "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
+ "channel": "basic",
+ "nag": False
+ },
+ {
+ "name": Collection.device_base,
+ "description": "Information about device health metrics",
+ "channel": "device",
+ "nag": False
+ },
+ {
+ "name": Collection.crash_base,
+ "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
+ "channel": "crash",
+ "nag": False
+ },
+ {
+ "name": Collection.ident_base,
+ "description": "User-provided identifying information about the cluster",
+ "channel": "ident",
+ "nag": False
+ },
+ {
+ "name": Collection.perf_perf,
+ "description": "Information about performance counters of the cluster",
+ "channel": "perf",
+ "nag": True
+ },
+ {
+ "name": Collection.basic_mds_metadata,
+ "description": "MDS metadata",
+ "channel": "basic",
+ "nag": False
+ },
+ {
+ "name": Collection.basic_pool_usage,
+ "description": "Default pool application and usage statistics",
+ "channel": "basic",
+ "nag": False
+ },
+ {
+ "name": Collection.basic_usage_by_class,
+ "description": "Default device class usage statistics",
+ "channel": "basic",
+ "nag": False
+ },
+ {
+ "name": Collection.basic_rook_v01,
+ "description": "Basic Rook deployment data",
+ "channel": "basic",
+ "nag": True
+ },
+ {
+ "name": Collection.perf_memory_metrics,
+ "description": "Heap stats and mempools for mon and mds",
+ "channel": "perf",
+ "nag": False
+ },
+ {
+ "name": Collection.basic_pool_options_bluestore,
+ "description": "Per-pool bluestore config options",
+ "channel": "basic",
+ "nag": False
+ },
+]
+
+ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
+ # Note: a key cannot be both a node and a leaf, e.g.
+ # "rook/a/b"
+ # "rook/a/b/c"
+ ("rook/version", Collection.basic_rook_v01),
+ ("rook/kubernetes/version", Collection.basic_rook_v01),
+ ("rook/csi/version", Collection.basic_rook_v01),
+ ("rook/node/count/kubernetes-total", Collection.basic_rook_v01),
+ ("rook/node/count/with-ceph-daemons", Collection.basic_rook_v01),
+ ("rook/node/count/with-csi-rbd-plugin", Collection.basic_rook_v01),
+ ("rook/node/count/with-csi-cephfs-plugin", Collection.basic_rook_v01),
+ ("rook/node/count/with-csi-nfs-plugin", Collection.basic_rook_v01),
+ ("rook/usage/storage-class/count/total", Collection.basic_rook_v01),
+ ("rook/usage/storage-class/count/rbd", Collection.basic_rook_v01),
+ ("rook/usage/storage-class/count/cephfs", Collection.basic_rook_v01),
+ ("rook/usage/storage-class/count/nfs", Collection.basic_rook_v01),
+ ("rook/usage/storage-class/count/bucket", Collection.basic_rook_v01),
+ ("rook/cluster/storage/device-set/count/total", Collection.basic_rook_v01),
+ ("rook/cluster/storage/device-set/count/portable", Collection.basic_rook_v01),
+ ("rook/cluster/storage/device-set/count/non-portable", Collection.basic_rook_v01),
+ ("rook/cluster/mon/count", Collection.basic_rook_v01),
+ ("rook/cluster/mon/allow-multiple-per-node", Collection.basic_rook_v01),
+ ("rook/cluster/mon/max-id", Collection.basic_rook_v01),
+ ("rook/cluster/mon/pvc/enabled", Collection.basic_rook_v01),
+ ("rook/cluster/mon/stretch/enabled", Collection.basic_rook_v01),
+ ("rook/cluster/network/provider", Collection.basic_rook_v01),
+ ("rook/cluster/external-mode", Collection.basic_rook_v01),
+]
+
+class Module(MgrModule):
+ metadata_keys = [
+ "arch",
+ "ceph_version",
+ "os",
+ "cpu",
+ "kernel_description",
+ "kernel_version",
+ "distro_description",
+ "distro"
+ ]
+
+ MODULE_OPTIONS = [
+ Option(name='url',
+ type='str',
+ default='https://telemetry.ceph.com/report'),
+ Option(name='device_url',
+ type='str',
+ default='https://telemetry.ceph.com/device'),
+ Option(name='enabled',
+ type='bool',
+ default=False),
+ Option(name='last_opt_revision',
+ type='int',
+ default=1),
+ Option(name='leaderboard',
+ type='bool',
+ default=False),
+ Option(name='leaderboard_description',
+ type='str',
+ default=None),
+ Option(name='description',
+ type='str',
+ default=None),
+ Option(name='contact',
+ type='str',
+ default=None),
+ Option(name='organization',
+ type='str',
+ default=None),
+ Option(name='proxy',
+ type='str',
+ default=None),
+ Option(name='interval',
+ type='int',
+ default=24,
+ min=8),
+ Option(name='channel_basic',
+ type='bool',
+ default=True,
+ desc='Share basic cluster information (size, version)'),
+ Option(name='channel_ident',
+ type='bool',
+ default=False,
+ desc='Share a user-provided description and/or contact email for the cluster'),
+ Option(name='channel_crash',
+ type='bool',
+ default=True,
+               desc='Share metadata about Ceph daemon crashes (version, stack traces, etc.)'),
+ Option(name='channel_device',
+ type='bool',
+ default=True,
+ desc=('Share device health metrics '
+ '(e.g., SMART data, minus potentially identifying info like serial numbers)')),
+ Option(name='channel_perf',
+ type='bool',
+ default=False,
+ desc='Share various performance metrics of a cluster'),
+ ]
+
+ @property
+ def config_keys(self) -> Dict[str, OptionValue]:
+ return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super(Module, self).__init__(*args, **kwargs)
+ self.event = Event()
+ self.run = False
+ self.db_collection: Optional[List[str]] = None
+ self.last_opted_in_ceph_version: Optional[int] = None
+ self.last_opted_out_ceph_version: Optional[int] = None
+ self.last_upload: Optional[int] = None
+ self.last_report: Dict[str, Any] = dict()
+ self.report_id: Optional[str] = None
+ self.salt: Optional[str] = None
+ self.get_report_lock = Lock()
+ self.config_update_module_option()
+ # for mypy which does not run the code
+ if TYPE_CHECKING:
+ self.url = ''
+ self.device_url = ''
+ self.enabled = False
+ self.last_opt_revision = 0
+            self.leaderboard = False
+ self.leaderboard_description = ''
+ self.interval = 0
+ self.proxy = ''
+ self.channel_basic = True
+ self.channel_ident = False
+ self.channel_crash = True
+ self.channel_device = True
+ self.channel_perf = False
+ self.db_collection = ['basic_base', 'device_base']
+ self.last_opted_in_ceph_version = 17
+ self.last_opted_out_ceph_version = 0
+
+ def config_update_module_option(self) -> None:
+ for opt in self.MODULE_OPTIONS:
+ setattr(self,
+ opt['name'],
+ self.get_module_option(opt['name']))
+ self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
+
+ def config_notify(self) -> None:
+ self.config_update_module_option()
+ # wake up serve() thread
+ self.event.set()
+
+ def load(self) -> None:
+ last_upload = self.get_store('last_upload', None)
+ if last_upload is None:
+ self.last_upload = None
+ else:
+ self.last_upload = int(last_upload)
+
+ report_id = self.get_store('report_id', None)
+ if report_id is None:
+ self.report_id = str(uuid.uuid4())
+ self.set_store('report_id', self.report_id)
+ else:
+ self.report_id = report_id
+
+ salt = self.get_store('salt', None)
+ if salt is None:
+ self.salt = str(uuid.uuid4())
+ self.set_store('salt', self.salt)
+ else:
+ self.salt = salt
+
+ self.init_collection()
+
+ last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
+ if last_opted_in_ceph_version is None:
+ self.last_opted_in_ceph_version = None
+ else:
+ self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
+
+ last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
+ if last_opted_out_ceph_version is None:
+ self.last_opted_out_ceph_version = None
+ else:
+ self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
+
+ def gather_osd_metadata(self,
+ osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
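+        # The result counts, per metadata key, how many OSDs report each
+        # distinct value, e.g. (hypothetical counts):
+        #     metadata['rotational'] -> {'0': 4, '1': 8}
+        # The same counting pattern is used for mon and mds metadata below.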
+ keys = ["osd_objectstore", "rotational"]
+ keys += self.metadata_keys
+
+ metadata: Dict[str, Dict[str, int]] = dict()
+ for key in keys:
+ metadata[key] = defaultdict(int)
+
+ for osd in osd_map['osds']:
+ res = self.get_metadata('osd', str(osd['osd']))
+ if res is None:
+ self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
+ continue
+ for k, v in res.items():
+ if k not in keys:
+ continue
+
+ metadata[k][v] += 1
+
+ return metadata
+
+ def gather_mon_metadata(self,
+ mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
+ keys = list()
+ keys += self.metadata_keys
+
+ metadata: Dict[str, Dict[str, int]] = dict()
+ for key in keys:
+ metadata[key] = defaultdict(int)
+
+ for mon in mon_map['mons']:
+ res = self.get_metadata('mon', mon['name'])
+ if res is None:
+ self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
+ continue
+ for k, v in res.items():
+ if k not in keys:
+ continue
+
+ metadata[k][v] += 1
+
+ return metadata
+
+ def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
+ metadata: Dict[str, Dict[str, int]] = dict()
+
+ res = self.get('mds_metadata') # metadata of *all* mds daemons
+ if res is None or not res:
+ self.log.debug('Could not get metadata for mds daemons')
+ return metadata
+
+ keys = list()
+ keys += self.metadata_keys
+
+ for key in keys:
+ metadata[key] = defaultdict(int)
+
+ for mds in res.values():
+ for k, v in mds.items():
+ if k not in keys:
+ continue
+
+ metadata[k][v] += 1
+
+ return metadata
+
+ def gather_crush_info(self) -> Dict[str, Union[int,
+ bool,
+ List[int],
+ Dict[str, int],
+ Dict[int, int]]]:
+ osdmap = self.get_osdmap()
+ crush_raw = osdmap.get_crush()
+ crush = crush_raw.dump()
+
+ BucketKeyT = TypeVar('BucketKeyT', int, str)
+
+ def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
+ if k in d:
+ d[k] += 1
+ else:
+ d[k] = 1
+
+ device_classes: Dict[str, int] = {}
+ for dev in crush['devices']:
+ inc(device_classes, dev.get('class', ''))
+
+ bucket_algs: Dict[str, int] = {}
+ bucket_types: Dict[str, int] = {}
+ bucket_sizes: Dict[int, int] = {}
+ for bucket in crush['buckets']:
+ if '~' in bucket['name']: # ignore shadow buckets
+ continue
+ inc(bucket_algs, bucket['alg'])
+ inc(bucket_types, bucket['type_id'])
+ inc(bucket_sizes, len(bucket['items']))
+
+ return {
+ 'num_devices': len(crush['devices']),
+ 'num_types': len(crush['types']),
+ 'num_buckets': len(crush['buckets']),
+ 'num_rules': len(crush['rules']),
+ 'device_classes': list(device_classes.values()),
+ 'tunables': crush['tunables'],
+ 'compat_weight_set': '-1' in crush['choose_args'],
+ 'num_weight_sets': len(crush['choose_args']),
+ 'bucket_algs': bucket_algs,
+ 'bucket_sizes': bucket_sizes,
+ 'bucket_types': bucket_types,
+ }
+
+ def gather_configs(self) -> Dict[str, List[str]]:
+ # cluster config options
+ cluster = set()
+ r, outb, outs = self.mon_command({
+ 'prefix': 'config dump',
+ 'format': 'json'
+ })
+ if r != 0:
+ return {}
+ try:
+ dump = json.loads(outb)
+ except json.decoder.JSONDecodeError:
+ return {}
+ for opt in dump:
+ name = opt.get('name')
+ if name:
+ cluster.add(name)
+ # daemon-reported options (which may include ceph.conf)
+ active = set()
+ ls = self.get("modified_config_options")
+ for opt in ls.get('options', {}):
+ active.add(opt)
+ return {
+ 'cluster_changed': sorted(list(cluster)),
+ 'active_changed': sorted(list(active)),
+ }
+
+ def anonymize_entity_name(self, entity_name:str) -> str:
+ if '.' not in entity_name:
+ self.log.debug(f"Cannot split entity name ({entity_name}), no '.' is found")
+ return entity_name
+
+ (etype, eid) = entity_name.split('.', 1)
+        m = hashlib.sha1()
+        # avoid asserting that a salt exists; fall back to a unique temporary
+        # salt, but do not persist it to self.salt
+        global NO_SALT_CNT
+        salt = self.salt
+        if not salt:
+            salt = f"no_salt_found_{NO_SALT_CNT}"
+            NO_SALT_CNT += 1
+            self.log.debug(f"No salt found, created a temp one: {salt}")
+        m.update(salt.encode('utf-8'))
+        m.update(eid.encode('utf-8'))
+        m.update(salt.encode('utf-8'))
+
+ return etype + '.' + m.hexdigest()
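+    # Illustrative sketch (hypothetical digest): with a salt present,
+    # anonymize_entity_name('mon.a') hashes only the id part, returning
+    # something like 'mon.86f7e437faa5...'; keeping the daemon-type prefix
+    # intact preserves per-type grouping in the report.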
+
+ def get_heap_stats(self) -> Dict[str, dict]:
+ result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+ anonymized_daemons = {}
+ osd_map = self.get('osd_map')
+
+ # Combine available daemons
+ daemons = []
+ for osd in osd_map['osds']:
+ daemons.append('osd'+'.'+str(osd['osd']))
+ # perf_memory_metrics collection (1/2)
+ if self.is_enabled_collection(Collection.perf_memory_metrics):
+ mon_map = self.get('mon_map')
+ mds_metadata = self.get('mds_metadata')
+ for mon in mon_map['mons']:
+ daemons.append('mon'+'.'+mon['name'])
+ for mds in mds_metadata:
+ daemons.append('mds'+'.'+mds)
+
+ # Grab output from the "daemon.x heap stats" command
+ for daemon in daemons:
+ daemon_type, daemon_id = daemon.split('.', 1)
+ heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
+ if heap_stats:
+ if (daemon_type != 'osd'):
+ # Anonymize mon and mds
+ anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+ daemon = anonymized_daemons[daemon]
+ result[daemon_type][daemon] = heap_stats
+ else:
+ continue
+
+ if anonymized_daemons:
+ # for debugging purposes only, this data is never reported
+ self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
+ return result
+
+ def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
+ parsed_output = {}
+
+ cmd_dict = {
+ 'prefix': 'heap',
+ 'heapcmd': 'stats'
+ }
+ r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
+
+ if r != 0:
+ self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+ else:
+ if 'tcmalloc heap stats' in outb:
+ values = [int(i) for i in outb.split() if i.isdigit()]
+ # `categories` must be ordered this way for the correct output to be parsed
+ categories = ['use_by_application',
+ 'page_heap_freelist',
+ 'central_cache_freelist',
+ 'transfer_cache_freelist',
+ 'thread_cache_freelists',
+ 'malloc_metadata',
+ 'actual_memory_used',
+ 'released_to_os',
+ 'virtual_address_space_used',
+ 'spans_in_use',
+ 'thread_heaps_in_use',
+ 'tcmalloc_page_size']
+ if len(values) != len(categories):
+                    self.log.error('Received unexpected output from {}.{}; '
+                                   'number of values should match the number '
+                                   'of expected categories:\n values: len={} {} '
+                                   '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
+ else:
+ parsed_output = dict(zip(categories, values))
+ else:
+ self.log.error('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
+
+ return parsed_output
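+    # For reference, tcmalloc's `heap stats` output is a fixed-order block of
+    # lines roughly of this shape (values are hypothetical):
+    #     MALLOC:    123456789 (  117.7 MiB) Bytes in use by application
+    #     MALLOC: +    1234567 (    1.2 MiB) Bytes in page heap freelist
+    #     ...
+    # which is why `categories` above must preserve that exact order: the
+    # parser simply zips the extracted integers with the category names.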
+
+ def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
+ result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+ anonymized_daemons = {}
+ osd_map = self.get('osd_map')
+
+ # Combine available daemons
+ daemons = []
+ for osd in osd_map['osds']:
+ daemons.append('osd'+'.'+str(osd['osd']))
+ # perf_memory_metrics collection (2/2)
+ if self.is_enabled_collection(Collection.perf_memory_metrics):
+ mon_map = self.get('mon_map')
+ mds_metadata = self.get('mds_metadata')
+ for mon in mon_map['mons']:
+ daemons.append('mon'+'.'+mon['name'])
+ for mds in mds_metadata:
+ daemons.append('mds'+'.'+mds)
+
+ # Grab output from the "dump_mempools" command
+ for daemon in daemons:
+ daemon_type, daemon_id = daemon.split('.', 1)
+ cmd_dict = {
+ 'prefix': 'dump_mempools',
+ 'format': 'json'
+ }
+ r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
+ if r != 0:
+ self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+ continue
+ else:
+ try:
+ # This is where the mempool will land.
+ dump = json.loads(outb)
+ if mode == 'separated':
+ # Anonymize mon and mds
+ if daemon_type != 'osd':
+ anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+ daemon = anonymized_daemons[daemon]
+ result[daemon_type][daemon] = dump['mempool']['by_pool']
+ elif mode == 'aggregated':
+ for mem_type in dump['mempool']['by_pool']:
+ result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
+ result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
+ else:
+ self.log.error("Incorrect mode specified in get_mempool: {}".format(mode))
+ except (json.decoder.JSONDecodeError, KeyError) as e:
+ self.log.exception("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
+ continue
+
+ if anonymized_daemons:
+ # for debugging purposes only, this data is never reported
+ self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))
+
+ return result
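+    # Result shape sketch (pool names and values are hypothetical):
+    #   'separated':  {'osd': {'osd.0': {'bluestore_cache_data':
+    #                                    {'bytes': 1024, 'items': 2}, ...}}}
+    #   'aggregated': {'osd': {'bluestore_cache_data':
+    #                          {'bytes': 4096, 'items': 8}, ...}}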
+
+ def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]:
+ # Initialize result dict
+ result: Dict[str, dict] = defaultdict(lambda: defaultdict(
+ lambda: defaultdict(
+ lambda: defaultdict(
+ lambda: defaultdict(
+ lambda: defaultdict(int))))))
+
+ # Get list of osd ids from the metadata
+ osd_metadata = self.get('osd_metadata')
+
+ # Grab output from the "osd.x perf histogram dump" command
+ for osd_id in osd_metadata:
+ cmd_dict = {
+ 'prefix': 'perf histogram dump',
+ 'id': str(osd_id),
+ 'format': 'json'
+ }
+ r, outb, outs = self.osd_command(cmd_dict)
+ # Check for invalid calls
+ if r != 0:
+ self.log.error("Invalid command dictionary: {}".format(cmd_dict))
+ continue
+ else:
+ try:
+ # This is where the histograms will land if there are any.
+ dump = json.loads(outb)
+
+ for histogram in dump['osd']:
+ # Log axis information. There are two axes, each represented
+ # as a dictionary. Both dictionaries are contained inside a
+ # list called 'axes'.
+ axes = []
+ for axis in dump['osd'][histogram]['axes']:
+
+ # This is the dict that contains information for an individual
+ # axis. It will be appended to the 'axes' list at the end.
+ axis_dict: Dict[str, Any] = defaultdict()
+
+ # Collecting information for buckets, min, name, etc.
+ axis_dict['buckets'] = axis['buckets']
+ axis_dict['min'] = axis['min']
+ axis_dict['name'] = axis['name']
+ axis_dict['quant_size'] = axis['quant_size']
+ axis_dict['scale_type'] = axis['scale_type']
+
+ # Collecting ranges; placing them in lists to
+ # improve readability later on.
+ ranges = []
+ for _range in axis['ranges']:
+ _max, _min = None, None
+ if 'max' in _range:
+ _max = _range['max']
+ if 'min' in _range:
+ _min = _range['min']
+ ranges.append([_min, _max])
+ axis_dict['ranges'] = ranges
+
+ # Now that 'axis_dict' contains all the appropriate
+ # information for the current axis, append it to the 'axes' list.
+ # There will end up being two axes in the 'axes' list, since the
+ # histograms are 2D.
+ axes.append(axis_dict)
+
+ # Add the 'axes' list, containing both axes, to result.
+ # At this point, you will see that the name of the key is the string
+ # form of our axes list (str(axes)). This is there so that histograms
+ # with different axis configs will not be combined.
+ # These key names are later dropped when only the values are returned.
+ result[str(axes)][histogram]['axes'] = axes
+
+ # Collect current values and make sure they are in
+ # integer form.
+ values = []
+ for value_list in dump['osd'][histogram]['values']:
+ values.append([int(v) for v in value_list])
+
+ if mode == 'separated':
+ if 'osds' not in result[str(axes)][histogram]:
+ result[str(axes)][histogram]['osds'] = []
+ result[str(axes)][histogram]['osds'].append({'osd_id': int(osd_id), 'values': values})
+
+ elif mode == 'aggregated':
+ # Aggregate values. If 'values' have already been initialized,
+ # we can safely add.
+ if 'values' in result[str(axes)][histogram]:
+                            for i in range(len(values)):
+                                for j in range(len(values[i])):
+ values[i][j] += result[str(axes)][histogram]['values'][i][j]
+
+ # Add the values to result.
+ result[str(axes)][histogram]['values'] = values
+
+ # Update num_combined_osds
+ if 'num_combined_osds' not in result[str(axes)][histogram]:
+ result[str(axes)][histogram]['num_combined_osds'] = 1
+ else:
+ result[str(axes)][histogram]['num_combined_osds'] += 1
+ else:
+ self.log.error('Incorrect mode specified in get_osd_histograms: {}'.format(mode))
+ return list()
+
+ # Sometimes, json errors occur if you give it an empty string.
+ # I am also putting in a catch for a KeyError since it could
+ # happen where the code is assuming that a key exists in the
+ # schema when it doesn't. In either case, we'll handle that
+ # by continuing and collecting what we can from other osds.
+ except (json.decoder.JSONDecodeError, KeyError) as e:
+ self.log.exception("Error caught on osd.{}: {}".format(osd_id, e))
+ continue
+
+ return list(result.values())
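+    # Sketch of one element of the returned list in 'separated' mode; the
+    # str(axes) outer keys are dropped by list(result.values()) above
+    # (histogram name and values are hypothetical):
+    #     {'op_r_latency_out_bytes_histogram': {
+    #         'axes': [axis_dict_x, axis_dict_y],
+    #         'osds': [{'osd_id': 0, 'values': [[0, 1], [2, 3]]}]}}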
+
+ def get_io_rate(self) -> dict:
+ return self.get('io_rate')
+
+ def get_stats_per_pool(self) -> dict:
+ result = self.get('pg_dump')['pool_stats']
+
+ # collect application metadata from osd_map
+ osd_map = self.get('osd_map')
+ application_metadata = {pool['pool']: pool['application_metadata'] for pool in osd_map['pools']}
+
+ # add application to each pool from pg_dump
+ for pool in result:
+ pool['application'] = []
+ # Only include default applications
+ for application in application_metadata[pool['poolid']]:
+ if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
+ pool['application'].append(application)
+
+ return result
+
+ def get_stats_per_pg(self) -> dict:
+ return self.get('pg_dump')['pg_stats']
+
+ def get_rocksdb_stats(self) -> Dict[str, str]:
+        # Initializers
+ result: Dict[str, str] = defaultdict()
+ version = self.get_rocksdb_version()
+
+ # Update result
+ result['version'] = version
+
+ return result
+
+ def gather_crashinfo(self) -> List[Dict[str, str]]:
+ crashlist: List[Dict[str, str]] = list()
+ errno, crashids, err = self.remote('crash', 'ls')
+ if errno:
+ return crashlist
+ for crashid in crashids.split():
+ errno, crashinfo, err = self.remote('crash', 'do_info', crashid)
+ if errno:
+ continue
+ c = json.loads(crashinfo)
+
+ # redact hostname
+ del c['utsname_hostname']
+
+ # entity_name might have more than one '.', beware
+ (etype, eid) = c.get('entity_name', '').split('.', 1)
+ m = hashlib.sha1()
+ assert self.salt
+ m.update(self.salt.encode('utf-8'))
+ m.update(eid.encode('utf-8'))
+ m.update(self.salt.encode('utf-8'))
+ c['entity_name'] = etype + '.' + m.hexdigest()
+
+ # redact final line of python tracebacks, as the exception
+ # payload may contain identifying information
+ if 'mgr_module' in c and 'backtrace' in c:
+ # backtrace might be empty
+ if len(c['backtrace']) > 0:
+ c['backtrace'][-1] = '<redacted>'
+
+ crashlist.append(c)
+ return crashlist
+
+ def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
+ # Extract perf counter data with get_unlabeled_perf_counters(), a method
+ # from mgr/mgr_module.py. This method returns a nested dictionary that
+ # looks a lot like perf schema, except with some additional fields.
+ #
+ # Example of output, a snapshot of a mon daemon:
+ # "mon.b": {
+ # "bluestore.kv_flush_lat": {
+ # "count": 2431,
+ # "description": "Average kv_thread flush latency",
+ # "nick": "fl_l",
+ # "priority": 8,
+ # "type": 5,
+ # "units": 1,
+ # "value": 88814109
+ # },
+ # },
+ perf_counters = self.get_unlabeled_perf_counters()
+
+ # Initialize 'result' dict
+ result: Dict[str, dict] = defaultdict(lambda: defaultdict(
+ lambda: defaultdict(lambda: defaultdict(int))))
+
+ # 'separated' mode
+ anonymized_daemon_dict = {}
+
+ for daemon, perf_counters_by_daemon in perf_counters.items():
+ daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'
+
+ if mode == 'separated':
+ # anonymize individual daemon names except osds
+ if (daemon_type != 'osd'):
+ anonymized_daemon = self.anonymize_entity_name(daemon)
+ anonymized_daemon_dict[anonymized_daemon] = daemon
+ daemon = anonymized_daemon
+
+ # Calculate num combined daemon types if in aggregated mode
+ if mode == 'aggregated':
+ if 'num_combined_daemons' not in result[daemon_type]:
+ result[daemon_type]['num_combined_daemons'] = 1
+ else:
+ result[daemon_type]['num_combined_daemons'] += 1
+
+ for collection in perf_counters_by_daemon:
+ # Split the collection to avoid redundancy in final report; i.e.:
+ # bluestore.kv_flush_lat, bluestore.kv_final_lat -->
+ # bluestore: kv_flush_lat, kv_final_lat
+ col_0, col_1 = collection.split('.')
+
+ # Debug log for empty keys. This initially was a problem for prioritycache
+ # perf counters, where the col_0 was empty for certain mon counters:
+ #
+ # "mon.a": { instead of "mon.a": {
+ # "": { "prioritycache": {
+ # "cache_bytes": {...}, "cache_bytes": {...},
+ #
+ # This log is here to detect any future instances of a similar issue.
+ if (daemon == "") or (col_0 == "") or (col_1 == ""):
+                    self.log.debug("Instance of an empty key: {} {}".format(daemon, collection))
+
+ if mode == 'separated':
+ # Add value to result
+ result[daemon][col_0][col_1]['value'] = \
+ perf_counters_by_daemon[collection]['value']
+
+ # Check that 'count' exists, as not all counters have a count field.
+ if 'count' in perf_counters_by_daemon[collection]:
+ result[daemon][col_0][col_1]['count'] = \
+ perf_counters_by_daemon[collection]['count']
+ elif mode == 'aggregated':
+ # Not every rgw daemon has the same schema. Specifically, each rgw daemon
+ # has a uniquely-named collection that starts off identically (i.e.
+ # "objecter-0x...") then diverges (i.e. "...55f4e778e140.op_rmw").
+ # This bit of code combines these unique counters all under one rgw instance.
+                # Without this check, the schema would remain separated out in the final report.
+ if col_0[0:11] == "objecter-0x":
+ col_0 = "objecter-0x"
+
+ # Check that the value can be incremented. In some cases,
+ # the files are of type 'pair' (real-integer-pair, integer-integer pair).
+ # In those cases, the value is a dictionary, and not a number.
+ # i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
+ if isinstance(perf_counters_by_daemon[collection]['value'], numbers.Number):
+ result[daemon_type][col_0][col_1]['value'] += \
+ perf_counters_by_daemon[collection]['value']
+
+ # Check that 'count' exists, as not all counters have a count field.
+ if 'count' in perf_counters_by_daemon[collection]:
+ result[daemon_type][col_0][col_1]['count'] += \
+ perf_counters_by_daemon[collection]['count']
+ else:
+ self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
+ return {}
+
+ if mode == 'separated':
+ # for debugging purposes only, this data is never reported
+ self.log.debug('Anonymized daemon mapping for telemetry perf_counters (anonymized: real): {}'.format(anonymized_daemon_dict))
+
+ return result
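+    # In 'separated' mode the result is keyed by (possibly anonymized) daemon
+    # name, then by the two halves of the collection name; a sketch with the
+    # hypothetical values from the snapshot above:
+    #     {'mon.86f7e437...': {'bluestore':
+    #         {'kv_flush_lat': {'value': 88814109, 'count': 2431}}}}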
+
+ def get_active_channels(self) -> List[str]:
+ r = []
+ if self.channel_basic:
+ r.append('basic')
+ if self.channel_crash:
+ r.append('crash')
+ if self.channel_device:
+ r.append('device')
+ if self.channel_ident:
+ r.append('ident')
+ if self.channel_perf:
+ r.append('perf')
+ return r
+
+ def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
+ try:
+ time_format = self.remote('devicehealth', 'get_time_format')
+ except Exception as e:
+ self.log.debug('Unable to format time: {}'.format(e))
+ return {}
+ cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
+ min_sample = cutoff.strftime(time_format)
+
+ devices = self.get('devices')['devices']
+ if not devices:
+ self.log.debug('Unable to get device info from the mgr.')
+ return {}
+
+ # anon-host-id -> anon-devid -> { timestamp -> record }
+ res: Dict[str, Dict[str, Dict[str, str]]] = {}
+ for d in devices:
+ devid = d['devid']
+ try:
+ # this is a map of stamp -> {device info}
+ m = self.remote('devicehealth', 'get_recent_device_metrics',
+ devid, min_sample)
+ except Exception as e:
+ self.log.error('Unable to get recent metrics from device with id "{}": {}'.format(devid, e))
+ continue
+
+ # anonymize host id
+ try:
+ host = d['location'][0]['host']
+ except (KeyError, IndexError) as e:
+ self.log.exception('Unable to get host from device with id "{}": {}'.format(devid, e))
+ continue
+ anon_host = self.get_store('host-id/%s' % host)
+ if not anon_host:
+ anon_host = str(uuid.uuid1())
+ self.set_store('host-id/%s' % host, anon_host)
+ serial = None
+ for dev, rep in m.items():
+ rep['host_id'] = anon_host
+ if serial is None and 'serial_number' in rep:
+ serial = rep['serial_number']
+
+ # anonymize device id
+ anon_devid = self.get_store('devid-id/%s' % devid)
+ if not anon_devid:
+ # ideally devid is 'vendor_model_serial',
+ # but can also be 'model_serial', 'serial'
+ if '_' in devid:
+ anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
+ else:
+ anon_devid = str(uuid.uuid1())
+ self.set_store('devid-id/%s' % devid, anon_devid)
+ self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
+ host, anon_host))
+
+ # anonymize the smartctl report itself
+ if serial:
+ m_str = json.dumps(m)
+ m = json.loads(m_str.replace(serial, 'deleted'))
+
+ if anon_host not in res:
+ res[anon_host] = {}
+ res[anon_host][anon_devid] = m
+ return res
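+    # Illustrative devid anonymization (hypothetical device):
+    #     'ST2000DM001_Z4Z123AB' -> 'ST2000DM001_1b4e28ba-2fa1-11d2-883f-b9a761bde3fb'
+    # i.e. everything before the last '_' (vendor/model) is preserved and
+    # only the trailing serial component is replaced with a random UUID.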
+
+ def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
+ data = self.get_counter(daemon_type, daemon_name, stat)[stat]
+ if data:
+ return data[-1][1]
+ else:
+ return 0
+
+ def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
+ if not channels:
+ channels = self.get_active_channels()
+ report = {
+ 'leaderboard': self.leaderboard,
+ 'leaderboard_description': self.leaderboard_description,
+ 'report_version': 1,
+ 'report_timestamp': datetime.utcnow().isoformat(),
+ 'report_id': self.report_id,
+ 'channels': channels,
+ 'channels_available': ALL_CHANNELS,
+ 'license': LICENSE,
+ 'collections_available': [c['name'].name for c in MODULE_COLLECTION],
+ 'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
+ }
+
+ if 'ident' in channels:
+ for option in ['description', 'contact', 'organization']:
+ report[option] = getattr(self, option)
+
+ if 'basic' in channels:
+ mon_map = self.get('mon_map')
+ osd_map = self.get('osd_map')
+ service_map = self.get('service_map')
+ fs_map = self.get('fs_map')
+ df = self.get('df')
+ df_pools = {pool['id']: pool for pool in df['pools']}
+
+ report['created'] = mon_map['created']
+
+ # mons
+ v1_mons = 0
+ v2_mons = 0
+ ipv4_mons = 0
+ ipv6_mons = 0
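+            # Each mon's public_addrs typically carries one v2 and one v1
+            # entry; a sketch with hypothetical addresses:
+            #   {'addrvec': [{'type': 'v2', 'addr': '10.0.0.1:3300', ...},
+            #                {'type': 'v1', 'addr': '10.0.0.1:6789', ...}]}
+            # The loop below tallies both protocol versions and address
+            # families across all mons.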
+ for mon in mon_map['mons']:
+ for a in mon['public_addrs']['addrvec']:
+ if a['type'] == 'v2':
+ v2_mons += 1
+ elif a['type'] == 'v1':
+ v1_mons += 1
+ if a['addr'].startswith('['):
+ ipv6_mons += 1
+ else:
+ ipv4_mons += 1
+ report['mon'] = {
+ 'count': len(mon_map['mons']),
+ 'features': mon_map['features'],
+ 'min_mon_release': mon_map['min_mon_release'],
+ 'v1_addr_mons': v1_mons,
+ 'v2_addr_mons': v2_mons,
+ 'ipv4_addr_mons': ipv4_mons,
+ 'ipv6_addr_mons': ipv6_mons,
+ }
+
+ report['config'] = self.gather_configs()
+
+ # pools
+
+ rbd_num_pools = 0
+ rbd_num_images_by_pool = []
+ rbd_mirroring_by_pool = []
+ num_pg = 0
+ report['pools'] = list()
+ for pool in osd_map['pools']:
+ num_pg += pool['pg_num']
+ ec_profile = {}
+ if pool['erasure_code_profile']:
+ orig = osd_map['erasure_code_profiles'].get(
+ pool['erasure_code_profile'], {})
+ ec_profile = {
+ k: orig[k] for k in orig.keys()
+ if k in ['k', 'm', 'plugin', 'technique',
+ 'crush-failure-domain', 'l']
+ }
+ pool_data = {
+ 'pool': pool['pool'],
+ 'pg_num': pool['pg_num'],
+ 'pgp_num': pool['pg_placement_num'],
+ 'size': pool['size'],
+ 'min_size': pool['min_size'],
+ 'pg_autoscale_mode': pool['pg_autoscale_mode'],
+ 'target_max_bytes': pool['target_max_bytes'],
+ 'target_max_objects': pool['target_max_objects'],
+ 'type': ['', 'replicated', '', 'erasure'][pool['type']],
+ 'erasure_code_profile': ec_profile,
+ 'cache_mode': pool['cache_mode'],
+ }
+
+ # basic_pool_usage collection
+ if self.is_enabled_collection(Collection.basic_pool_usage):
+ pool_data['application'] = []
+ for application in pool['application_metadata']:
+ # Only include default applications
+ if application in ['cephfs', 'mgr', 'rbd', 'rgw']:
+ pool_data['application'].append(application)
+ pool_stats = df_pools[pool['pool']]['stats']
+ pool_data['stats'] = { # filter out kb_used
+ 'avail_raw': pool_stats['avail_raw'],
+ 'bytes_used': pool_stats['bytes_used'],
+ 'compress_bytes_used': pool_stats['compress_bytes_used'],
+ 'compress_under_bytes': pool_stats['compress_under_bytes'],
+ 'data_bytes_used': pool_stats['data_bytes_used'],
+ 'dirty': pool_stats['dirty'],
+ 'max_avail': pool_stats['max_avail'],
+ 'objects': pool_stats['objects'],
+ 'omap_bytes_used': pool_stats['omap_bytes_used'],
+ 'percent_used': pool_stats['percent_used'],
+ 'quota_bytes': pool_stats['quota_bytes'],
+ 'quota_objects': pool_stats['quota_objects'],
+ 'rd': pool_stats['rd'],
+ 'rd_bytes': pool_stats['rd_bytes'],
+ 'stored': pool_stats['stored'],
+ 'stored_data': pool_stats['stored_data'],
+ 'stored_omap': pool_stats['stored_omap'],
+ 'stored_raw': pool_stats['stored_raw'],
+ 'wr': pool_stats['wr'],
+ 'wr_bytes': pool_stats['wr_bytes']
+ }
+ pool_data['options'] = {}
+ # basic_pool_options_bluestore collection
+ if self.is_enabled_collection(Collection.basic_pool_options_bluestore):
+ bluestore_options = ['compression_algorithm',
+ 'compression_mode',
+ 'compression_required_ratio',
+ 'compression_min_blob_size',
+ 'compression_max_blob_size']
+ for option in bluestore_options:
+ if option in pool['options']:
+ pool_data['options'][option] = pool['options'][option]
+ cast(List[Dict[str, Any]], report['pools']).append(pool_data)
+ if 'rbd' in pool['application_metadata']:
+ rbd_num_pools += 1
+ ioctx = self.rados.open_ioctx(pool['pool_name'])
+ rbd_num_images_by_pool.append(
+ sum(1 for _ in rbd.RBD().list2(ioctx)))
+ rbd_mirroring_by_pool.append(
+ rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
+ report['rbd'] = {
+ 'num_pools': rbd_num_pools,
+ 'num_images_by_pool': rbd_num_images_by_pool,
+ 'mirroring_by_pool': rbd_mirroring_by_pool}
+
+ # osds
+ cluster_network = False
+ for osd in osd_map['osds']:
+ if osd['up'] and not cluster_network:
+ front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
+ back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
+ if front_ip != back_ip:
+ cluster_network = True
+ report['osd'] = {
+ 'count': len(osd_map['osds']),
+ 'require_osd_release': osd_map['require_osd_release'],
+ 'require_min_compat_client': osd_map['require_min_compat_client'],
+ 'cluster_network': cluster_network,
+ }
+
+ # crush
+ report['crush'] = self.gather_crush_info()
+
+ # cephfs
+ report['fs'] = {
+ 'count': len(fs_map['filesystems']),
+ 'feature_flags': fs_map['feature_flags'],
+ 'num_standby_mds': len(fs_map['standbys']),
+ 'filesystems': [],
+ }
+ num_mds = len(fs_map['standbys'])
+ for fsm in fs_map['filesystems']:
+ fs = fsm['mdsmap']
+ num_sessions = 0
+ cached_ino = 0
+ cached_dn = 0
+ cached_cap = 0
+ subtrees = 0
+ rfiles = 0
+ rbytes = 0
+ rsnaps = 0
+ for gid, mds in fs['info'].items():
+ num_sessions += self.get_latest('mds', mds['name'],
+ 'mds_sessions.session_count')
+ cached_ino += self.get_latest('mds', mds['name'],
+ 'mds_mem.ino')
+ cached_dn += self.get_latest('mds', mds['name'],
+ 'mds_mem.dn')
+ cached_cap += self.get_latest('mds', mds['name'],
+ 'mds_mem.cap')
+ subtrees += self.get_latest('mds', mds['name'],
+ 'mds.subtrees')
+ if mds['rank'] == 0:
+ rfiles = self.get_latest('mds', mds['name'],
+ 'mds.root_rfiles')
+ rbytes = self.get_latest('mds', mds['name'],
+ 'mds.root_rbytes')
+ rsnaps = self.get_latest('mds', mds['name'],
+ 'mds.root_rsnaps')
+ report['fs']['filesystems'].append({ # type: ignore
+ 'max_mds': fs['max_mds'],
+ 'ever_allowed_features': fs['ever_allowed_features'],
+ 'explicitly_allowed_features': fs['explicitly_allowed_features'],
+ 'num_in': len(fs['in']),
+ 'num_up': len(fs['up']),
+ 'num_standby_replay': len(
+ [mds for gid, mds in fs['info'].items()
+ if mds['state'] == 'up:standby-replay']),
+ 'num_mds': len(fs['info']),
+ 'num_sessions': num_sessions,
+ 'cached_inos': cached_ino,
+ 'cached_dns': cached_dn,
+ 'cached_caps': cached_cap,
+ 'cached_subtrees': subtrees,
+ 'balancer_enabled': len(fs['balancer']) > 0,
+ 'num_data_pools': len(fs['data_pools']),
+ 'standby_count_wanted': fs['standby_count_wanted'],
+ 'approx_ctime': fs['created'][0:7],
+ 'files': rfiles,
+ 'bytes': rbytes,
+ 'snaps': rsnaps,
+ })
+ num_mds += len(fs['info'])
+ report['fs']['total_num_mds'] = num_mds # type: ignore
+
+ # daemons
+ report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
+ mon=self.gather_mon_metadata(mon_map))
+
+ if self.is_enabled_collection(Collection.basic_mds_metadata):
+ report['metadata']['mds'] = self.gather_mds_metadata() # type: ignore
+
+ # host counts
+ servers = self.list_servers()
+ self.log.debug('servers %s' % servers)
+ hosts = {
+ 'num': len([h for h in servers if h['hostname']]),
+ }
+ for t in ['mon', 'mds', 'osd', 'mgr']:
+ nr_services = sum(1 for host in servers if
+ any(service for service in cast(List[ServiceInfoT],
+ host['services'])
+ if service['type'] == t))
+ hosts['num_with_' + t] = nr_services
+ report['hosts'] = hosts
+
+ report['usage'] = {
+ 'pools': len(df['pools']),
+ 'pg_num': num_pg,
+ 'total_used_bytes': df['stats']['total_used_bytes'],
+ 'total_bytes': df['stats']['total_bytes'],
+ 'total_avail_bytes': df['stats']['total_avail_bytes']
+ }
+ # basic_usage_by_class collection
+ if self.is_enabled_collection(Collection.basic_usage_by_class):
+ report['usage']['stats_by_class'] = {} # type: ignore
+ for device_class in df['stats_by_class']:
+ if device_class in ['hdd', 'ssd', 'nvme']:
+ report['usage']['stats_by_class'][device_class] = df['stats_by_class'][device_class] # type: ignore
+
+ services: DefaultDict[str, int] = defaultdict(int)
+ for key, value in service_map['services'].items():
+ services[key] += 1
+ if key == 'rgw':
+ rgw = {}
+ zones = set()
+ zonegroups = set()
+ frontends = set()
+ count = 0
+ d = value.get('daemons', dict())
+ for k, v in d.items():
+ if k == 'summary' and v:
+ rgw[k] = v
+ elif isinstance(v, dict) and 'metadata' in v:
+ count += 1
+ zones.add(v['metadata']['zone_id'])
+ zonegroups.add(v['metadata']['zonegroup_id'])
+ frontends.add(v['metadata']['frontend_type#0'])
+
+ # we could actually iterate over all the keys of
+ # the dict and check for how many frontends there
+ # are, but it is unlikely that one would be running
+ # more than 2 supported ones
+ f2 = v['metadata'].get('frontend_type#1', None)
+ if f2:
+ frontends.add(f2)
+
+ rgw['count'] = count
+ rgw['zones'] = len(zones)
+ rgw['zonegroups'] = len(zonegroups)
+ rgw['frontends'] = list(frontends) # sets aren't json-serializable
+ report['rgw'] = rgw
+ report['services'] = services
+
+ try:
+ report['balancer'] = self.remote('balancer', 'gather_telemetry')
+ except ImportError:
+ report['balancer'] = {
+ 'active': False
+ }
+
+ # Rook
+ self.get_rook_data(report)
+
+ if 'crash' in channels:
+ report['crashes'] = self.gather_crashinfo()
+
+ if 'perf' in channels:
+ if self.is_enabled_collection(Collection.perf_perf):
+ report['perf_counters'] = self.gather_perf_counters('separated')
+ report['stats_per_pool'] = self.get_stats_per_pool()
+ report['stats_per_pg'] = self.get_stats_per_pg()
+ report['io_rate'] = self.get_io_rate()
+ report['osd_perf_histograms'] = self.get_osd_histograms('separated')
+ report['mempool'] = self.get_mempool('separated')
+ report['heap_stats'] = self.get_heap_stats()
+ report['rocksdb_stats'] = self.get_rocksdb_stats()
+
+ # NOTE: We do not include the 'device' channel in this report; it is
+ # sent to a different endpoint.
+
+ return report
+
+ def get_rook_data(self, report: Dict[str, object]) -> None:
+ r, outb, outs = self.mon_command({
+ 'prefix': 'config-key dump',
+ 'format': 'json'
+ })
+ if r != 0:
+ return
+ try:
+ config_kv_dump = json.loads(outb)
+ except json.decoder.JSONDecodeError:
+ return
+
+ for elem in ROOK_KEYS_BY_COLLECTION:
+ # elem[0] is the full key path (e.g. "rook/node/count/with-csi-nfs-plugin")
+ # elem[1] is the Collection this key belongs to
+ if self.is_enabled_collection(elem[1]):
+ self.add_kv_to_report(report, elem[0], config_kv_dump.get(elem[0]))
+
+ def add_kv_to_report(self, report: Dict[str, object], key_path: str, value: Any) -> None:
+ last_node = key_path.split('/')[-1]
+ for node in key_path.split('/')[0:-1]:
+ if node not in report:
+ report[node] = {}
+ report = report[node] # type: ignore
+
+ # sanity check of keys correctness
+ if not isinstance(report, dict):
+ self.log.error(f"'{key_path}' is an invalid key, expected type 'dict' but got {type(report)}")
+ return
+
+ if last_node in report:
+ self.log.error(f"'{key_path}' is an invalid key, last part must not exist at this point")
+ return
+
+ report[last_node] = value
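+    # Illustrative sketch (value is hypothetical):
+    #     add_kv_to_report(report, 'rook/node/count/kubernetes-total', 5)
+    # grows report into {'rook': {'node': {'count': {'kubernetes-total': 5}}}}.
+    # This is also why a key path may not be both a node and a leaf (see the
+    # note above ROOK_KEYS_BY_COLLECTION): 'rook/a/b' would store a value
+    # exactly where 'rook/a/b/c' needs a dict.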
+
+ def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
+ self.log.info('Sending %s to: %s' % (what, url))
+ proxies = dict()
+ if self.proxy:
+ self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
+ proxies['http'] = self.proxy
+ proxies['https'] = self.proxy
+ try:
+ resp = requests.put(url=url, json=report, proxies=proxies)
+ resp.raise_for_status()
+ except Exception as e:
+ fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
+ self.log.error(fail_reason)
+ return fail_reason
+ return None
+
+ class EndPoint(enum.Enum):
+ ceph = 'ceph'
+ device = 'device'
+
+ def collection_delta(self, channels: Optional[List[str]] = None) -> Optional[List[Collection]]:
+ '''
+ Find collections that are available in the module, but are not in the db
+ '''
+ if self.db_collection is None:
+ return None
+
+ if not channels:
+ channels = ALL_CHANNELS
+ else:
+ for ch in channels:
+ if ch not in ALL_CHANNELS:
+ self.log.debug(f"invalid channel name: {ch}")
+ return None
+
+ new_collection : List[Collection] = []
+
+ for c in MODULE_COLLECTION:
+ if c['name'].name not in self.db_collection:
+ if c['channel'] in channels:
+ new_collection.append(c['name'])
+
+ return new_collection
+
+ def is_major_upgrade(self) -> bool:
+ '''
+ Returns True only if the user last opted-in to an older major
+ '''
+ if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
+            # we do not know what the Ceph version was when the user last
+            # opted in, thus we do not wish to nag in case of a major upgrade
+ return False
+
+ mon_map = self.get('mon_map')
+ mon_min = mon_map.get("min_mon_release", 0)
+
+ if mon_min - self.last_opted_in_ceph_version > 0:
+ self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
+ return True
+
+ return False
+
+ def is_opted_in(self) -> bool:
+        # If len is 0 the user is either opted out (never opted in, or ran
+        # `telemetry off`), or they upgraded from telemetry revision 1 or 2,
+        # which requires re-opting in to revision 3; either way they are
+        # considered opted out
+ if self.db_collection is None:
+ return False
+ return len(self.db_collection) > 0
+
+ def should_nag(self) -> bool:
+ # Find delta between opted-in collections and module collections;
+ # nag only if module has a collection which is not in db, and nag == True.
+
+ # We currently do not nag if the user is opted-out (or never opted-in).
+ # If we wish to do this in the future, we need to have a tri-mode state
+ # (opted in, opted out, no action yet), and it needs to be guarded by a
+        # config option (so that nagging can be turned off via config).
+ # We also need to add a last_opted_out_ceph_version variable, for the
+ # major upgrade check.
+
+ # check if there are collections the user is not opt-in to
+ # that we should nag about
+ if self.db_collection is not None:
+ for c in MODULE_COLLECTION:
+ if c['name'].name not in self.db_collection:
+                if c['nag']:
+ self.log.debug(f"The collection: {c['name']} is not reported")
+ return True
+
+        # The user might be opted-in to the most recent collection, or there
+        # is no new collection that requires nagging; in that case, nag only
+        # on a major upgrade when there are new collections
+        # (even ones whose own nag == False):
+ new_collections = False
+ col_delta = self.collection_delta()
+ if col_delta is not None and len(col_delta) > 0:
+ new_collections = True
+
+ return self.is_major_upgrade() and new_collections
+
+ def init_collection(self) -> None:
+        # We fetch from the db the collections the user has already opted-in to.
+        # Right after an upgrade this will be empty, but the user might be
+        # opted-in under the older revision-based scheme (e.g. revision = 3)
+
+ collection = self.get_store('collection')
+
+ if collection is not None:
+ self.db_collection = json.loads(collection)
+
+ if self.db_collection is None:
+ # happens once on upgrade
+ if not self.enabled:
+ # user is not opted-in
+ self.set_store('collection', json.dumps([]))
+ self.log.debug("user is not opted-in")
+ else:
+ # user is opted-in, verify the revision:
+ if self.last_opt_revision == REVISION:
+ self.log.debug(f"telemetry revision is {REVISION}")
+ base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
+ self.set_store('collection', json.dumps(base_collection))
+ else:
+ # user is opted-in to an older version, meaning they need
+ # to re-opt in regardless
+ self.set_store('collection', json.dumps([]))
+ self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")
+
+ # reload collection after setting
+ collection = self.get_store('collection')
+ if collection is not None:
+ self.db_collection = json.loads(collection)
+ else:
+ raise RuntimeError('collection is None after initial setting')
+ else:
+ # user has already upgraded
+ self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
+
+ def is_enabled_collection(self, collection: Collection) -> bool:
+ if self.db_collection is None:
+ return False
+ return collection.name in self.db_collection
+
+ def opt_in_all_collections(self) -> None:
+ """
+        Opt in to all collections; update the db with the collections currently available in the module
+ """
+ if self.db_collection is None:
+ raise RuntimeError('db_collection is None after initial setting')
+
+ for c in MODULE_COLLECTION:
+ if c['name'].name not in self.db_collection:
+ self.db_collection.append(c['name'])
+
+ self.set_store('collection', json.dumps(self.db_collection))
+
+ def send(self,
+ report: Dict[str, Dict[str, str]],
+ endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
+ if not endpoint:
+ endpoint = [self.EndPoint.ceph, self.EndPoint.device]
+ failed = []
+ success = []
+ self.log.debug('Send endpoints %s' % endpoint)
+ for e in endpoint:
+ if e == self.EndPoint.ceph:
+ fail_reason = self._try_post('ceph report', self.url, report)
+ if fail_reason:
+ failed.append(fail_reason)
+ else:
+ now = int(time.time())
+ self.last_upload = now
+ self.set_store('last_upload', str(now))
+ success.append('Ceph report sent to {0}'.format(self.url))
+ self.log.info('Sent report to {0}'.format(self.url))
+ elif e == self.EndPoint.device:
+ if 'device' in self.get_active_channels():
+ devices = self.gather_device_report()
+ if devices:
+ num_devs = 0
+ num_hosts = 0
+ for host, ls in devices.items():
+ self.log.debug('host %s devices %s' % (host, ls))
+ if not len(ls):
+ continue
+ fail_reason = self._try_post('devices', self.device_url,
+ ls)
+ if fail_reason:
+ failed.append(fail_reason)
+ else:
+ num_devs += len(ls)
+ num_hosts += 1
+ if num_devs:
+ success.append('Reported %d devices from %d hosts across a total of %d hosts' % (
+ num_devs, num_hosts, len(devices)))
+ else:
+ fail_reason = 'Unable to send device report: Device channel is on, but the generated report was empty.'
+ failed.append(fail_reason)
+ self.log.error(fail_reason)
+ if failed:
+ return 1, '', '\n'.join(success + failed)
+ return 0, '', '\n'.join(success)
+
+ def format_perf_histogram(self, report: Dict[str, Any]) -> None:
+ # Formatting the perf histograms so they are human-readable. This will change the
+ # ranges and values, which are currently in list form, into strings so that
+ # they are displayed horizontally instead of vertically.
+ if 'report' in report:
+ report = report['report']
+ try:
+ # Formatting ranges and values in osd_perf_histograms
+ mode = 'osd_perf_histograms'
+ for config in report[mode]:
+ for histogram in config:
+ # Adjust ranges by converting lists into strings
+ for axis in config[histogram]['axes']:
+ for i in range(0, len(axis['ranges'])):
+ axis['ranges'][i] = str(axis['ranges'][i])
+
+ for osd in config[histogram]['osds']:
+ for i in range(0, len(osd['values'])):
+ osd['values'][i] = str(osd['values'][i])
+ except KeyError:
+ # If the perf channel is not enabled, there should be a KeyError since
+ # 'osd_perf_histograms' would not be present in the report. In that case,
+ # the show function should pass as usual without trying to format the
+ # histograms.
+ pass
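+    # e.g. an axis's ranges [[None, 512], [512, 1024]] become the strings
+    # '[None, 512]' and '[512, 1024]', so each one renders on a single line
+    # (values are hypothetical).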
+
+ def toggle_channel(self, action: str, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Enable or disable a list of channels
+ '''
+ if not self.enabled:
+ # telemetry should be on for channels to be toggled
+ msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
+
+ if channels is None:
+ msg = f'Please provide a channel name. Available channels: {ALL_CHANNELS}.'
+ return 0, msg, ''
+
+ state = action == 'enable'
+ msg = ''
+ for c in channels:
+ if c not in ALL_CHANNELS:
+ msg = f"{msg}{c} is not a valid channel name. "\
+ f"Available channels: {ALL_CHANNELS}.\n"
+ else:
+ self.set_module_option(f"channel_{c}", state)
+ setattr(self,
+ f"channel_{c}",
+ state)
+ msg = f"{msg}channel_{c} is {action}d\n"
+
+ return 0, msg, ''
+
+ @CLIReadCommand('telemetry status')
+ def status(self) -> Tuple[int, str, str]:
+ '''
+ Show current configuration
+ '''
+ r = {}
+ for opt in self.MODULE_OPTIONS:
+ r[opt['name']] = getattr(self, opt['name'])
+ r['last_upload'] = (time.ctime(self.last_upload)
+ if self.last_upload else self.last_upload)
+ return 0, json.dumps(r, indent=4, sort_keys=True), ''
+
+ @CLIReadCommand('telemetry diff')
+ def diff(self) -> Tuple[int, str, str]:
+ '''
+ Show the diff between opted-in collection and available collection
+ '''
+ diff = []
+ keys = ['nag']
+
+ for c in MODULE_COLLECTION:
+ if not self.is_enabled_collection(c['name']):
+ diff.append({key: val for key, val in c.items() if key not in keys})
+
+ r = None
+ if diff == []:
+ r = "Telemetry is up to date"
+ else:
+ r = json.dumps(diff, indent=4, sort_keys=True)
+
+ return 0, r, ''
+
+ @CLICommand('telemetry on')
+ def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Enable telemetry reports from this cluster
+ '''
+ if license != LICENSE:
+ return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
+To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
+ else:
+ self.set_module_option('enabled', True)
+ self.enabled = True
+ self.opt_in_all_collections()
+
+ # for major releases upgrade nagging
+ mon_map = self.get('mon_map')
+ mon_min = mon_map.get("min_mon_release", 0)
+ self.set_store('last_opted_in_ceph_version', str(mon_min))
+ self.last_opted_in_ceph_version = mon_min
+
+ msg = 'Telemetry is on.'
+ disabled_channels = ''
+ active_channels = self.get_active_channels()
+ for c in ALL_CHANNELS:
+ if c not in active_channels and c != 'ident':
+ disabled_channels = f"{disabled_channels} {c}"
+
+ if len(disabled_channels) > 0:
+ msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
+ f"`ceph telemetry enable channel{disabled_channels}`"
+
+ # wake up serve() to reset health warning
+ self.event.set()
+
+ return 0, msg, ''
+
+ @CLICommand('telemetry off')
+ def off(self) -> Tuple[int, str, str]:
+ '''
+ Disable telemetry reports from this cluster
+ '''
+ if not self.enabled:
+ # telemetry is already off
+ msg = 'Telemetry is currently not enabled, nothing to turn off. '\
+ 'Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
+
+ self.set_module_option('enabled', False)
+ self.enabled = False
+ self.set_store('collection', json.dumps([]))
+ self.db_collection = []
+
+ # we might need this info in the future, in case
+ # of nagging when user is opted-out
+ mon_map = self.get('mon_map')
+ mon_min = mon_map.get("min_mon_release", 0)
+ self.set_store('last_opted_out_ceph_version', str(mon_min))
+ self.last_opted_out_ceph_version = mon_min
+
+ msg = 'Telemetry is now disabled.'
+ return 0, msg, ''
+
+ @CLIReadCommand('telemetry enable channel all')
+ def enable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
+ '''
+ Enable all channels
+ '''
+ return self.toggle_channel('enable', channels)
+
+ @CLIReadCommand('telemetry enable channel')
+ def enable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Enable a list of channels
+ '''
+ return self.toggle_channel('enable', channels)
+
+ @CLIReadCommand('telemetry disable channel all')
+ def disable_channel_all(self, channels: List[str] = ALL_CHANNELS) -> Tuple[int, str, str]:
+ '''
+ Disable all channels
+ '''
+ return self.toggle_channel('disable', channels)
+
+ @CLIReadCommand('telemetry disable channel')
+ def disable_channel(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Disable a list of channels
+ '''
+ return self.toggle_channel('disable', channels)
+
+ @CLIReadCommand('telemetry channel ls')
+ def channel_ls(self) -> Tuple[int, str, str]:
+ '''
+ List all channels
+ '''
+ table = PrettyTable(
+ [
+ 'NAME', 'ENABLED', 'DEFAULT', 'DESC',
+ ],
+ border=False)
+ table.align['NAME'] = 'l'
+ table.align['ENABLED'] = 'l'
+ table.align['DEFAULT'] = 'l'
+ table.align['DESC'] = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 4
+
+ for c in ALL_CHANNELS:
+ enabled = "ON" if getattr(self, f"channel_{c}") else "OFF"
+ for o in self.MODULE_OPTIONS:
+ if o['name'] == f"channel_{c}":
+ default = "ON" if o.get('default', None) else "OFF"
+ desc = o.get('desc', None)
+
+ table.add_row((
+ c,
+ enabled,
+ default,
+ desc,
+ ))
+
+ return 0, table.get_string(sortby="NAME"), ''
+
+ @CLIReadCommand('telemetry collection ls')
+ def collection_ls(self) -> Tuple[int, str, str]:
+ '''
+ List all collections
+ '''
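+ # Each row's STATUS is either REPORTING, or NOT REPORTING plus the
+ # reason(s): NOT OPTED-IN and/or CHANNEL <name> IS OFF.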
+ col_delta = self.collection_delta()
+ msg = ''
+ if col_delta is not None and len(col_delta) > 0:
+ msg = f"New collections are available:\n" \
+ f"{sorted([c.name for c in col_delta])}\n" \
+ f"Run `ceph telemetry on` to opt-in to these collections.\n"
+
+ table = PrettyTable(
+ [
+ 'NAME', 'STATUS', 'DESC',
+ ],
+ border=False)
+ table.align['NAME'] = 'l'
+ table.align['STATUS'] = 'l'
+ table.align['DESC'] = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 4
+
+ for c in MODULE_COLLECTION:
+ name = c['name']
+ opted_in = self.is_enabled_collection(name)
+ channel_enabled = getattr(self, f"channel_{c['channel']}")
+
+ status = ''
+ if channel_enabled and opted_in:
+ status = "REPORTING"
+ else:
+ why = ''
+ delimiter = ''
+
+ if not opted_in:
+ why += "NOT OPTED-IN"
+ delimiter = ', '
+ if not channel_enabled:
+ why += f"{delimiter}CHANNEL {c['channel']} IS OFF"
+
+ status = f"NOT REPORTING: {why}"
+
+ desc = c['description']
+
+ table.add_row((
+ name,
+ status,
+ desc,
+ ))
+
+ if len(msg):
+ # add a blank line between the message and the table output
+ msg = f"{msg}\n"
+
+ return 0, f'{msg}{table.get_string(sortby="NAME")}', ''
+
+ @CLICommand('telemetry send')
+ def do_send(self,
+ endpoint: Optional[List[EndPoint]] = None,
+ license: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Send a sample report
+ '''
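+ # Hypothetical manual send while opted out:
+ # $ ceph telemetry send --license sharing-1-0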
+ if not self.is_opted_in() and license != LICENSE:
+ self.log.debug('Telemetry send attempted while opted out; '
+ 'asking for license agreement')
+ return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
+To manually send telemetry data, add '--license {LICENSE}' to the 'ceph telemetry send' command.
+Please consider enabling the telemetry module with 'ceph telemetry on'.'''
+ else:
+ self.last_report = self.compile_report()
+ return self.send(self.last_report, endpoint)
+
+ @CLIReadCommand('telemetry show')
+ def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Show a sample report of opted-in collections (except for 'device')
+ '''
+ if not self.enabled:
+ # if telemetry is off, no report is being sent, hence nothing to show
+ msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
+
+ report = self.get_report_locked(channels=channels)
+ self.format_perf_histogram(report)
+ report = json.dumps(report, indent=4, sort_keys=True)
+
+ if self.channel_device:
+ report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
+
+ return 0, report, ''
+
+ @CLIReadCommand('telemetry preview')
+ def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Preview a sample report of the most recent collections available (except for 'device')
+ '''
+ report = {}
+
+ # We hold the report lock while previewing: the preview temporarily
+ # swaps in the full collection list, and without the lock a report
+ # compiled concurrently (e.g. when the send interval fires) could
+ # include collections the user has not opted in to.
+ col_delta = self.collection_delta()
+ with self.get_report_lock:
+ if col_delta is not None and len(col_delta) == 0:
+ # user is already opted-in to the most recent collection
+ msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
+ return 0, msg, ''
+ else:
+ # there are collections the user is not opted-in to
+ next_collection = [c['name'].name for c in MODULE_COLLECTION]
+
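+ # temporarily pretend we are opted in to every collection so the
+ # compiled report includes everything, then restore the real opt-ins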
+ opted_in_collection = self.db_collection
+ self.db_collection = next_collection
+ report = self.get_report(channels=channels)
+ self.db_collection = opted_in_collection
+
+ self.format_perf_histogram(report)
+ report = json.dumps(report, indent=4, sort_keys=True)
+
+ if self.channel_device:
+ report += '''\nDevice report is generated separately. To see it run 'ceph telemetry preview-device'.'''
+
+ return 0, report, ''
+
+ @CLIReadCommand('telemetry show-device')
+ def show_device(self) -> Tuple[int, str, str]:
+ '''
+ Show a sample device report
+ '''
+ if not self.enabled:
+ # if telemetry is off, no report is being sent, hence nothing to show
+ msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample device reports with `ceph telemetry preview-device`.'
+ return 0, msg, ''
+
+ if not self.channel_device:
+ # if device channel is off, device report is not being sent, hence nothing to show
+ msg = 'device channel is off. Please enable with `ceph telemetry enable channel device`.\n' \
+ 'Preview sample device reports with `ceph telemetry preview-device`.'
+ return 0, msg, ''
+
+ return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
+
+ @CLIReadCommand('telemetry preview-device')
+ def preview_device(self) -> Tuple[int, str, str]:
+ '''
+ Preview a sample device report of the most recent device collection
+ '''
+ report = {}
+
+ device_col_delta = self.collection_delta(['device'])
+ with self.get_report_lock:
+ if device_col_delta is not None and len(device_col_delta) == 0 and self.channel_device:
+ # user is already opted-in to the most recent device collection,
+ # and device channel is on, thus `show-device` should be called
+ msg = 'device channel is on and up to date, see report with `ceph telemetry show-device`.'
+ return 0, msg, ''
+
+ # either the user is not opted-in at all, or there are collections
+ # they are not opted-in to
+ next_collection = [c['name'].name for c in MODULE_COLLECTION]
+
+ opted_in_collection = self.db_collection
+ self.db_collection = next_collection
+ report = self.get_report('device')
+ self.db_collection = opted_in_collection
+
+ report = json.dumps(report, indent=4, sort_keys=True)
+ return 0, report, ''
+
+ @CLIReadCommand('telemetry show-all')
+ def show_all(self) -> Tuple[int, str, str]:
+ '''
+ Show a sample report of all enabled channels (including 'device' channel)
+ '''
+ if not self.enabled:
+ # if telemetry is off, no report is being sent, hence nothing to show
+ msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
+
+ if not self.channel_device:
+ # device channel is off, no need to display its report
+ report = self.get_report_locked('default')
+ else:
+ # telemetry is on and device channel is enabled, show both
+ report = self.get_report_locked('all')
+
+ self.format_perf_histogram(report)
+ return 0, json.dumps(report, indent=4, sort_keys=True), ''
+
+ @CLIReadCommand('telemetry preview-all')
+ def preview_all(self) -> Tuple[int, str, str]:
+ '''
+ Preview a sample report of the most recent collections available across all channels (including 'device')
+ '''
+ report = {}
+
+ col_delta = self.collection_delta()
+ with self.get_report_lock:
+ if col_delta is not None and len(col_delta) == 0:
+ # user is already opted-in to the most recent collection
+ msg = 'Telemetry is up to date, see report with `ceph telemetry show`.'
+ return 0, msg, ''
+
+ # there are collections the user is not opted-in to
+ next_collection = [c['name'].name for c in MODULE_COLLECTION]
+
+ opted_in_collection = self.db_collection
+ self.db_collection = next_collection
+ report = self.get_report('all')
+ self.db_collection = opted_in_collection
+
+ self.format_perf_histogram(report)
+ report = json.dumps(report, indent=4, sort_keys=True)
+
+ return 0, report, ''
+
+ def get_report_locked(self,
+ report_type: str = 'default',
+ channels: Optional[List[str]] = None) -> Dict[str, Any]:
+ '''
+ A wrapper around get_report that holds get_report_lock, so report
+ compilation cannot race with the preview commands, which temporarily
+ swap in the full collection list
+ '''
+ with self.get_report_lock:
+ return self.get_report(report_type, channels)
+
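+ # report_type selects the payload: 'default' -> main report only,
+ # 'device' -> device report only, 'all' -> both, returned as
+ # {'report': ..., 'device_report': ...}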
+ def get_report(self,
+ report_type: str = 'default',
+ channels: Optional[List[str]] = None) -> Dict[str, Any]:
+ if report_type == 'default':
+ return self.compile_report(channels=channels)
+ elif report_type == 'device':
+ return self.gather_device_report()
+ elif report_type == 'all':
+ return {'report': self.compile_report(channels=channels),
+ 'device_report': self.gather_device_report()}
+ return {}
+
+ def self_test(self) -> None:
+ self.opt_in_all_collections()
+ report = self.compile_report(channels=ALL_CHANNELS)
+ if len(report) == 0:
+ raise RuntimeError('Report is empty')
+
+ if 'report_id' not in report:
+ raise RuntimeError('report_id not found in report')
+
+ def shutdown(self) -> None:
+ self.run = False
+ self.event.set()
+
+ def refresh_health_checks(self) -> None:
+ health_checks = {}
+ # TODO: do we also want to nag when the user is not opted in?
+ if self.enabled and self.should_nag():
+ health_checks['TELEMETRY_CHANGED'] = {
+ 'severity': 'warning',
+ 'summary': 'Telemetry requires re-opt-in',
+ 'detail': [
+ 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
+ ]
+ }
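+ # if the nag condition did not hold, health_checks is still the empty
+ # dict, which clears any previously raised TELEMETRY_CHANGED warning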
+ self.set_health_checks(health_checks)
+
+ def serve(self) -> None:
+ self.load()
+ self.run = True
+
+ self.log.debug('Waiting for mgr to warm up')
+ time.sleep(10)
+
+ while self.run:
+ self.event.clear()
+
+ self.refresh_health_checks()
+
+ if not self.is_opted_in():
+ self.log.debug('Not sending report until user re-opts-in')
+ self.event.wait(1800)
+ continue
+ if not self.enabled:
+ self.log.debug('Not sending report until configured to do so')
+ self.event.wait(1800)
+ continue
+
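+ # the configured interval is in hours, hence the * 3600 when
+ # comparing against elapsed seconds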
+ now = int(time.time())
+ if not self.last_upload or \
+ (now - self.last_upload) > self.interval * 3600:
+ self.log.info('Compiling and sending report to %s',
+ self.url)
+
+ try:
+ self.last_report = self.compile_report()
+ except Exception:
+ self.log.exception('Exception while compiling report:')
+ else:
+ # send only if compilation succeeded; otherwise we would
+ # resend a stale (or empty) last_report
+ self.send(self.last_report)
+ else:
+ self.log.debug('Interval for sending new report has not expired')
+
+ sleep = 3600
+ self.log.debug('Sleeping for %d seconds', sleep)
+ self.event.wait(sleep)
+
+ @staticmethod
+ def can_run() -> Tuple[bool, str]:
+ return True, ''