import cherrypy
from distutils.version import StrictVersion
import json
import errno
import math
import os
import re
import socket
import threading
import time
from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
from mgr_util import get_default_addr
from rbd import RBD
from collections import namedtuple
try:
    from typing import Optional
except ImportError:
    pass

# Defaults for the Prometheus HTTP server.  Can also be set in config-key;
# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for the Prometheus exporter port registry

DEFAULT_PORT = 9283

# When the CherryPy server in 3.2.2 (and later) starts, it attempts to verify
# that the ports it is listening on are in fact bound. When using the any
# address "::" it tries both ipv4 and ipv6, and in some environments (e.g.
# kubernetes) ipv6 isn't yet configured / supported and CherryPy throws an
# uncaught exception.
if cherrypy is not None:
    v = StrictVersion(cherrypy.__version__)
    # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
    # centos:7) and back to at least 3.0.0.
    if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None


# cherrypy likes to sys.exit on error.  don't let it take us down too!
def os_exit_noop(*args, **kwargs):
    pass


os._exit = os_exit_noop

# to access things in class Module from subclass Root.  Assigned in
# Module.__init__ so the CherryPy handlers can reach the active module
# instance without declaring 'global' everywhere.
_global_instance = None  # type: Optional[Module]

cherrypy.config.update({
    'response.headers.server': 'Ceph-Prometheus'
})


def health_status_to_number(status):
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2


DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']

DF_POOL = ['max_avail', 'stored', 'stored_raw', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
           'compress_bytes_used', 'compress_under_bytes']

OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
                  'recovering_keys_per_sec', 'num_objects_recovered',
                  'num_bytes_recovered', 'num_keys_recovered')

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

MGR_STATUS = ('ceph_daemon',)

MGR_MODULE_STATUS = ('name',)

MGR_MODULE_CAN_RUN = ('name',)

OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
                'front_iface', 'hostname', 'objectstore', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
                       'ceph_version')

DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device', 'wal_device',
                   'instance')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']

alert_metric = namedtuple('alert_metric', 'name description')
HEALTH_CHECKS = [
    alert_metric('SLOW_OPS',
                 'OSD or Monitor requests taking a long time to process'),
]
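
# A minimal sketch (illustrative, not part of the module) of how a further
# health check could be surfaced via the table above:
#
#   HEALTH_CHECKS.append(
#       alert_metric('OSDMAP_FLAGS', 'OSD map flags blocking client IO')
#   )
#
# _setup_static_metrics() below would then register a
# ceph_healthcheck_osdmap_flags gauge, and get_health() would need a parser
# branch (like its SLOW_OPS one) to extract a numeric value from the check's
# summary message.
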
class Metric(object):
    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}  # indexed by label values

    def clear(self):
        self.value = {}

    def set(self, value, labelvalues=None):
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value

    def str_expfmt(self):

        def promethize(path):
            ''' replace illegal metric name characters '''
            result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)

        def floatstr(value):
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt
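
# For illustration: a Metric('gauge', 'osd_up', 'OSD status up',
# ('ceph_daemon',)) holding {('osd.0',): 1} renders via str_expfmt() as:
#
#   # HELP ceph_osd_up OSD status up
#   # TYPE ceph_osd_up gauge
#   ceph_osd_up{ceph_daemon="osd.0"} 1.0
#
# i.e. names are prefixed with "ceph_" and values are emitted as
# Go-compatible floats.
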
class MetricCollectionThread(threading.Thread):
    def __init__(self, module):
        # type: (Module) -> None
        self.mod = module
        self.active = True
        self.event = threading.Event()
        super(MetricCollectionThread, self).__init__(target=self.collect)

    def collect(self):
        self.mod.log.info('starting metric collection thread')
        while self.active:
            self.mod.log.debug('collecting cache in thread')
            if self.mod.have_mon_connection():
                start_time = time.time()
                try:
                    data = self.mod.collect()
                except Exception:
                    # Log any issues encountered during the data collection
                    # and continue
                    self.mod.log.exception("failed to collect metrics:")
                    self.event.wait(self.mod.scrape_interval)
                    continue

                duration = time.time() - start_time
                sleep_time = self.mod.scrape_interval - duration
                if sleep_time < 0:
                    self.mod.log.warning(
                        'Collecting data took more time than configured '
                        'scrape interval. This possibly results in stale '
                        'data. Please check the `stale_cache_strategy` '
                        'configuration option. Collecting data took {:.2f} '
                        'seconds but scrape interval is configured to be '
                        '{:.0f} seconds.'.format(
                            duration,
                            self.mod.scrape_interval,
                        )
                    )
                    sleep_time = 0

                with self.mod.collect_lock:
                    self.mod.collect_cache = data
                    self.mod.collect_time = duration

                self.event.wait(sleep_time)
            else:
                self.mod.log.error('No MON connection')
                self.event.wait(self.mod.scrape_interval)

    def stop(self):
        self.active = False
        self.event.set()


class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "prometheus file_sd_config",
            "desc": "Return file_sd compatible prometheus config for "
                    "mgr cluster",
            "perm": "r"
        },
    ]

    MODULE_OPTIONS = [
        {'name': 'server_addr'},
        {'name': 'server_port'},
        {'name': 'scrape_interval'},
        {'name': 'stale_cache_strategy'},
        {'name': 'rbd_stats_pools'},
        {'name': 'rbd_stats_pools_refresh_interval', 'type': 'int',
         'default': 300},
    ]

    STALE_CACHE_FAIL = 'fail'
    STALE_CACHE_RETURN = 'return'
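
    # Options are read via get_localized_module_option(), so they can be set
    # cluster-wide or per mgr instance; e.g. (illustrative commands):
    #
    #   ceph config set mgr mgr/prometheus/scrape_interval 20
    #   ceph config set mgr mgr/prometheus/stale_cache_strategy return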
    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = self._setup_static_metrics()
        self.shutdown_event = threading.Event()
        self.collect_lock = threading.Lock()
        self.collect_time = 0
        self.scrape_interval = 15.0
        self.stale_cache_strategy = self.STALE_CACHE_FAIL
        self.collect_cache = None
        self.rbd_stats = {
            'pools': {},
            'pools_refresh_time': 0,
            'counters_info': {
                'write_ops': {'type': self.PERFCOUNTER_COUNTER,
                              'desc': 'RBD image writes count'},
                'read_ops': {'type': self.PERFCOUNTER_COUNTER,
                             'desc': 'RBD image reads count'},
                'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
                                'desc': 'RBD image bytes written'},
                'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
                               'desc': 'RBD image bytes read'},
                'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                  'desc': 'RBD image writes latency (msec)'},
                'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                 'desc': 'RBD image reads latency (msec)'},
            },
        }
        global _global_instance
        _global_instance = self
        self.metrics_thread = MetricCollectionThread(_global_instance)

    def _setup_static_metrics(self):
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['mgr_metadata'] = Metric(
            'gauge',
            'mgr_metadata',
            'MGR metadata',
            MGR_METADATA
        )
        metrics['mgr_status'] = Metric(
            'gauge',
            'mgr_status',
            'MGR status (0=standby, 1=active)',
            MGR_STATUS
        )
        metrics['mgr_module_status'] = Metric(
            'gauge',
            'mgr_module_status',
            'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
            MGR_MODULE_STATUS
        )
        metrics['mgr_module_can_run'] = Metric(
            'gauge',
            'mgr_module_can_run',
            'MGR module runnable state i.e. can it run (0=no, 1=yes)',
            MGR_MODULE_CAN_RUN
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )
        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )
        metrics['rbd_mirror_metadata'] = Metric(
            'untyped',
            'rbd_mirror_metadata',
            'RBD Mirror Metadata',
            RBD_MIRROR_METADATA
        )
        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count per Pool',
            ('pool_id',)
        )
        metrics['scrape_duration_seconds'] = Metric(
            'gauge',
            'scrape_duration_secs',
            'Time taken to gather metrics from Ceph (secs)'
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for stat in OSD_POOL_STATS:
            path = 'pool_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                "OSD POOL STATS: {}".format(stat),
                ('pool_id',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {} per pool'.format(state),
                ('pool_id',)
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())
            metrics[path] = Metric(
                'gauge',
                path,
                check.description,
            )

        return metrics
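
    # Illustration of the health check parsing in get_health() below: for an
    # active SLOW_OPS check whose summary message reads
    #   "42 slow ops, oldest one blocked for 12 sec, daemons [osd.0] have slow ops."
    # the _get_value() helper returns (42, 0) and ceph_healthcheck_slow_ops
    # is set to 42. If the first token is not a number, (0, 1) is returned
    # and the metric is dropped instead.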
    def get_health(self):

        def _get_value(message, delim=' ', word_pos=0):
            """Extract value from message (default is 1st field)"""
            v_str = message.split(delim)[word_pos]
            if v_str.isdigit():
                return int(v_str), 0
            return 0, 1

        health = json.loads(self.get('health')['json'])
        # set overall health
        self.metrics['health_status'].set(
            health_status_to_number(health['status'])
        )

        # Examine the health to see if any health checks triggered need to
        # become a metric.
        active_healthchecks = health.get('checks', {})
        active_names = active_healthchecks.keys()

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())

            if path in self.metrics:
                if check.name in active_names:
                    check_data = active_healthchecks[check.name]
                    message = check_data['summary'].get('message', '')
                    v, err = 0, 0

                    if check.name == "SLOW_OPS":
                        # e.g. "42 slow ops, oldest one blocked for 12 sec,
                        # daemons [osd.0, osd.3] have slow ops."
                        v, err = _get_value(message)

                    if err:
                        self.log.error(
                            "healthcheck {} message format is incompatible "
                            "and has been dropped".format(check.name))
                        # drop the metric, so it's no longer emitted
                        del self.metrics[path]
                        continue
                    else:
                        self.metrics[path].set(v)
                else:
                    # health check is not active, so give it a default of 0
                    self.metrics[path].set(0)

    def get_pool_stats(self):
        # retrieve pool stats to provide per pool recovery metrics
        # (osd_pool_stats moved to mgr in Mimic)
        pstats = self.get('osd_pool_stats')
        for pool in pstats['pool_stats']:
            for stat in OSD_POOL_STATS:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['recovery_rate'].get(stat, 0),
                    (pool['pool_id'],)
                )

    def get_df(self):
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['stats'][stat],
                    (pool['id'],)
                )

    def get_fs(self):
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool)
                                   for pool in fs['mdsmap']['data_pools']])
            self.metrics['fs_metadata'].set(1, (
                data_pools,
                fs['id'],
                fs['mdsmap']['metadata_pool'],
                fs['mdsmap']['fs_name']
            ))
            self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host_version = servers.get((id_, 'mds'), ('', ''))
                self.metrics['mds_metadata'].set(1, (
                    'mds.{}'.format(id_), fs['id'],
                    host_version[0], daemon['addr'],
                    daemon['rank'], host_version[1]
                ))

    def get_quorum_status(self):
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            host_version = servers.get((id_, 'mon'), ('', ''))
            self.metrics['mon_metadata'].set(1, (
                'mon.{}'.format(id_),
                host_version[0],
                mon['public_addr'].rsplit(':', 1)[0],
                rank,
                host_version[1]
            ))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics['mon_quorum_status'].set(in_quorum, (
                'mon.{}'.format(id_),
            ))

    def get_mgr_status(self):
        mgr_map = self.get('mgr_map')
        servers = self.get_service_list()

        active = mgr_map['active_name']
        standbys = [s.get('name') for s in mgr_map['standbys']]

        all_mgrs = list(standbys)
        all_mgrs.append(active)

        all_modules = {module.get('name'): module.get('can_run')
                       for module in mgr_map['available_modules']}

        for mgr in all_mgrs:
            host_version = servers.get((mgr, 'mgr'), ('', ''))
            if mgr == active:
                _state = 1
            else:
                _state = 0

            self.metrics['mgr_metadata'].set(1, (
                'mgr.{}'.format(mgr),
                host_version[0],
                host_version[1]
            ))
            self.metrics['mgr_status'].set(_state, (
                'mgr.{}'.format(mgr),
            ))

        always_on_modules = mgr_map['always_on_modules'].get(
            self.release_name, [])
        active_modules = list(always_on_modules)
        active_modules.extend(mgr_map['modules'])

        for mod_name in all_modules.keys():
            if mod_name in always_on_modules:
                _state = 2
            elif mod_name in active_modules:
                _state = 1
            else:
                _state = 0

            _can_run = 1 if all_modules[mod_name] else 0
            self.metrics['mgr_module_status'].set(_state, (mod_name,))
            self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
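
    # Illustration for get_pg_status() below: a pg_summary entry of
    # {'active+clean': 10} for pool '1' adds 10 to both
    # ceph_pg_active{pool_id="1"} and ceph_pg_clean{pool_id="1"}, and 10 to
    # ceph_pg_total{pool_id="1"}; compound states are split on '+' and
    # counted once per component state.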
    def get_pg_status(self):
        pg_summary = self.get('pg_summary')

        for pool in pg_summary['by_pool']:
            num_by_state = dict((state, 0) for state in PG_STATES)
            num_by_state['total'] = 0

            for state_name, count in pg_summary['by_pool'][pool].items():
                for state in state_name.split('+'):
                    num_by_state[state] += count
                num_by_state['total'] += count

            for state, num in num_by_state.items():
                try:
                    self.metrics["pg_{}".format(state)].set(num, (pool,))
                except KeyError:
                    self.log.warning(
                        "skipping pg in unknown state {}".format(state))

    def get_osd_stats(self):
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics['osd_{}'.format(stat)].set(val, (
                    'osd.{}'.format(id_),
                ))

    def get_service_list(self):
        ret = {}
        for server in self.list_servers():
            version = server.get('ceph_version', '')
            host = server.get('hostname', '')
            for service in server.get('services', []):
                ret.update({(service['id'], service['type']): (host, version)})
        return ret

    def get_metadata_and_osd_status(self):
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics['osd_flag_{}'.format(flag)].set(
                int(flag in osd_flags)
            )

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].rsplit(':', 1)[0]
            c_addr = osd['cluster_addr'].rsplit(':', 1)[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info(
                    "OSD {0} is missing from CRUSH map, "
                    "skipping output".format(id_))
                continue

            host_version = servers.get((str(id_), 'osd'), ('', ''))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue

            obj_store = osd_metadata.get('osd_objectstore', '')
            f_iface = osd_metadata.get('front_iface', '')
            b_iface = osd_metadata.get('back_iface', '')

            self.metrics['osd_metadata'].set(1, (
                b_iface,
                'osd.{}'.format(id_),
                c_addr,
                dev_class,
                f_iface,
                host_version[0],
                obj_store,
                p_addr,
                host_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics['osd_{}'.format(state)].set(status, (
                    'osd.{}'.format(id_),
                ))

            osd_dev_node = None
            if obj_store == "filestore":
                # collect filestore backend device
                osd_dev_node = osd_metadata.get(
                    'backend_filestore_dev_node', None)
                # collect filestore journal device
                osd_wal_dev_node = osd_metadata.get('osd_journal', '')
                osd_db_dev_node = ''
            elif obj_store == "bluestore":
                # collect bluestore backend device
                osd_dev_node = osd_metadata.get(
                    'bluestore_bdev_dev_node', None)
                # collect bluestore wal backend
                osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
                # collect bluestore db backend
                osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
            if osd_dev_node and osd_dev_node == "unknown":
                osd_dev_node = None

            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics['disk_occupation'].set(1, (
                    "osd.{0}".format(id_),
                    osd_dev_node,
                    osd_db_dev_node,
                    osd_wal_dev_node,
                    osd_hostname
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        for pool in osd_map['pools']:
            self.metrics['pool_metadata'].set(
                1, (pool['pool'], pool['pool_name']))

        # Populate other servers metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type == 'rgw':
                hostname, version = value
                self.metrics['rgw_metadata'].set(
                    1,
                    ('{}.{}'.format(service_type, service_id),
                     hostname, version)
                )
            elif service_type == 'rbd-mirror':
                mirror_metadata = self.get_metadata('rbd-mirror', service_id)
                if mirror_metadata is None:
                    continue
                mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
                                                                service_id)

                # materialize the label values as a tuple: Metric.set() keys
                # its value dict by labelvalues, so a one-shot generator would
                # defeat deduplication and be consumed on first formatting
                self.metrics['rbd_mirror_metadata'].set(
                    1, tuple(mirror_metadata.get(k, '')
                             for k in RBD_MIRROR_METADATA)
                )
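
    # The disk_occupation series carries metadata only (value 1). It is meant
    # to be joined against node_exporter disk series; an illustrative PromQL
    # sketch (device naming may need label_replace() to align, e.g.
    # "/dev/sdb" vs "sdb"):
    #
    #   node_disk_io_time_seconds_total * on (instance, device)
    #     group_left (ceph_daemon) ceph_disk_occupation
    #
    # which attributes raw disk utilisation back to the owning OSD daemon.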
    def get_num_objects(self):
        pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
        for obj in NUM_OBJECTS:
            stat = 'num_objects_{}'.format(obj)
            self.metrics[stat].set(pg_sum[stat])
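
    # Example settings for the rbd_stats_pools option parsed by
    # get_rbd_stats() below (illustrative commands):
    #
    #   ceph config set mgr mgr/prometheus/rbd_stats_pools "rbd"
    #   ceph config set mgr mgr/prometheus/rbd_stats_pools "rbd/ns1,volumes"
    #   ceph config set mgr mgr/prometheus/rbd_stats_pools "*"
    #
    # A bare pool name collects all namespaces in that pool; "pool/ns"
    # restricts collection to one namespace; "*" matches all pools with the
    # rbd application enabled.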
    def get_rbd_stats(self):
        # Per RBD image stats is collected by registering a dynamic osd perf
        # stats query that tells OSDs to group stats for requests associated
        # with RBD objects by pool, namespace, and image id, which are
        # extracted from the request object names or other attributes.
        # The RBD object names have the following prefixes:
        #   - rbd_data.{image_id}. (data stored in the same pool as metadata)
        #   - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
        #   - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
        # The pool_id in the object name is the id of the pool with the image
        # metadata, and should be used in the image spec. If there is no
        # pool_id in the object name, the image pool is the pool where the
        # object is located.

        # Parse rbd_stats_pools option, which is a comma or space separated
        # list of pool[/namespace] entries. If no namespace is specified the
        # stats are collected for every namespace in the pool. The wildcard
        # '*' can be used to indicate all pools or namespaces
        pools_string = self.get_localized_module_option('rbd_stats_pools', '')
        pool_keys = []
        for x in re.split(r'[\s,]+', pools_string):
            if not x:
                continue

            s = x.split('/', 2)
            pool_name = s[0]
            namespace_name = None
            if len(s) == 2:
                namespace_name = s[1]

            if pool_name == "*":
                # collect for all pools
                osd_map = self.get('osd_map')
                for pool in osd_map['pools']:
                    if 'rbd' not in pool.get('application_metadata', {}):
                        continue
                    pool_keys.append((pool['pool_name'], namespace_name))
            else:
                pool_keys.append((pool_name, namespace_name))

        pools = {}
        for pool_key in pool_keys:
            pool_name = pool_key[0]
            namespace_name = pool_key[1]
            if not namespace_name or namespace_name == "*":
                # empty set means collect for all namespaces
                pools[pool_name] = set()
                continue

            if pool_name not in pools:
                pools[pool_name] = set()
            elif not pools[pool_name]:
                continue
            pools[pool_name].add(namespace_name)

        rbd_stats_pools = {}
        for pool_id in list(self.rbd_stats['pools']):
            name = self.rbd_stats['pools'][pool_id]['name']
            if name not in pools:
                del self.rbd_stats['pools'][pool_id]
            else:
                rbd_stats_pools[name] = \
                    self.rbd_stats['pools'][pool_id]['ns_names']

        pools_refreshed = False
        if pools:
            next_refresh = self.rbd_stats['pools_refresh_time'] + \
                self.get_localized_module_option(
                    'rbd_stats_pools_refresh_interval', 300)
            if rbd_stats_pools != pools or time.time() >= next_refresh:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True

        pool_ids = list(self.rbd_stats['pools'])
        pool_ids.sort()
        pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'

        nspace_names = []
        for pool_id, pool in self.rbd_stats['pools'].items():
            if pool['ns_names']:
                nspace_names.extend(pool['ns_names'])
            else:
                nspace_names = []
                break
        if nspace_names:
            namespace_regex = '^(' + \
                "|".join([re.escape(x)
                          for x in set(nspace_names)]) + ')$'
        else:
            namespace_regex = '^(.*)$'

        if 'query' in self.rbd_stats \
           and (pool_id_regex !=
                self.rbd_stats['query']['key_descriptor'][0]['regex']
                or namespace_regex !=
                self.rbd_stats['query']['key_descriptor'][1]['regex']):
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']

        if not self.rbd_stats['pools']:
            return

        counters_info = self.rbd_stats['counters_info']

        if 'query_id' not in self.rbd_stats:
            query = {
                'key_descriptor': [
                    {'type': 'pool_id', 'regex': pool_id_regex},
                    {'type': 'namespace', 'regex': namespace_regex},
                    {'type': 'object_name',
                     'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
                ],
                'performance_counter_descriptors': list(counters_info),
            }
            query_id = self.add_osd_perf_query(query)
            if query_id is None:
                self.log.error('failed to add query %s' % query)
                return
            self.rbd_stats['query'] = query
            self.rbd_stats['query_id'] = query_id

        res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
        for c in res['counters']:
            # if the pool id is not found in the object name use id of the
            # pool where the object is located
            if c['k'][2][0]:
                pool_id = int(c['k'][2][0])
            else:
                pool_id = int(c['k'][0][0])
            if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True
            if pool_id not in self.rbd_stats['pools']:
                continue
            pool = self.rbd_stats['pools'][pool_id]
            nspace_name = c['k'][1][0]
            if nspace_name not in pool['images']:
                continue
            image_id = c['k'][2][1]
            if image_id not in pool['images'][nspace_name] and \
               not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pool = self.rbd_stats['pools'][pool_id]
                pools_refreshed = True
            if image_id not in pool['images'][nspace_name]:
                continue
            counters = pool['images'][nspace_name][image_id]['c']
            for i in range(len(c['c'])):
                counters[i][0] += c['c'][i][0]
                counters[i][1] += c['c'][i][1]

        label_names = ("pool", "namespace", "image")
        for pool_id, pool in self.rbd_stats['pools'].items():
            pool_name = pool['name']
            for nspace_name, images in pool['images'].items():
                for image_id in images:
                    image_name = images[image_id]['n']
                    counters = images[image_id]['c']
                    i = 0
                    for key in counters_info:
                        counter_info = counters_info[key]
                        stattype = self._stattype_to_str(counter_info['type'])
                        labels = (pool_name, nspace_name, image_name)
                        if counter_info['type'] == self.PERFCOUNTER_COUNTER:
                            path = 'rbd_' + key
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'],
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                        elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
                            path = 'rbd_' + key + '_sum'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'] + ' Total',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                            path = 'rbd_' + key + '_count'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    'counter',
                                    path,
                                    counter_info['desc'] + ' Count',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][1], labels)
                        i += 1
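
    # Resulting per-image series (illustrative), given counters_info above:
    #
    #   ceph_rbd_write_ops{pool="rbd",namespace="",image="disk1"}
    #   ceph_rbd_read_bytes{pool="rbd",namespace="",image="disk1"}
    #   ceph_rbd_write_latency_sum / ceph_rbd_write_latency_count
    #
    # so rate(ceph_rbd_write_latency_sum[1m]) /
    # rate(ceph_rbd_write_latency_count[1m]) approximates mean write latency.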
    def refresh_rbd_stats_pools(self, pools):
        self.log.debug('refreshing rbd pools %s' % (pools))

        rbd = RBD()
        counters_info = self.rbd_stats['counters_info']
        for pool_name, cfg_ns_names in pools.items():
            try:
                pool_id = self.rados.pool_lookup(pool_name)
                with self.rados.open_ioctx(pool_name) as ioctx:
                    if pool_id not in self.rbd_stats['pools']:
                        self.rbd_stats['pools'][pool_id] = {'images': {}}
                    pool = self.rbd_stats['pools'][pool_id]
                    pool['name'] = pool_name
                    pool['ns_names'] = cfg_ns_names
                    if cfg_ns_names:
                        nspace_names = list(cfg_ns_names)
                    else:
                        nspace_names = [''] + rbd.namespace_list(ioctx)
                    # iterate over a copy of the keys: we delete entries for
                    # namespaces that have disappeared
                    for nspace_name in list(pool['images']):
                        if nspace_name not in nspace_names:
                            del pool['images'][nspace_name]
                    for nspace_name in nspace_names:
                        if (nspace_name and
                                not rbd.namespace_exists(ioctx, nspace_name)):
                            self.log.debug(
                                'unknown namespace %s for pool %s' %
                                (nspace_name, pool_name))
                            continue
                        ioctx.set_namespace(nspace_name)
                        if nspace_name not in pool['images']:
                            pool['images'][nspace_name] = {}
                        namespace = pool['images'][nspace_name]
                        images = {}
                        for image_meta in rbd.list2(ioctx):
                            image = {'n': image_meta['name']}
                            image_id = image_meta['id']
                            if image_id in namespace:
                                image['c'] = namespace[image_id]['c']
                            else:
                                image['c'] = [[0, 0] for x in counters_info]
                            images[image_id] = image
                        pool['images'][nspace_name] = images
            except Exception as e:
                self.log.error('failed listing pool %s: %s' % (pool_name, e))
        self.rbd_stats['pools_refresh_time'] = time.time()

    def shutdown_rbd_stats(self):
        if 'query_id' in self.rbd_stats:
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']
        self.rbd_stats['pools'].clear()

    def collect(self):
        # Clear the metrics before scraping
        for k in self.metrics.keys():
            self.metrics[k].clear()

        _start_time = time.time()

        self.get_health()
        self.get_df()
        self.get_pool_stats()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_mgr_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_num_objects()

        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                path, label_names, labels = self._perfpath_to_path_labels(
                    daemon, path)

                # Get the value of the counter
                value = self._perfvalue_to_value(
                    counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            stattype,
                            _path,
                            counter_info['description'] + ' Total',
                            label_names,
                        )
                    self.metrics[_path].set(value, labels)

                    _path = path + '_count'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            'counter',
                            _path,
                            counter_info['description'] + ' Count',
                            label_names,
                        )
                    self.metrics[_path].set(counter_info['count'], labels)
                else:
                    if path not in self.metrics:
                        self.metrics[path] = Metric(
                            stattype,
                            path,
                            counter_info['description'],
                            label_names,
                        )
                    self.metrics[path].set(value, labels)

        self.get_rbd_stats()

        _end_time = time.time()
        self.metrics['scrape_duration_seconds'].set(_end_time - _start_time)

        # Return formatted metrics and clear no longer used data
        _metrics = [m.str_expfmt() for m in self.metrics.values()]
        for k in self.metrics.keys():
            self.metrics[k].clear()

        return ''.join(_metrics) + '\n'
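
    # Long-running averages therefore appear as two series per counter; e.g.
    # (illustrative, assuming the osd op_r_latency perf counter is present
    # on the cluster):
    #
    #   ceph_osd_op_r_latency_sum{ceph_daemon="osd.0"}
    #   ceph_osd_op_r_latency_count{ceph_daemon="osd.0"}
    #
    # and rate(..._sum[1m]) / rate(..._count[1m]) yields the average latency.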
    def get_file_sd_config(self):
        servers = self.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in server.get('services', []):
                if service['type'] != 'mgr':
                    continue
                id_ = service['id']

                # get port for prometheus module at mgr with id_
                # TODO use get_config_prefix or get_config here once
                # https://github.com/ceph/ceph/pull/20458 is merged
                result = CommandResult("")
                _global_instance.send_command(
                    result, "mon", '',
                    json.dumps({
                        "prefix": "config-key get",
                        'key': "config/mgr/mgr/prometheus/{}/server_port".format(id_),
                    }),
                    "")
                r, outb, outs = result.wait()
                if r != 0:
                    _global_instance.log.error(
                        "Failed to retrieve port for mgr {}: {}".format(
                            id_, outs))
                    targets.append('{}:{}'.format(hostname, DEFAULT_PORT))
                else:
                    port = json.loads(outb)
                    targets.append('{}:{}'.format(hostname, port))

        ret = [
            {
                "targets": targets,
                "labels": {}
            }
        ]
        return 0, json.dumps(ret), ""

    def self_test(self):
        self.collect()
        self.get_file_sd_config()

    def handle_command(self, inbuf, cmd):
        if cmd['prefix'] == 'prometheus file_sd_config':
            return self.get_file_sd_config()
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(cmd['prefix']))
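
    # Example (illustrative): "ceph prometheus file_sd_config" returns JSON
    # of the form
    #
    #   [{"targets": ["mgr-host-a:9283", "mgr-host-b:9283"], "labels": {}}]
    #
    # which can be written to a file referenced by a Prometheus
    # file_sd_configs entry so scrape targets follow the mgr daemons.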

    def serve(self):

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath):
                cherrypy.request.path = ''
                return self

            @cherrypy.expose
            def index(self):
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='/metrics'>Metrics</a></p>
    </body>
</html>'''

            @cherrypy.expose
            def metrics(self):
                # Lock the function execution
                with _global_instance.collect_lock:
                    return self._metrics(_global_instance)

            @staticmethod
            def _metrics(instance):
                # Return cached data if available
                if not instance.collect_cache:
                    raise cherrypy.HTTPError(503,
                                             'No cached data available yet')

                def respond():
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache

                if instance.collect_time < instance.scrape_interval:
                    # Respond if cache isn't stale
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
                    # Respond even if cache is stale
                    instance.log.info(
                        'Gathering data took {:.2f} seconds, metrics are '
                        'stale for {:.2f} seconds, '
                        'returning metrics from stale cache.'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval
                        )
                    )
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
                    # Fail if cache is stale
                    msg = (
                        'Gathering data took {:.2f} seconds, metrics are '
                        'stale for {:.2f} seconds, '
                        'returning "service unavailable".'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval,
                        )
                    )
                    instance.log.error(msg)
                    raise cherrypy.HTTPError(503, msg)

        # Make the cache timeout for collecting configurable
        self.scrape_interval = float(
            self.get_localized_module_option('scrape_interval', 15.0))

        self.stale_cache_strategy = self.get_localized_module_option(
            'stale_cache_strategy', 'log')
        if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
                                             self.STALE_CACHE_RETURN]:
            self.stale_cache_strategy = self.STALE_CACHE_FAIL

        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        self.metrics_thread.start()

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri('http://{0}:{1}/'.format(
            socket.getfqdn() if server_addr in ['::', '0.0.0.0']
            else server_addr,
            server_port
        ))

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })
        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')

        # wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        # tell metrics collection thread to stop collecting new metrics
        self.metrics_thread.stop()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')
        self.shutdown_rbd_stats()
        # wait for the metrics collection thread to stop
        self.metrics_thread.join()

    def shutdown(self):
        self.log.info('Stopping engine...')
        self.shutdown_event.set()
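
# Example Prometheus scrape configuration for this exporter (illustrative):
#
#   scrape_configs:
#     - job_name: 'ceph'
#       honor_labels: true
#       static_configs:
#         - targets: ['mgr-host:9283']
#
# honor_labels keeps the labels this module attaches (e.g. the "instance"
# label on disk_occupation) from being overwritten by Prometheus.
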

class StandbyModule(MgrStandbyModule):
    def __init__(self, *args, **kwargs):
        super(StandbyModule, self).__init__(*args, **kwargs)
        self.shutdown_event = threading.Event()

    def serve(self):
        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" %
                      (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })

        module = self

        class Root(object):
            @cherrypy.expose
            def index(self):
                active_uri = module.get_active_uri()
                return '''<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='{}metrics'>Metrics</a></p>
    </body>
</html>'''.format(active_uri)

            @cherrypy.expose
            def metrics(self):
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Wait for shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')

    def shutdown(self):
        self.log.info("Stopping engine...")
        self.shutdown_event.set()
        self.log.info("Stopped engine")