diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/telemetry/module.py | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/telemetry/module.py')
-rw-r--r-- | src/pybind/mgr/telemetry/module.py | 877 |
1 files changed, 877 insertions, 0 deletions
diff --git a/src/pybind/mgr/telemetry/module.py b/src/pybind/mgr/telemetry/module.py new file mode 100644 index 000000000..953e5c5f7 --- /dev/null +++ b/src/pybind/mgr/telemetry/module.py @@ -0,0 +1,877 @@ +""" +Telemetry module for ceph-mgr + +Collect statistics from Ceph cluster and send this back to the Ceph project +when user has opted-in +""" +import errno +import hashlib +import json +import rbd +import re +import requests +import uuid +import time +from datetime import datetime, timedelta +from threading import Event +from collections import defaultdict + +from mgr_module import MgrModule + + +ALL_CHANNELS = ['basic', 'ident', 'crash', 'device'] + +LICENSE='sharing-1-0' +LICENSE_NAME='Community Data License Agreement - Sharing - Version 1.0' +LICENSE_URL='https://cdla.io/sharing-1-0/' + +# If the telemetry revision has changed since this point, re-require +# an opt-in. This should happen each time we add new information to +# the telemetry report. +LAST_REVISION_RE_OPT_IN = 2 + +# Latest revision of the telemetry report. Bump this each time we make +# *any* change. +REVISION = 3 + +# History of revisions +# -------------------- +# +# Version 1: +# Mimic and/or nautilus are lumped together here, since +# we didn't track revisions yet. +# +# Version 2: +# - added revision tracking, nagging, etc. +# - added config option changes +# - added channels +# - added explicit license acknowledgement to the opt-in process +# +# Version 3: +# - added device health metrics (i.e., SMART data, minus serial number) +# - remove crush_rule +# - added CephFS metadata (how many MDSs, fs features, how many data pools, +# how much metadata is cached, rfiles, rbytes, rsnapshots) +# - added more pool metadata (rep vs ec, cache tiering mode, ec profile) +# - added host count, and counts for hosts with each of (mon, osd, mds, mgr) +# - whether an OSD cluster network is in use +# - rbd pool and image count, and rbd mirror mode (pool-level) +# - rgw daemons, zones, zonegroups; which rgw frontends +# - crush map stats + +class Module(MgrModule): + config = dict() + + metadata_keys = [ + "arch", + "ceph_version", + "os", + "cpu", + "kernel_description", + "kernel_version", + "distro_description", + "distro" + ] + + MODULE_OPTIONS = [ + { + 'name': 'url', + 'type': 'str', + 'default': 'https://telemetry.ceph.com/report' + }, + { + 'name': 'device_url', + 'type': 'str', + 'default': 'https://telemetry.ceph.com/device' + }, + { + 'name': 'enabled', + 'type': 'bool', + 'default': False + }, + { + 'name': 'last_opt_revision', + 'type': 'int', + 'default': 1, + }, + { + 'name': 'leaderboard', + 'type': 'bool', + 'default': False + }, + { + 'name': 'description', + 'type': 'str', + 'default': None + }, + { + 'name': 'contact', + 'type': 'str', + 'default': None + }, + { + 'name': 'organization', + 'type': 'str', + 'default': None + }, + { + 'name': 'proxy', + 'type': 'str', + 'default': None + }, + { + 'name': 'interval', + 'type': 'int', + 'default': 24, + 'min': 8 + }, + { + 'name': 'channel_basic', + 'type': 'bool', + 'default': True, + 'desc': 'Share basic cluster information (size, version)', + }, + { + 'name': 'channel_ident', + 'type': 'bool', + 'default': False, + 'description': 'Share a user-provided description and/or contact email for the cluster', + }, + { + 'name': 'channel_crash', + 'type': 'bool', + 'default': True, + 'description': 'Share metadata about Ceph daemon crashes (version, stack straces, etc)', + }, + { + 'name': 'channel_device', + 'type': 'bool', + 'default': True, + 'description': 'Share device health metrics (e.g., SMART data, minus potentially identifying info like serial numbers)', + }, + ] + + COMMANDS = [ + { + "cmd": "telemetry status", + "desc": "Show current configuration", + "perm": "r" + }, + { + "cmd": "telemetry send " + "name=endpoint,type=CephChoices,strings=ceph|device,n=N,req=false " + "name=license,type=CephString,req=false", + "desc": "Force sending data to Ceph telemetry", + "perm": "rw" + }, + { + "cmd": "telemetry show " + "name=channels,type=CephString,n=N,req=False", + "desc": "Show last report or report to be sent", + "perm": "r" + }, + { + "cmd": "telemetry show-device", + "desc": "Show last device report or device report to be sent", + "perm": "r" + }, + { + "cmd": "telemetry show-all", + "desc": "Show report of all channels", + "perm": "r" + }, + { + "cmd": "telemetry on name=license,type=CephString,req=false", + "desc": "Enable telemetry reports from this cluster", + "perm": "rw", + }, + { + "cmd": "telemetry off", + "desc": "Disable telemetry reports from this cluster", + "perm": "rw", + }, + ] + + @property + def config_keys(self): + return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS) + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + self.event = Event() + self.run = False + self.last_upload = None + self.last_report = dict() + self.report_id = None + self.salt = None + self.config_update_module_option() + + def config_update_module_option(self): + for opt in self.MODULE_OPTIONS: + setattr(self, + opt['name'], + self.get_module_option(opt['name'])) + self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name'])) + + def config_notify(self): + self.config_update_module_option() + # wake up serve() thread + self.event.set() + + def load(self): + self.last_upload = self.get_store('last_upload', None) + if self.last_upload is not None: + self.last_upload = int(self.last_upload) + + self.report_id = self.get_store('report_id', None) + if self.report_id is None: + self.report_id = str(uuid.uuid4()) + self.set_store('report_id', self.report_id) + + self.salt = self.get_store('salt', None) + if not self.salt: + self.salt = str(uuid.uuid4()) + self.set_store('salt', self.salt) + + def gather_osd_metadata(self, osd_map): + keys = ["osd_objectstore", "rotational"] + keys += self.metadata_keys + + metadata = dict() + for key in keys: + metadata[key] = defaultdict(int) + + for osd in osd_map['osds']: + res = self.get_metadata('osd', str(osd['osd'])).items() + if res is None: + self.log.debug('Could not get metadata for osd.%s' % str(osd['osd'])) + continue + for k, v in res: + if k not in keys: + continue + + metadata[k][v] += 1 + + return metadata + + def gather_mon_metadata(self, mon_map): + keys = list() + keys += self.metadata_keys + + metadata = dict() + for key in keys: + metadata[key] = defaultdict(int) + + for mon in mon_map['mons']: + res = self.get_metadata('mon', mon['name']).items() + if res is None: + self.log.debug('Could not get metadata for mon.%s' % (mon['name'])) + continue + for k, v in res: + if k not in keys: + continue + + metadata[k][v] += 1 + + return metadata + + def gather_crush_info(self): + osdmap = self.get_osdmap() + crush_raw = osdmap.get_crush() + crush = crush_raw.dump() + + def inc(d, k): + if k in d: + d[k] += 1 + else: + d[k] = 1 + + device_classes = {} + for dev in crush['devices']: + inc(device_classes, dev.get('class', '')) + + bucket_algs = {} + bucket_types = {} + bucket_sizes = {} + for bucket in crush['buckets']: + if '~' in bucket['name']: # ignore shadow buckets + continue + inc(bucket_algs, bucket['alg']) + inc(bucket_types, bucket['type_id']) + inc(bucket_sizes, len(bucket['items'])) + + return { + 'num_devices': len(crush['devices']), + 'num_types': len(crush['types']), + 'num_buckets': len(crush['buckets']), + 'num_rules': len(crush['rules']), + 'device_classes': list(device_classes.values()), + 'tunables': crush['tunables'], + 'compat_weight_set': '-1' in crush['choose_args'], + 'num_weight_sets': len(crush['choose_args']), + 'bucket_algs': bucket_algs, + 'bucket_sizes': bucket_sizes, + 'bucket_types': bucket_types, + } + + def gather_configs(self): + # cluster config options + cluster = set() + r, outb, outs = self.mon_command({ + 'prefix': 'config dump', + 'format': 'json' + }); + if r != 0: + return {} + try: + dump = json.loads(outb) + except json.decoder.JSONDecodeError: + return {} + for opt in dump: + name = opt.get('name') + if name: + cluster.add(name) + # daemon-reported options (which may include ceph.conf) + active = set() + ls = self.get("modified_config_options"); + for opt in ls.get('options', {}): + active.add(opt) + return { + 'cluster_changed': sorted(list(cluster)), + 'active_changed': sorted(list(active)), + } + + def gather_crashinfo(self): + crashlist = list() + errno, crashids, err = self.remote('crash', 'ls') + if errno: + return '' + for crashid in crashids.split(): + cmd = {'id': crashid} + errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '') + if errno: + continue + c = json.loads(crashinfo) + del c['utsname_hostname'] + # entity_name might have more than one '.', beware + (etype, eid) = c.get('entity_name', '').split('.', 1) + m = hashlib.sha1() + m.update(self.salt.encode('utf-8')) + m.update(eid.encode('utf-8')) + m.update(self.salt.encode('utf-8')) + c['entity_name'] = etype + '.' + m.hexdigest() + crashlist.append(c) + return crashlist + + def get_active_channels(self): + r = [] + if self.channel_basic: + r.append('basic') + if self.channel_crash: + r.append('crash') + if self.channel_device: + r.append('device') + if self.channel_ident: + r.append('ident') + return r + + def gather_device_report(self): + try: + time_format = self.remote('devicehealth', 'get_time_format') + except: + return None + cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2) + min_sample = cutoff.strftime(time_format) + + devices = self.get('devices')['devices'] + + res = {} # anon-host-id -> anon-devid -> { timestamp -> record } + for d in devices: + devid = d['devid'] + try: + # this is a map of stamp -> {device info} + m = self.remote('devicehealth', 'get_recent_device_metrics', + devid, min_sample) + except: + continue + + # anonymize host id + try: + host = d['location'][0]['host'] + except: + continue + anon_host = self.get_store('host-id/%s' % host) + if not anon_host: + anon_host = str(uuid.uuid1()) + self.set_store('host-id/%s' % host, anon_host) + serial = None + for dev, rep in m.items(): + rep['host_id'] = anon_host + if serial is None and 'serial_number' in rep: + serial = rep['serial_number'] + + # anonymize device id + anon_devid = self.get_store('devid-id/%s' % devid) + if not anon_devid: + # ideally devid is 'vendor_model_serial', + # but can also be 'model_serial', 'serial' + if '_' in devid: + anon_devid = f"{devid.rsplit('_', 1)[0]}_{uuid.uuid1()}" + else: + anon_devid = str(uuid.uuid1()) + self.set_store('devid-id/%s' % devid, anon_devid) + self.log.info('devid %s / %s, host %s / %s' % (devid, anon_devid, + host, anon_host)) + + # anonymize the smartctl report itself + if serial: + m_str = json.dumps(m) + m = json.loads(m_str.replace(serial, 'deleted')) + + if anon_host not in res: + res[anon_host] = {} + res[anon_host][anon_devid] = m + return res + + def get_latest(self, daemon_type, daemon_name, stat): + data = self.get_counter(daemon_type, daemon_name, stat)[stat] + #self.log.error("get_latest {0} data={1}".format(stat, data)) + if data: + return data[-1][1] + else: + return 0 + + def compile_report(self, channels=[]): + if not channels: + channels = self.get_active_channels() + report = { + 'leaderboard': self.leaderboard, + 'report_version': 1, + 'report_timestamp': datetime.utcnow().isoformat(), + 'report_id': self.report_id, + 'channels': channels, + 'channels_available': ALL_CHANNELS, + 'license': LICENSE, + } + + if 'ident' in channels: + for option in ['description', 'contact', 'organization']: + report[option] = getattr(self, option) + + if 'basic' in channels: + mon_map = self.get('mon_map') + osd_map = self.get('osd_map') + service_map = self.get('service_map') + fs_map = self.get('fs_map') + df = self.get('df') + + report['created'] = mon_map['created'] + + # mons + v1_mons = 0 + v2_mons = 0 + ipv4_mons = 0 + ipv6_mons = 0 + for mon in mon_map['mons']: + for a in mon['public_addrs']['addrvec']: + if a['type'] == 'v2': + v2_mons += 1 + elif a['type'] == 'v1': + v1_mons += 1 + if a['addr'].startswith('['): + ipv6_mons += 1 + else: + ipv4_mons += 1 + report['mon'] = { + 'count': len(mon_map['mons']), + 'features': mon_map['features'], + 'min_mon_release': mon_map['min_mon_release'], + 'v1_addr_mons': v1_mons, + 'v2_addr_mons': v2_mons, + 'ipv4_addr_mons': ipv4_mons, + 'ipv6_addr_mons': ipv6_mons, + } + + report['config'] = self.gather_configs() + + # pools + report['rbd'] = { + 'num_pools': 0, + 'num_images_by_pool': [], + 'mirroring_by_pool': [], + } + num_pg = 0 + report['pools'] = list() + for pool in osd_map['pools']: + num_pg += pool['pg_num'] + ec_profile = {} + if pool['erasure_code_profile']: + orig = osd_map['erasure_code_profiles'].get( + pool['erasure_code_profile'], {}) + ec_profile = { + k: orig[k] for k in orig.keys() + if k in ['k', 'm', 'plugin', 'technique', + 'crush-failure-domain', 'l'] + } + report['pools'].append( + { + 'pool': pool['pool'], + 'type': pool['type'], + 'pg_num': pool['pg_num'], + 'pgp_num': pool['pg_placement_num'], + 'size': pool['size'], + 'min_size': pool['min_size'], + 'pg_autoscale_mode': pool['pg_autoscale_mode'], + 'target_max_bytes': pool['target_max_bytes'], + 'target_max_objects': pool['target_max_objects'], + 'type': ['', 'replicated', '', 'erasure'][pool['type']], + 'erasure_code_profile': ec_profile, + 'cache_mode': pool['cache_mode'], + } + ) + if 'rbd' in pool['application_metadata']: + report['rbd']['num_pools'] += 1 + ioctx = self.rados.open_ioctx(pool['pool_name']) + report['rbd']['num_images_by_pool'].append( + sum(1 for _ in rbd.RBD().list2(ioctx))) + report['rbd']['mirroring_by_pool'].append( + rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED) + + # osds + cluster_network = False + for osd in osd_map['osds']: + if osd['up'] and not cluster_network: + front_ip = osd['public_addrs']['addrvec'][0]['addr'].split(':')[0] + back_ip = osd['cluster_addrs']['addrvec'][0]['addr'].split(':')[0] + if front_ip != back_ip: + cluster_network = True + report['osd'] = { + 'count': len(osd_map['osds']), + 'require_osd_release': osd_map['require_osd_release'], + 'require_min_compat_client': osd_map['require_min_compat_client'], + 'cluster_network': cluster_network, + } + + # crush + report['crush'] = self.gather_crush_info() + + # cephfs + report['fs'] = { + 'count': len(fs_map['filesystems']), + 'feature_flags': fs_map['feature_flags'], + 'num_standby_mds': len(fs_map['standbys']), + 'filesystems': [], + } + num_mds = len(fs_map['standbys']) + for fsm in fs_map['filesystems']: + fs = fsm['mdsmap'] + num_sessions = 0 + cached_ino = 0 + cached_dn = 0 + cached_cap = 0 + subtrees = 0 + rfiles = 0 + rbytes = 0 + rsnaps = 0 + for gid, mds in fs['info'].items(): + num_sessions += self.get_latest('mds', mds['name'], + 'mds_sessions.session_count') + cached_ino += self.get_latest('mds', mds['name'], + 'mds_mem.ino') + cached_dn += self.get_latest('mds', mds['name'], + 'mds_mem.dn') + cached_cap += self.get_latest('mds', mds['name'], + 'mds_mem.cap') + subtrees += self.get_latest('mds', mds['name'], + 'mds.subtrees') + if mds['rank'] == 0: + rfiles = self.get_latest('mds', mds['name'], + 'mds.root_rfiles') + rbytes = self.get_latest('mds', mds['name'], + 'mds.root_rbytes') + rsnaps = self.get_latest('mds', mds['name'], + 'mds.root_rsnaps') + report['fs']['filesystems'].append({ + 'max_mds': fs['max_mds'], + 'ever_allowed_features': fs['ever_allowed_features'], + 'explicitly_allowed_features': fs['explicitly_allowed_features'], + 'num_in': len(fs['in']), + 'num_up': len(fs['up']), + 'num_standby_replay': len( + [mds for gid, mds in fs['info'].items() + if mds['state'] == 'up:standby-replay']), + 'num_mds': len(fs['info']), + 'num_sessions': num_sessions, + 'cached_inos': cached_ino, + 'cached_dns': cached_dn, + 'cached_caps': cached_cap, + 'cached_subtrees': subtrees, + 'balancer_enabled': len(fs['balancer']) > 0, + 'num_data_pools': len(fs['data_pools']), + 'standby_count_wanted': fs['standby_count_wanted'], + 'approx_ctime': fs['created'][0:7], + 'files': rfiles, + 'bytes': rbytes, + 'snaps': rsnaps, + }) + num_mds += len(fs['info']) + report['fs']['total_num_mds'] = num_mds + + # daemons + report['metadata'] = dict() + report['metadata']['osd'] = self.gather_osd_metadata(osd_map) + report['metadata']['mon'] = self.gather_mon_metadata(mon_map) + + # host counts + servers = self.list_servers() + self.log.debug('servers %s' % servers) + report['hosts'] = { + 'num': len([h for h in servers if h['hostname']]), + } + for t in ['mon', 'mds', 'osd', 'mgr']: + report['hosts']['num_with_' + t] = len( + [h for h in servers + if len([s for s in h['services'] if s['type'] == t])] + ) + + report['usage'] = { + 'pools': len(df['pools']), + 'pg_num': num_pg, + 'total_used_bytes': df['stats']['total_used_bytes'], + 'total_bytes': df['stats']['total_bytes'], + 'total_avail_bytes': df['stats']['total_avail_bytes'] + } + + report['services'] = defaultdict(int) + for key, value in service_map['services'].items(): + report['services'][key] += 1 + if key == 'rgw': + report['rgw'] = { + 'count': 0, + } + zones = set() + realms = set() + zonegroups = set() + frontends = set() + d = value.get('daemons', dict()) + + for k,v in d.items(): + if k == 'summary' and v: + report['rgw'][k] = v + elif isinstance(v, dict) and 'metadata' in v: + report['rgw']['count'] += 1 + zones.add(v['metadata']['zone_id']) + zonegroups.add(v['metadata']['zonegroup_id']) + frontends.add(v['metadata']['frontend_type#0']) + + # we could actually iterate over all the keys of + # the dict and check for how many frontends there + # are, but it is unlikely that one would be running + # more than 2 supported ones + f2 = v['metadata'].get('frontend_type#1', None) + if f2: + frontends.add(f2) + + report['rgw']['zones'] = len(zones) + report['rgw']['zonegroups'] = len(zonegroups) + report['rgw']['frontends'] = list(frontends) # sets aren't json-serializable + + try: + report['balancer'] = self.remote('balancer', 'gather_telemetry') + except ImportError: + report['balancer'] = { + 'active': False + } + + if 'crash' in channels: + report['crashes'] = self.gather_crashinfo() + + # NOTE: We do not include the 'device' channel in this report; it is + # sent to a different endpoint. + + return report + + def _try_post(self, what, url, report): + self.log.info('Sending %s to: %s' % (what, url)) + proxies = dict() + if self.proxy: + self.log.info('Send using HTTP(S) proxy: %s', self.proxy) + proxies['http'] = self.proxy + proxies['https'] = self.proxy + try: + resp = requests.put(url=url, json=report, proxies=proxies) + resp.raise_for_status() + except Exception as e: + fail_reason = 'Failed to send %s to %s: %s' % (what, url, str(e)) + self.log.error(fail_reason) + return fail_reason + return None + + def send(self, report, endpoint=None): + if not endpoint: + endpoint = ['ceph', 'device'] + failed = [] + success = [] + self.log.debug('Send endpoints %s' % endpoint) + for e in endpoint: + if e == 'ceph': + fail_reason = self._try_post('ceph report', self.url, report) + if fail_reason: + failed.append(fail_reason) + else: + now = int(time.time()) + self.last_upload = now + self.set_store('last_upload', str(now)) + success.append('Ceph report sent to {0}'.format(self.url)) + self.log.info('Sent report to {0}'.format(self.url)) + elif e == 'device': + if 'device' in self.get_active_channels(): + devices = self.gather_device_report() + num_devs = 0 + num_hosts = 0 + for host, ls in devices.items(): + self.log.debug('host %s devices %s' % (host, ls)) + if not len(ls): + continue + fail_reason = self._try_post('devices', self.device_url, + ls) + if fail_reason: + failed.append(fail_reason) + else: + num_devs += len(ls) + num_hosts += 1 + if num_devs: + success.append('Reported %d devices across %d hosts' % ( + num_devs, len(devices))) + if failed: + return 1, '', '\n'.join(success + failed) + return 0, '', '\n'.join(success) + + def handle_command(self, inbuf, command): + if command['prefix'] == 'telemetry status': + r = {} + for opt in self.MODULE_OPTIONS: + r[opt['name']] = getattr(self, opt['name']) + r['last_upload'] = time.ctime(self.last_upload) if self.last_upload else self.last_upload + return 0, json.dumps(r, indent=4, sort_keys=True), '' + elif command['prefix'] == 'telemetry on': + if command.get('license') != LICENSE: + return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo enable, add '--license " + LICENSE + "' to the 'ceph telemetry on' command." + self.on() + return 0, '', '' + elif command['prefix'] == 'telemetry off': + self.off() + return 0, '', '' + elif command['prefix'] == 'telemetry send': + if self.last_opt_revision < LAST_REVISION_RE_OPT_IN and command.get('license') != LICENSE: + self.log.debug('A telemetry send attempt while opted-out. Asking for license agreement') + return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo manually send telemetry data, add '--license " + LICENSE + "' to the 'ceph telemetry send' command.\nPlease consider enabling the telemetry module with 'ceph telemetry on'." + self.last_report = self.compile_report() + return self.send(self.last_report, command.get('endpoint')) + + elif command['prefix'] == 'telemetry show': + report = self.get_report(channels=command.get('channels', None)) + report = json.dumps(report, indent=4, sort_keys=True) + if self.channel_device: + report += '\n \nDevice report is generated separately. To see it run \'ceph telemetry show-device\'.' + return 0, report, '' + elif command['prefix'] == 'telemetry show-device': + return 0, json.dumps(self.get_report('device'), indent=4, sort_keys=True), '' + elif command['prefix'] == 'telemetry show-all': + return 0, json.dumps(self.get_report('all'), indent=4, sort_keys=True), '' + else: + return (-errno.EINVAL, '', + "Command not found '{0}'".format(command['prefix'])) + + def on(self): + self.set_module_option('enabled', True) + self.set_module_option('last_opt_revision', REVISION) + + # wake up serve() to reset health warning + self.event.set() + + def off(self): + self.set_module_option('enabled', False) + self.set_module_option('last_opt_revision', 1) + + def get_report(self, report_type='default', channels=None): + if report_type == 'default': + return self.compile_report(channels=channels) + elif report_type == 'device': + return self.gather_device_report() + elif report_type == 'all': + return {'report': self.compile_report(channels=channels), + 'device_report': self.gather_device_report()} + return {} + + def self_test(self): + report = self.compile_report() + if len(report) == 0: + raise RuntimeError('Report is empty') + + if 'report_id' not in report: + raise RuntimeError('report_id not found in report') + + def shutdown(self): + self.run = False + self.event.set() + + def refresh_health_checks(self): + health_checks = {} + if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN: + health_checks['TELEMETRY_CHANGED'] = { + 'severity': 'warning', + 'summary': 'Telemetry requires re-opt-in', + 'detail': [ + 'telemetry report includes new information; must re-opt-in (or out)' + ] + } + self.set_health_checks(health_checks) + + def serve(self): + self.load() + self.run = True + + self.log.debug('Waiting for mgr to warm up') + time.sleep(10) + + while self.run: + self.event.clear() + + self.refresh_health_checks() + + if self.last_opt_revision < LAST_REVISION_RE_OPT_IN: + self.log.debug('Not sending report until user re-opts-in') + self.event.wait(1800) + continue + if not self.enabled: + self.log.debug('Not sending report until configured to do so') + self.event.wait(1800) + continue + + now = int(time.time()) + if not self.last_upload or (now - self.last_upload) > \ + self.interval * 3600: + self.log.info('Compiling and sending report to %s', + self.url) + + try: + self.last_report = self.compile_report() + except: + self.log.exception('Exception while compiling report:') + + self.send(self.last_report) + else: + self.log.debug('Interval for sending new report has not expired') + + sleep = 3600 + self.log.debug('Sleeping for %d seconds', sleep) + self.event.wait(sleep) + + def self_test(self): + self.compile_report() + return True + + @staticmethod + def can_run(): + return True, '' |