path: root/src/pybind/mgr/telemetry/module.py
author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
commit     19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree       42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/telemetry/module.py
parent     Initial commit. (diff)
download   ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
           ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/telemetry/module.py')
-rw-r--r--  src/pybind/mgr/telemetry/module.py  877
1 file changed, 877 insertions, 0 deletions
diff --git a/src/pybind/mgr/telemetry/module.py b/src/pybind/mgr/telemetry/module.py
new file mode 100644
index 000000000..953e5c5f7
--- /dev/null
+++ b/src/pybind/mgr/telemetry/module.py
@@ -0,0 +1,877 @@
+"""
+Telemetry module for ceph-mgr
+
+Collect statistics from the Ceph cluster and send them back to the Ceph project
+when the user has opted in
+"""
+import errno
+import hashlib
+import json
+import rbd
+import re
+import requests
+import uuid
+import time
+from datetime import datetime, timedelta
+from threading import Event
+from collections import defaultdict
+
+from mgr_module import MgrModule
+
+
+ALL_CHANNELS = ['basic', 'ident', 'crash', 'device']
+
+LICENSE='sharing-1-0'
+LICENSE_NAME='Community Data License Agreement - Sharing - Version 1.0'
+LICENSE_URL='https://cdla.io/sharing-1-0/'
+
+# If the telemetry revision has changed since this point, re-require
+# an opt-in. This should happen each time we add new information to
+# the telemetry report.
+LAST_REVISION_RE_OPT_IN = 2
+
+# Latest revision of the telemetry report. Bump this each time we make
+# *any* change.
+REVISION = 3
+
+# History of revisions
+# --------------------
+#
+# Version 1:
+# Mimic and/or nautilus are lumped together here, since
+# we didn't track revisions yet.
+#
+# Version 2:
+# - added revision tracking, nagging, etc.
+# - added config option changes
+# - added channels
+# - added explicit license acknowledgement to the opt-in process
+#
+# Version 3:
+# - added device health metrics (i.e., SMART data, minus serial number)
+# - remove crush_rule
+# - added CephFS metadata (how many MDSs, fs features, how many data pools,
+# how much metadata is cached, rfiles, rbytes, rsnapshots)
+# - added more pool metadata (rep vs ec, cache tiering mode, ec profile)
+# - added host count, and counts for hosts with each of (mon, osd, mds, mgr)
+# - whether an OSD cluster network is in use
+# - rbd pool and image count, and rbd mirror mode (pool-level)
+# - rgw daemons, zones, zonegroups; which rgw frontends
+# - crush map stats
+
+class Module(MgrModule):
+ config = dict()
+
+ metadata_keys = [
+ "arch",
+ "ceph_version",
+ "os",
+ "cpu",
+ "kernel_description",
+ "kernel_version",
+ "distro_description",
+ "distro"
+ ]
+
+ MODULE_OPTIONS = [
+ {
+ 'name': 'url',
+ 'type': 'str',
+ 'default': 'https://telemetry.ceph.com/report'
+ },
+ {
+ 'name': 'device_url',
+ 'type': 'str',
+ 'default': 'https://telemetry.ceph.com/device'
+ },
+ {
+ 'name': 'enabled',
+ 'type': 'bool',
+ 'default': False
+ },
+ {
+ 'name': 'last_opt_revision',
+ 'type': 'int',
+ 'default': 1,
+ },
+ {
+ 'name': 'leaderboard',
+ 'type': 'bool',
+ 'default': False
+ },
+ {
+ 'name': 'description',
+ 'type': 'str',
+ 'default': None
+ },
+ {
+ 'name': 'contact',
+ 'type': 'str',
+ 'default': None
+ },
+ {
+ 'name': 'organization',
+ 'type': 'str',
+ 'default': None
+ },
+ {
+ 'name': 'proxy',
+ 'type': 'str',
+ 'default': None
+ },
+ {
+ 'name': 'interval',
+ 'type': 'int',
+ 'default': 24,
+ 'min': 8
+ },
+ {
+ 'name': 'channel_basic',
+ 'type': 'bool',
+ 'default': True,
+ 'desc': 'Share basic cluster information (size, version)',
+ },
+ {
+ 'name': 'channel_ident',
+ 'type': 'bool',
+ 'default': False,
+ 'desc': 'Share a user-provided description and/or contact email for the cluster',
+ },
+ {
+ 'name': 'channel_crash',
+ 'type': 'bool',
+ 'default': True,
+ 'desc': 'Share metadata about Ceph daemon crashes (version, stack traces, etc)',
+ },
+ {
+ 'name': 'channel_device',
+ 'type': 'bool',
+ 'default': True,
+ 'desc': 'Share device health metrics (e.g., SMART data, minus potentially identifying info like serial numbers)',
+ },
+ ]
+
+ COMMANDS = [
+ {
+ "cmd": "telemetry status",
+ "desc": "Show current configuration",
+ "perm": "r"
+ },
+ {
+ "cmd": "telemetry send "
+ "name=endpoint,type=CephChoices,strings=ceph|device,n=N,req=false "
+ "name=license,type=CephString,req=false",
+ "desc": "Force sending data to Ceph telemetry",
+ "perm": "rw"
+ },
+ {
+ "cmd": "telemetry show "
+ "name=channels,type=CephString,n=N,req=False",
+ "desc": "Show last report or report to be sent",
+ "perm": "r"
+ },
+ {
+ "cmd": "telemetry show-device",
+ "desc": "Show last device report or device report to be sent",
+ "perm": "r"
+ },
+ {
+ "cmd": "telemetry show-all",
+ "desc": "Show report of all channels",
+ "perm": "r"
+ },
+ {
+ "cmd": "telemetry on name=license,type=CephString,req=false",
+ "desc": "Enable telemetry reports from this cluster",
+ "perm": "rw",
+ },
+ {
+ "cmd": "telemetry off",
+ "desc": "Disable telemetry reports from this cluster",
+ "perm": "rw",
+ },
+ ]
+
+ @property
+ def config_keys(self):
+ return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.event = Event()
+ self.run = False
+ self.last_upload = None
+ self.last_report = dict()
+ self.report_id = None
+ self.salt = None
+ self.config_update_module_option()
+
+ def config_update_module_option(self):
+ for opt in self.MODULE_OPTIONS:
+ setattr(self,
+ opt['name'],
+ self.get_module_option(opt['name']))
+ self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
+
+ def config_notify(self):
+ self.config_update_module_option()
+ # wake up serve() thread
+ self.event.set()
+
+ def load(self):
+ self.last_upload = self.get_store('last_upload', None)
+ if self.last_upload is not None:
+ self.last_upload = int(self.last_upload)
+
+ self.report_id = self.get_store('report_id', None)
+ if self.report_id is None:
+ self.report_id = str(uuid.uuid4())
+ self.set_store('report_id', self.report_id)
+
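+ # random value used to salt the hashing that anonymizes entity names in crash reports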
+ self.salt = self.get_store('salt', None)
+ if not self.salt:
+ self.salt = str(uuid.uuid4())
+ self.set_store('salt', self.salt)
+
+ def gather_osd_metadata(self, osd_map):
+ keys = ["osd_objectstore", "rotational"]
+ keys += self.metadata_keys
+
+ metadata = dict()
+ for key in keys:
+ metadata[key] = defaultdict(int)
+
+ for osd in osd_map['osds']:
+ res = self.get_metadata('osd', str(osd['osd']))
+ if res is None:
+ self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
+ continue
+ for k, v in res.items():
+ if k not in keys:
+ continue
+
+ metadata[k][v] += 1
+
+ return metadata
+
+ def gather_mon_metadata(self, mon_map):
+ keys = list()
+ keys += self.metadata_keys
+
+ metadata = dict()
+ for key in keys:
+ metadata[key] = defaultdict(int)
+
+ for mon in mon_map['mons']:
+ res = self.get_metadata('mon', mon['name'])
+ if res is None:
+ self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
+ continue
+ for k, v in res.items():
+ if k not in keys:
+ continue
+
+ metadata[k][v] += 1
+
+ return metadata
+
+ def gather_crush_info(self):
+ osdmap = self.get_osdmap()
+ crush_raw = osdmap.get_crush()
+ crush = crush_raw.dump()
+
+ def inc(d, k):
+ if k in d:
+ d[k] += 1
+ else:
+ d[k] = 1
+
+ device_classes = {}
+ for dev in crush['devices']:
+ inc(device_classes, dev.get('class', ''))
+
+ bucket_algs = {}
+ bucket_types = {}
+ bucket_sizes = {}
+ for bucket in crush['buckets']:
+ if '~' in bucket['name']: # ignore shadow buckets
+ continue
+ inc(bucket_algs, bucket['alg'])
+ inc(bucket_types, bucket['type_id'])
+ inc(bucket_sizes, len(bucket['items']))
+
+ return {
+ 'num_devices': len(crush['devices']),
+ 'num_types': len(crush['types']),
+ 'num_buckets': len(crush['buckets']),
+ 'num_rules': len(crush['rules']),
+ 'device_classes': list(device_classes.values()),
+ 'tunables': crush['tunables'],
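+ # choose_args entry '-1' holds the backward-compatible weight set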
+ 'compat_weight_set': '-1' in crush['choose_args'],
+ 'num_weight_sets': len(crush['choose_args']),
+ 'bucket_algs': bucket_algs,
+ 'bucket_sizes': bucket_sizes,
+ 'bucket_types': bucket_types,
+ }
+
+ def gather_configs(self):
+ # cluster config options
+ cluster = set()
+ r, outb, outs = self.mon_command({
+ 'prefix': 'config dump',
+ 'format': 'json'
+ })
+ if r != 0:
+ return {}
+ try:
+ dump = json.loads(outb)
+ except json.decoder.JSONDecodeError:
+ return {}
+ for opt in dump:
+ name = opt.get('name')
+ if name:
+ cluster.add(name)
+ # daemon-reported options (which may include ceph.conf)
+ active = set()
+ ls = self.get("modified_config_options");
+ for opt in ls.get('options', {}):
+ active.add(opt)
+ return {
+ 'cluster_changed': sorted(list(cluster)),
+ 'active_changed': sorted(list(active)),
+ }
+
+ def gather_crashinfo(self):
+ crashlist = list()
+ errno, crashids, err = self.remote('crash', 'ls')
+ if errno:
+ return ''
+ for crashid in crashids.split():
+ cmd = {'id': crashid}
+ errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
+ if errno:
+ continue
+ c = json.loads(crashinfo)
+ del c['utsname_hostname']
+ # entity_name might have more than one '.', beware
+ (etype, eid) = c.get('entity_name', '').split('.', 1)
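+ # hash the entity id with the per-cluster salt so the daemon name
+ # cannot be recovered, while staying stable across reports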
+ m = hashlib.sha1()
+ m.update(self.salt.encode('utf-8'))
+ m.update(eid.encode('utf-8'))
+ m.update(self.salt.encode('utf-8'))
+ c['entity_name'] = etype + '.' + m.hexdigest()
+ crashlist.append(c)
+ return crashlist
+
+ def get_active_channels(self):
+ r = []
+ if self.channel_basic:
+ r.append('basic')
+ if self.channel_crash:
+ r.append('crash')
+ if self.channel_device:
+ r.append('device')
+ if self.channel_ident:
+ r.append('ident')
+ return r
+
+ def gather_device_report(self):
+ try:
+ time_format = self.remote('devicehealth', 'get_time_format')
+ except:
+ return None
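+ # only include metrics collected within the last two reporting intervals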
+ cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
+ min_sample = cutoff.strftime(time_format)
+
+ devices = self.get('devices')['devices']
+
+ res = {} # anon-host-id -> anon-devid -> { timestamp -> record }
+ for d in devices:
+ devid = d['devid']
+ try:
+ # this is a map of stamp -> {device info}
+ m = self.remote('devicehealth', 'get_recent_device_metrics',
+ devid, min_sample)
+ except:
+ continue
+
+ # anonymize host id
+ try:
+ host = d['location'][0]['host']
+ except:
+ continue
+ anon_host = self.get_store('host-id/%s' % host)
+ if not anon_host:
+ anon_host = str(uuid.uuid1())
+ self.set_store('host-id/%s' % host, anon_host)
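+ # tag each record with the anonymized host id and note the device
+ # serial number (if reported) so it can be scrubbed from the report below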
+ serial = None
+ for dev, rep in m.items():
+ rep['host_id'] = anon_host
+ if serial is None and 'serial_number' in rep:
+ serial = rep['serial_number']
+
+ # anonymize device id
+ anon_devid = self.get_store('devid-id/%s' % devid)
+ if not anon_devid:
+ # ideally devid is 'vendor_model_serial',
+ # but can also be 'model_serial', 'serial'
+ if '_' in devid:
+ anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}"
+ else:
+ anon_devid = str(uuid.uuid1())
+ self.set_store('devid-id/%s' % devid, anon_devid)
+ self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid,
+ host, anon_host))
+
+ # anonymize the smartctl report itself
+ if serial:
+ m_str = json.dumps(m)
+ m = json.loads(m_str.replace(serial, 'deleted'))
+
+ if anon_host not in res:
+ res[anon_host] = {}
+ res[anon_host][anon_devid] = m
+ return res
+
+ def get_latest(self, daemon_type, daemon_name, stat):
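+ # get_counter() returns a list of (timestamp, value) samples;
+ # return the most recent value, or 0 if there are no samples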
+ data = self.get_counter(daemon_type, daemon_name, stat)[stat]
+ #self.log.error("get_latest {0} data={1}".format(stat, data))
+ if data:
+ return data[-1][1]
+ else:
+ return 0
+
+ def compile_report(self, channels=[]):
+ if not channels:
+ channels = self.get_active_channels()
+ report = {
+ 'leaderboard': self.leaderboard,
+ 'report_version': 1,
+ 'report_timestamp': datetime.utcnow().isoformat(),
+ 'report_id': self.report_id,
+ 'channels': channels,
+ 'channels_available': ALL_CHANNELS,
+ 'license': LICENSE,
+ }
+
+ if 'ident' in channels:
+ for option in ['description', 'contact', 'organization']:
+ report[option] = getattr(self, option)
+
+ if 'basic' in channels:
+ mon_map = self.get('mon_map')
+ osd_map = self.get('osd_map')
+ service_map = self.get('service_map')
+ fs_map = self.get('fs_map')
+ df = self.get('df')
+
+ report['created'] = mon_map['created']
+
+ # mons
+ v1_mons = 0
+ v2_mons = 0
+ ipv4_mons = 0
+ ipv6_mons = 0
+ for mon in mon_map['mons']:
+ for a in mon['public_addrs']['addrvec']:
+ if a['type'] == 'v2':
+ v2_mons += 1
+ elif a['type'] == 'v1':
+ v1_mons += 1
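+ # IPv6 addresses are reported in bracketed form, e.g. [2001:db8::1]:6789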
+ if a['addr'].startswith('['):
+ ipv6_mons += 1
+ else:
+ ipv4_mons += 1
+ report['mon'] = {
+ 'count': len(mon_map['mons']),
+ 'features': mon_map['features'],
+ 'min_mon_release': mon_map['min_mon_release'],
+ 'v1_addr_mons': v1_mons,
+ 'v2_addr_mons': v2_mons,
+ 'ipv4_addr_mons': ipv4_mons,
+ 'ipv6_addr_mons': ipv6_mons,
+ }
+
+ report['config'] = self.gather_configs()
+
+ # pools
+ report['rbd'] = {
+ 'num_pools': 0,
+ 'num_images_by_pool': [],
+ 'mirroring_by_pool': [],
+ }
+ num_pg = 0
+ report['pools'] = list()
+ for pool in osd_map['pools']:
+ num_pg += pool['pg_num']
+ ec_profile = {}
+ if pool['erasure_code_profile']:
+ orig = osd_map['erasure_code_profiles'].get(
+ pool['erasure_code_profile'], {})
+ ec_profile = {
+ k: orig[k] for k in orig.keys()
+ if k in ['k', 'm', 'plugin', 'technique',
+ 'crush-failure-domain', 'l']
+ }
+ report['pools'].append(
+ {
+ 'pool': pool['pool'],
+ 'pg_num': pool['pg_num'],
+ 'pgp_num': pool['pg_placement_num'],
+ 'size': pool['size'],
+ 'min_size': pool['min_size'],
+ 'pg_autoscale_mode': pool['pg_autoscale_mode'],
+ 'target_max_bytes': pool['target_max_bytes'],
+ 'target_max_objects': pool['target_max_objects'],
+ 'type': ['', 'replicated', '', 'erasure'][pool['type']],
+ 'erasure_code_profile': ec_profile,
+ 'cache_mode': pool['cache_mode'],
+ }
+ )
+ if 'rbd' in pool['application_metadata']:
+ report['rbd']['num_pools'] += 1
+ ioctx = self.rados.open_ioctx(pool['pool_name'])
+ report['rbd']['num_images_by_pool'].append(
+ sum(1 for _ in rbd.RBD().list2(ioctx)))
+ report['rbd']['mirroring_by_pool'].append(
+ rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
+
+ # osds
+ cluster_network = False
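+ # a dedicated cluster (replication) network is assumed when an up OSD
+ # reports different public and cluster addresses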
+ for osd in osd_map['osds']:
+ if osd['up'] and not cluster_network:
+ front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0]
+ back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0]
+ if front_ip != back_ip:
+ cluster_network = True
+ report['osd'] = {
+ 'count': len(osd_map['osds']),
+ 'require_osd_release': osd_map['require_osd_release'],
+ 'require_min_compat_client': osd_map['require_min_compat_client'],
+ 'cluster_network': cluster_network,
+ }
+
+ # crush
+ report['crush'] = self.gather_crush_info()
+
+ # cephfs
+ report['fs'] = {
+ 'count': len(fs_map['filesystems']),
+ 'feature_flags': fs_map['feature_flags'],
+ 'num_standby_mds': len(fs_map['standbys']),
+ 'filesystems': [],
+ }
+ num_mds = len(fs_map['standbys'])
+ for fsm in fs_map['filesystems']:
+ fs = fsm['mdsmap']
+ num_sessions = 0
+ cached_ino = 0
+ cached_dn = 0
+ cached_cap = 0
+ subtrees = 0
+ rfiles = 0
+ rbytes = 0
+ rsnaps = 0
+ for gid, mds in fs['info'].items():
+ num_sessions += self.get_latest('mds', mds['name'],
+ 'mds_sessions.session_count')
+ cached_ino += self.get_latest('mds', mds['name'],
+ 'mds_mem.ino')
+ cached_dn += self.get_latest('mds', mds['name'],
+ 'mds_mem.dn')
+ cached_cap += self.get_latest('mds', mds['name'],
+ 'mds_mem.cap')
+ subtrees += self.get_latest('mds', mds['name'],
+ 'mds.subtrees')
+ if mds['rank'] == 0:
+ rfiles = self.get_latest('mds', mds['name'],
+ 'mds.root_rfiles')
+ rbytes = self.get_latest('mds', mds['name'],
+ 'mds.root_rbytes')
+ rsnaps = self.get_latest('mds', mds['name'],
+ 'mds.root_rsnaps')
+ report['fs']['filesystems'].append({
+ 'max_mds': fs['max_mds'],
+ 'ever_allowed_features': fs['ever_allowed_features'],
+ 'explicitly_allowed_features': fs['explicitly_allowed_features'],
+ 'num_in': len(fs['in']),
+ 'num_up': len(fs['up']),
+ 'num_standby_replay': len(
+ [mds for gid, mds in fs['info'].items()
+ if mds['state'] == 'up:standby-replay']),
+ 'num_mds': len(fs['info']),
+ 'num_sessions': num_sessions,
+ 'cached_inos': cached_ino,
+ 'cached_dns': cached_dn,
+ 'cached_caps': cached_cap,
+ 'cached_subtrees': subtrees,
+ 'balancer_enabled': len(fs['balancer']) > 0,
+ 'num_data_pools': len(fs['data_pools']),
+ 'standby_count_wanted': fs['standby_count_wanted'],
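+ # only the year and month (YYYY-MM) of the filesystem creation time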
+ 'approx_ctime': fs['created'][0:7],
+ 'files': rfiles,
+ 'bytes': rbytes,
+ 'snaps': rsnaps,
+ })
+ num_mds += len(fs['info'])
+ report['fs']['total_num_mds'] = num_mds
+
+ # daemons
+ report['metadata'] = dict()
+ report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
+ report['metadata']['mon'] = self.gather_mon_metadata(mon_map)
+
+ # host counts
+ servers = self.list_servers()
+ self.log.debug('servers %s' % servers)
+ report['hosts'] = {
+ 'num': len([h for h in servers if h['hostname']]),
+ }
+ for t in ['mon', 'mds', 'osd', 'mgr']:
+ report['hosts']['num_with_' + t] = len(
+ [h for h in servers
+ if len([s for s in h['services'] if s['type'] == t])]
+ )
+
+ report['usage'] = {
+ 'pools': len(df['pools']),
+ 'pg_num': num_pg,
+ 'total_used_bytes': df['stats']['total_used_bytes'],
+ 'total_bytes': df['stats']['total_bytes'],
+ 'total_avail_bytes': df['stats']['total_avail_bytes']
+ }
+
+ report['services'] = defaultdict(int)
+ for key, value in service_map['services'].items():
+ report['services'][key] += 1
+ if key == 'rgw':
+ report['rgw'] = {
+ 'count': 0,
+ }
+ zones = set()
+ realms = set()
+ zonegroups = set()
+ frontends = set()
+ d = value.get('daemons', dict())
+
+ for k,v in d.items():
+ if k == 'summary' and v:
+ report['rgw'][k] = v
+ elif isinstance(v, dict) and 'metadata' in v:
+ report['rgw']['count'] += 1
+ zones.add(v['metadata']['zone_id'])
+ zonegroups.add(v['metadata']['zonegroup_id'])
+ frontends.add(v['metadata']['frontend_type#0'])
+
+ # we could actually iterate over all the keys of
+ # the dict and check for how many frontends there
+ # are, but it is unlikely that one would be running
+ # more than 2 supported ones
+ f2 = v['metadata'].get('frontend_type#1', None)
+ if f2:
+ frontends.add(f2)
+
+ report['rgw']['zones'] = len(zones)
+ report['rgw']['zonegroups'] = len(zonegroups)
+ report['rgw']['frontends'] = list(frontends) # sets aren't json-serializable
+
+ try:
+ report['balancer'] = self.remote('balancer', 'gather_telemetry')
+ except ImportError:
+ report['balancer'] = {
+ 'active': False
+ }
+
+ if 'crash' in channels:
+ report['crashes'] = self.gather_crashinfo()
+
+ # NOTE: We do not include the 'device' channel in this report; it is
+ # sent to a different endpoint.
+
+ return report
+
+ def _try_post(self, what, url, report):
+ self.log.info('Sending %s to: %s' % (what, url))
+ proxies = dict()
+ if self.proxy:
+ self.log.info('Send using HTTP(S) proxy: %s', self.proxy)
+ proxies['http'] = self.proxy
+ proxies['https'] = self.proxy
+ try:
+ resp = requests.put(url=url, json=report, proxies=proxies)
+ resp.raise_for_status()
+ except Exception as e:
+ fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e))
+ self.log.error(fail_reason)
+ return fail_reason
+ return None
+
+ def send(self, report, endpoint=None):
+ if not endpoint:
+ endpoint = ['ceph', 'device']
+ failed = []
+ success = []
+ self.log.debug('Send endpoints %s' % endpoint)
+ for e in endpoint:
+ if e == 'ceph':
+ fail_reason = self._try_post('ceph report', self.url, report)
+ if fail_reason:
+ failed.append(fail_reason)
+ else:
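+ # record the upload time (epoch seconds) so serve() can enforce the reporting interval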
+ now = int(time.time())
+ self.last_upload = now
+ self.set_store('last_upload', str(now))
+ success.append('Ceph report sent to {0}'.format(self.url))
+ self.log.info('Sent report to {0}'.format(self.url))
+ elif e == 'device':
+ if 'device' in self.get_active_channels():
+ # gather_device_report() returns None if devicehealth is unavailable
+ devices = self.gather_device_report() or {}
+ num_devs = 0
+ num_hosts = 0
+ for host, ls in devices.items():
+ self.log.debug('host %s devices %s' % (host, ls))
+ if not len(ls):
+ continue
+ fail_reason = self._try_post('devices', self.device_url,
+ ls)
+ if fail_reason:
+ failed.append(fail_reason)
+ else:
+ num_devs += len(ls)
+ num_hosts += 1
+ if num_devs:
+ success.append('Reported %d devices across %d hosts' % (
+ num_devs, num_hosts))
+ if failed:
+ return 1, '', '\n'.join(success + failed)
+ return 0, '', '\n'.join(success)
+
+ def handle_command(self, inbuf, command):
+ if command['prefix'] == 'telemetry status':
+ r = {}
+ for opt in self.MODULE_OPTIONS:
+ r[opt['name']] = getattr(self, opt['name'])
+ r['last_upload'] = time.ctime(self.last_upload) if self.last_upload else self.last_upload
+ return 0, json.dumps(r, indent=4, sort_keys=True), ''
+ elif command['prefix'] == 'telemetry on':
+ if command.get('license') != LICENSE:
+ return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo enable, add '--license " + LICENSE + "' to the 'ceph telemetry on' command."
+ self.on()
+ return 0, '', ''
+ elif command['prefix'] == 'telemetry off':
+ self.off()
+ return 0, '', ''
+ elif command['prefix'] == 'telemetry send':
+ if self.last_opt_revision < LAST_REVISION_RE_OPT_IN and command.get('license') != LICENSE:
+ self.log.debug('A telemetry send attempt while opted-out. Asking for license agreement')
+ return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo manually send telemetry data, add '--license " + LICENSE + "' to the 'ceph telemetry send' command.\nPlease consider enabling the telemetry module with 'ceph telemetry on'."
+ self.last_report = self.compile_report()
+ return self.send(self.last_report, command.get('endpoint'))
+
+ elif command['prefix'] == 'telemetry show':
+ report = self.get_report(channels=command.get('channels', None))
+ report = json.dumps(report, indent=4, sort_keys=True)
+ if self.channel_device:
+ report += '\n \nDevice report is generated separately. To see it run \'ceph telemetry show-device\'.'
+ return 0, report, ''
+ elif command['prefix'] == 'telemetry show-device':
+ return 0, json.dumps(self.get_report('device'), indent=4, sort_keys=True), ''
+ elif command['prefix'] == 'telemetry show-all':
+ return 0, json.dumps(self.get_report('all'), indent=4, sort_keys=True), ''
+ else:
+ return (-errno.EINVAL, '',
+ "Command not found '{0}'".format(command['prefix']))
+
+ def on(self):
+ self.set_module_option('enabled', True)
+ self.set_module_option('last_opt_revision', REVISION)
+
+ # wake up serve() to reset health warning
+ self.event.set()
+
+ def off(self):
+ self.set_module_option('enabled', False)
+ self.set_module_option('last_opt_revision', 1)
+
+ def get_report(self, report_type='default', channels=None):
+ if report_type == 'default':
+ return self.compile_report(channels=channels)
+ elif report_type == 'device':
+ return self.gather_device_report()
+ elif report_type == 'all':
+ return {'report': self.compile_report(channels=channels),
+ 'device_report': self.gather_device_report()}
+ return {}
+
+ def self_test(self):
+ report = self.compile_report()
+ if len(report) == 0:
+ raise RuntimeError('Report is empty')
+
+ if 'report_id' not in report:
+ raise RuntimeError('report_id not found in report')
+
+ def shutdown(self):
+ self.run = False
+ self.event.set()
+
+ def refresh_health_checks(self):
+ health_checks = {}
+ if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
+ health_checks['TELEMETRY_CHANGED'] = {
+ 'severity': 'warning',
+ 'summary': 'Telemetry requires re-opt-in',
+ 'detail': [
+ 'telemetry report includes new information; must re-opt-in (or out)'
+ ]
+ }
+ self.set_health_checks(health_checks)
+
+ def serve(self):
+ self.load()
+ self.run = True
+
+ self.log.debug('Waiting for mgr to warm up')
+ time.sleep(10)
+
+ while self.run:
+ self.event.clear()
+
+ self.refresh_health_checks()
+
+ if self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
+ self.log.debug('Not sending report until user re-opts-in')
+ self.event.wait(1800)
+ continue
+ if not self.enabled:
+ self.log.debug('Not sending report until configured to do so')
+ self.event.wait(1800)
+ continue
+
+ now = int(time.time())
+ if not self.last_upload or (now - self.last_upload) > \
+ self.interval * 3600:
+ self.log.info('Compiling and sending report to %s',
+ self.url)
+
+ try:
+ self.last_report = self.compile_report()
+ except:
+ self.log.exception('Exception while compiling report:')
+
+ self.send(self.last_report)
+ else:
+ self.log.debug('Interval for sending new report has not expired')
+
+ sleep = 3600
+ self.log.debug('Sleeping for %d seconds', sleep)
+ self.event.wait(sleep)
+
+
+ @staticmethod
+ def can_run():
+ return True, ''