summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/prometheus/module.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/prometheus/module.py')
-rw-r--r--src/pybind/mgr/prometheus/module.py1324
1 files changed, 1324 insertions, 0 deletions
diff --git a/src/pybind/mgr/prometheus/module.py b/src/pybind/mgr/prometheus/module.py
new file mode 100644
index 00000000..16fd6f7f
--- /dev/null
+++ b/src/pybind/mgr/prometheus/module.py
@@ -0,0 +1,1324 @@
+import cherrypy
+from distutils.version import StrictVersion
+import json
+import errno
+import math
+import os
+import re
+import socket
+import threading
+import time
+from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
+from mgr_util import get_default_addr
+from rbd import RBD
+from collections import namedtuple
+try:
+ from typing import Optional
+except:
+ pass
+
+# Defaults for the Prometheus HTTP server. Can also set in config-key
+# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
+# for Prometheus exporter port registry
+
+DEFAULT_PORT = 9283
+
+# When the CherryPy server in 3.2.2 (and later) starts it attempts to verify
+# that the ports its listening on are in fact bound. When using the any address
+# "::" it tries both ipv4 and ipv6, and in some environments (e.g. kubernetes)
+# ipv6 isn't yet configured / supported and CherryPy throws an uncaught
+# exception.
+if cherrypy is not None:
+ v = StrictVersion(cherrypy.__version__)
+ # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
+ # centos:7) and back to at least 3.0.0.
+ if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
+ # https://github.com/cherrypy/cherrypy/issues/1100
+ from cherrypy.process import servers
+ servers.wait_for_occupied_port = lambda host, port: None
+
+# cherrypy likes to sys.exit on error. don't let it take us down too!
+def os_exit_noop(*args, **kwargs):
+ pass
+
+
+os._exit = os_exit_noop
+
+# to access things in class Module from subclass Root. Because
+# it's a dict, the writer doesn't need to declare 'global' for access
+
+_global_instance = None # type: Optional[Module]
+cherrypy.config.update({
+ 'response.headers.server': 'Ceph-Prometheus'
+})
+
+
+def health_status_to_number(status):
+ if status == 'HEALTH_OK':
+ return 0
+ elif status == 'HEALTH_WARN':
+ return 1
+ elif status == 'HEALTH_ERR':
+ return 2
+
+
+DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']
+
+DF_POOL = ['max_avail', 'stored', 'stored_raw', 'objects', 'dirty',
+ 'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
+ 'compress_bytes_used', 'compress_under_bytes']
+
+OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
+ 'recovering_keys_per_sec', 'num_objects_recovered',
+ 'num_bytes_recovered', 'num_bytes_recovered')
+
+OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
+ 'norecover', 'noscrub', 'nodeep-scrub')
+
+FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')
+
+MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
+ 'ceph_version')
+
+MON_METADATA = ('ceph_daemon', 'hostname',
+ 'public_addr', 'rank', 'ceph_version')
+
+MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')
+
+MGR_STATUS = ('ceph_daemon',)
+
+MGR_MODULE_STATUS = ('name',)
+
+MGR_MODULE_CAN_RUN = ('name',)
+
+OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
+ 'front_iface', 'hostname', 'objectstore', 'public_addr',
+ 'ceph_version')
+
+OSD_STATUS = ['weight', 'up', 'in']
+
+OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']
+
+POOL_METADATA = ('pool_id', 'name')
+
+RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')
+
+RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
+ 'ceph_version')
+
+DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
+ 'wal_device', 'instance')
+
+NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
+
+alert_metric = namedtuple('alert_metric', 'name description')
+HEALTH_CHECKS = [
+ alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process' ),
+]
+
+
+class Metric(object):
+ def __init__(self, mtype, name, desc, labels=None):
+ self.mtype = mtype
+ self.name = name
+ self.desc = desc
+ self.labelnames = labels # tuple if present
+ self.value = {} # indexed by label values
+
+ def clear(self):
+ self.value = {}
+
+ def set(self, value, labelvalues=None):
+ # labelvalues must be a tuple
+ labelvalues = labelvalues or ('',)
+ self.value[labelvalues] = value
+
+ def str_expfmt(self):
+
+ def promethize(path):
+ ''' replace illegal metric name characters '''
+ result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')
+
+ # Hyphens usually turn into underscores, unless they are
+ # trailing
+ if result.endswith("-"):
+ result = result[0:-1] + "_minus"
+ else:
+ result = result.replace("-", "_")
+
+ return "ceph_{0}".format(result)
+
+ def floatstr(value):
+ ''' represent as Go-compatible float '''
+ if value == float('inf'):
+ return '+Inf'
+ if value == float('-inf'):
+ return '-Inf'
+ if math.isnan(value):
+ return 'NaN'
+ return repr(float(value))
+
+ name = promethize(self.name)
+ expfmt = '''
+# HELP {name} {desc}
+# TYPE {name} {mtype}'''.format(
+ name=name,
+ desc=self.desc,
+ mtype=self.mtype,
+ )
+
+ for labelvalues, value in self.value.items():
+ if self.labelnames:
+ labels = zip(self.labelnames, labelvalues)
+ labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
+ else:
+ labels = ''
+ if labels:
+ fmtstr = '\n{name}{{{labels}}} {value}'
+ else:
+ fmtstr = '\n{name} {value}'
+ expfmt += fmtstr.format(
+ name=name,
+ labels=labels,
+ value=floatstr(value),
+ )
+ return expfmt
+
+
+class MetricCollectionThread(threading.Thread):
+ def __init__(self, module):
+ # type: (Module) -> None
+ self.mod = module
+ self.active = True
+ self.event = threading.Event()
+ super(MetricCollectionThread, self).__init__(target=self.collect)
+
+ def collect(self):
+ self.mod.log.info('starting metric collection thread')
+ while self.active:
+ self.mod.log.debug('collecting cache in thread')
+ if self.mod.have_mon_connection():
+ start_time = time.time()
+
+ try:
+ data = self.mod.collect()
+ except:
+ # Log any issues encountered during the data collection and continue
+ self.mod.log.exception("failed to collect metrics:")
+ self.event.wait(self.mod.scrape_interval)
+ continue
+
+ duration = time.time() - start_time
+
+ sleep_time = self.mod.scrape_interval - duration
+ if sleep_time < 0:
+ self.mod.log.warning(
+ 'Collecting data took more time than configured scrape interval. '
+ 'This possibly results in stale data. Please check the '
+ '`stale_cache_strategy` configuration option. '
+ 'Collecting data took {:.2f} seconds but scrape interval is configured '
+ 'to be {:.0f} seconds.'.format(
+ duration,
+ self.mod.scrape_interval,
+ )
+ )
+ sleep_time = 0
+
+ with self.mod.collect_lock:
+ self.mod.collect_cache = data
+ self.mod.collect_time = duration
+
+ self.event.wait(sleep_time)
+ else:
+ self.mod.log.error('No MON connection')
+ self.event.wait(self.mod.scrape_interval)
+
+ def stop(self):
+ self.active = False
+ self.event.set()
+
+class Module(MgrModule):
+ COMMANDS = [
+ {
+ "cmd": "prometheus file_sd_config",
+ "desc": "Return file_sd compatible prometheus config for mgr cluster",
+ "perm": "r"
+ },
+ ]
+
+ MODULE_OPTIONS = [
+ {'name': 'server_addr'},
+ {'name': 'server_port'},
+ {'name': 'scrape_interval'},
+ {'name': 'stale_cache_strategy'},
+ {'name': 'rbd_stats_pools'},
+ {'name': 'rbd_stats_pools_refresh_interval', 'type': 'int', 'default': 300},
+ ]
+
+ STALE_CACHE_FAIL = 'fail'
+ STALE_CACHE_RETURN = 'return'
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.metrics = self._setup_static_metrics()
+ self.shutdown_event = threading.Event()
+ self.collect_lock = threading.Lock()
+ self.collect_time = 0
+ self.scrape_interval = 15.0
+ self.stale_cache_strategy = self.STALE_CACHE_FAIL
+ self.collect_cache = None
+ self.rbd_stats = {
+ 'pools': {},
+ 'pools_refresh_time': 0,
+ 'counters_info': {
+ 'write_ops': {'type': self.PERFCOUNTER_COUNTER,
+ 'desc': 'RBD image writes count'},
+ 'read_ops': {'type': self.PERFCOUNTER_COUNTER,
+ 'desc': 'RBD image reads count'},
+ 'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
+ 'desc': 'RBD image bytes written'},
+ 'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
+ 'desc': 'RBD image bytes read'},
+ 'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
+ 'desc': 'RBD image writes latency (msec)'},
+ 'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
+ 'desc': 'RBD image reads latency (msec)'},
+ },
+ }
+ global _global_instance
+ _global_instance = self
+ self.metrics_thread = MetricCollectionThread(_global_instance)
+
+ def _setup_static_metrics(self):
+ metrics = {}
+ metrics['health_status'] = Metric(
+ 'untyped',
+ 'health_status',
+ 'Cluster health status'
+ )
+ metrics['mon_quorum_status'] = Metric(
+ 'gauge',
+ 'mon_quorum_status',
+ 'Monitors in quorum',
+ ('ceph_daemon',)
+ )
+ metrics['fs_metadata'] = Metric(
+ 'untyped',
+ 'fs_metadata',
+ 'FS Metadata',
+ FS_METADATA
+ )
+ metrics['mds_metadata'] = Metric(
+ 'untyped',
+ 'mds_metadata',
+ 'MDS Metadata',
+ MDS_METADATA
+ )
+ metrics['mon_metadata'] = Metric(
+ 'untyped',
+ 'mon_metadata',
+ 'MON Metadata',
+ MON_METADATA
+ )
+ metrics['mgr_metadata'] = Metric(
+ 'gauge',
+ 'mgr_metadata',
+ 'MGR metadata',
+ MGR_METADATA
+ )
+ metrics['mgr_status'] = Metric(
+ 'gauge',
+ 'mgr_status',
+ 'MGR status (0=standby, 1=active)',
+ MGR_STATUS
+ )
+ metrics['mgr_module_status'] = Metric(
+ 'gauge',
+ 'mgr_module_status',
+ 'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
+ MGR_MODULE_STATUS
+ )
+ metrics['mgr_module_can_run'] = Metric(
+ 'gauge',
+ 'mgr_module_can_run',
+ 'MGR module runnable state i.e. can it run (0=no, 1=yes)',
+ MGR_MODULE_CAN_RUN
+ )
+ metrics['osd_metadata'] = Metric(
+ 'untyped',
+ 'osd_metadata',
+ 'OSD Metadata',
+ OSD_METADATA
+ )
+
+ # The reason for having this separate to OSD_METADATA is
+ # so that we can stably use the same tag names that
+ # the Prometheus node_exporter does
+ metrics['disk_occupation'] = Metric(
+ 'untyped',
+ 'disk_occupation',
+ 'Associate Ceph daemon with disk used',
+ DISK_OCCUPATION
+ )
+
+ metrics['pool_metadata'] = Metric(
+ 'untyped',
+ 'pool_metadata',
+ 'POOL Metadata',
+ POOL_METADATA
+ )
+
+ metrics['rgw_metadata'] = Metric(
+ 'untyped',
+ 'rgw_metadata',
+ 'RGW Metadata',
+ RGW_METADATA
+ )
+
+ metrics['rbd_mirror_metadata'] = Metric(
+ 'untyped',
+ 'rbd_mirror_metadata',
+ 'RBD Mirror Metadata',
+ RBD_MIRROR_METADATA
+ )
+
+ metrics['pg_total'] = Metric(
+ 'gauge',
+ 'pg_total',
+ 'PG Total Count per Pool',
+ ('pool_id',)
+ )
+
+ metrics['scrape_duration_seconds'] = Metric(
+ 'gauge',
+ 'scrape_duration_secs',
+ 'Time taken to gather metrics from Ceph (secs)'
+ )
+
+ for flag in OSD_FLAGS:
+ path = 'osd_flag_{}'.format(flag)
+ metrics[path] = Metric(
+ 'untyped',
+ path,
+ 'OSD Flag {}'.format(flag)
+ )
+ for state in OSD_STATUS:
+ path = 'osd_{}'.format(state)
+ metrics[path] = Metric(
+ 'untyped',
+ path,
+ 'OSD status {}'.format(state),
+ ('ceph_daemon',)
+ )
+ for stat in OSD_STATS:
+ path = 'osd_{}'.format(stat)
+ metrics[path] = Metric(
+ 'gauge',
+ path,
+ 'OSD stat {}'.format(stat),
+ ('ceph_daemon',)
+ )
+ for stat in OSD_POOL_STATS:
+ path = 'pool_{}'.format(stat)
+ metrics[path] = Metric(
+ 'gauge',
+ path,
+ "OSD POOL STATS: {}".format(stat),
+ ('pool_id',)
+ )
+ for state in PG_STATES:
+ path = 'pg_{}'.format(state)
+ metrics[path] = Metric(
+ 'gauge',
+ path,
+ 'PG {} per pool'.format(state),
+ ('pool_id',)
+ )
+ for state in DF_CLUSTER:
+ path = 'cluster_{}'.format(state)
+ metrics[path] = Metric(
+ 'gauge',
+ path,
+ 'DF {}'.format(state),
+ )
+ for state in DF_POOL:
+ path = 'pool_{}'.format(state)
+ metrics[path] = Metric(
+ 'gauge',
+ path,
+ 'DF pool {}'.format(state),
+ ('pool_id',)
+ )
+ for state in NUM_OBJECTS:
+ path = 'num_objects_{}'.format(state)
+ metrics[path] = Metric(
+ 'gauge',
+ path,
+ 'Number of {} objects'.format(state),
+ )
+
+ for check in HEALTH_CHECKS:
+ path = 'healthcheck_{}'.format(check.name.lower())
+ metrics[path] = Metric(
+ 'gauge',
+ path,
+ check.description,
+ )
+
+ return metrics
+
+ def get_health(self):
+
+ def _get_value(message, delim=' ', word_pos=0):
+ """Extract value from message (default is 1st field)"""
+ v_str = message.split(delim)[word_pos]
+ if v_str.isdigit():
+ return int(v_str), 0
+ return 0, 1
+
+ health = json.loads(self.get('health')['json'])
+ # set overall health
+ self.metrics['health_status'].set(
+ health_status_to_number(health['status'])
+ )
+
+ # Examine the health to see if any health checks triggered need to
+ # become a metric.
+ active_healthchecks = health.get('checks', {})
+ active_names = active_healthchecks.keys()
+
+ for check in HEALTH_CHECKS:
+ path = 'healthcheck_{}'.format(check.name.lower())
+
+ if path in self.metrics:
+
+ if check.name in active_names:
+ check_data = active_healthchecks[check.name]
+ message = check_data['summary'].get('message', '')
+ v, err = 0, 0
+
+ if check.name == "SLOW_OPS":
+ # 42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have slow ops.
+ v, err = _get_value(message)
+
+ if err:
+ self.log.error("healthcheck {} message format is incompatible and has been dropped".format(check.name))
+ # drop the metric, so it's no longer emitted
+ del self.metrics[path]
+ continue
+ else:
+ self.metrics[path].set(v)
+ else:
+ # health check is not active, so give it a default of 0
+ self.metrics[path].set(0)
+
+ def get_pool_stats(self):
+ # retrieve pool stats to provide per pool recovery metrics
+ # (osd_pool_stats moved to mgr in Mimic)
+ pstats = self.get('osd_pool_stats')
+ for pool in pstats['pool_stats']:
+ for stat in OSD_POOL_STATS:
+ self.metrics['pool_{}'.format(stat)].set(
+ pool['recovery_rate'].get(stat, 0),
+ (pool['pool_id'],)
+ )
+
+ def get_df(self):
+ # maybe get the to-be-exported metrics from a config?
+ df = self.get('df')
+ for stat in DF_CLUSTER:
+ self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
+
+ for pool in df['pools']:
+ for stat in DF_POOL:
+ self.metrics['pool_{}'.format(stat)].set(
+ pool['stats'][stat],
+ (pool['id'],)
+ )
+
+ def get_fs(self):
+ fs_map = self.get('fs_map')
+ servers = self.get_service_list()
+ active_daemons = []
+ for fs in fs_map['filesystems']:
+ # collect fs metadata
+ data_pools = ",".join([str(pool)
+ for pool in fs['mdsmap']['data_pools']])
+ self.metrics['fs_metadata'].set(1, (
+ data_pools,
+ fs['id'],
+ fs['mdsmap']['metadata_pool'],
+ fs['mdsmap']['fs_name']
+ ))
+ self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
+ for gid, daemon in fs['mdsmap']['info'].items():
+ id_ = daemon['name']
+ host_version = servers.get((id_, 'mds'), ('', ''))
+ self.metrics['mds_metadata'].set(1, (
+ 'mds.{}'.format(id_), fs['id'],
+ host_version[0], daemon['addr'],
+ daemon['rank'], host_version[1]
+ ))
+
+ def get_quorum_status(self):
+ mon_status = json.loads(self.get('mon_status')['json'])
+ servers = self.get_service_list()
+ for mon in mon_status['monmap']['mons']:
+ rank = mon['rank']
+ id_ = mon['name']
+ host_version = servers.get((id_, 'mon'), ('', ''))
+ self.metrics['mon_metadata'].set(1, (
+ 'mon.{}'.format(id_), host_version[0],
+ mon['public_addr'].rsplit(':', 1)[0], rank,
+ host_version[1]
+ ))
+ in_quorum = int(rank in mon_status['quorum'])
+ self.metrics['mon_quorum_status'].set(in_quorum, (
+ 'mon.{}'.format(id_),
+ ))
+
+ def get_mgr_status(self):
+ mgr_map = self.get('mgr_map')
+ servers = self.get_service_list()
+
+ active = mgr_map['active_name']
+ standbys = [s.get('name') for s in mgr_map['standbys']]
+
+ all_mgrs = list(standbys)
+ all_mgrs.append(active)
+
+ all_modules = {module.get('name'):module.get('can_run') for module in mgr_map['available_modules']}
+
+ for mgr in all_mgrs:
+ host_version = servers.get((mgr, 'mgr'), ('', ''))
+ if mgr == active:
+ _state = 1
+ else:
+ _state = 0
+
+ self.metrics['mgr_metadata'].set(1, (
+ 'mgr.{}'.format(mgr), host_version[0],
+ host_version[1]
+ ))
+ self.metrics['mgr_status'].set(_state, (
+ 'mgr.{}'.format(mgr),
+ ))
+ always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
+ active_modules = list(always_on_modules)
+ active_modules.extend(mgr_map['modules'])
+
+ for mod_name in all_modules.keys():
+
+ if mod_name in always_on_modules:
+ _state = 2
+ elif mod_name in active_modules:
+ _state = 1
+ else:
+ _state = 0
+
+ _can_run = 1 if all_modules[mod_name] else 0
+ self.metrics['mgr_module_status'].set(_state, (mod_name,))
+ self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
+
+ def get_pg_status(self):
+
+ pg_summary = self.get('pg_summary')
+
+ for pool in pg_summary['by_pool']:
+ num_by_state = dict((state, 0) for state in PG_STATES)
+ num_by_state['total'] = 0
+
+ for state_name, count in pg_summary['by_pool'][pool].items():
+ for state in state_name.split('+'):
+ num_by_state[state] += count
+ num_by_state['total'] += count
+
+ for state, num in num_by_state.items():
+ try:
+ self.metrics["pg_{}".format(state)].set(num, (pool,))
+ except KeyError:
+ self.log.warn("skipping pg in unknown state {}".format(state))
+
+ def get_osd_stats(self):
+ osd_stats = self.get('osd_stats')
+ for osd in osd_stats['osd_stats']:
+ id_ = osd['osd']
+ for stat in OSD_STATS:
+ val = osd['perf_stat'][stat]
+ self.metrics['osd_{}'.format(stat)].set(val, (
+ 'osd.{}'.format(id_),
+ ))
+
+ def get_service_list(self):
+ ret = {}
+ for server in self.list_servers():
+ version = server.get('ceph_version', '')
+ host = server.get('hostname', '')
+ for service in server.get('services', []):
+ ret.update({(service['id'], service['type']): (host, version)})
+ return ret
+
+ def get_metadata_and_osd_status(self):
+ osd_map = self.get('osd_map')
+ osd_flags = osd_map['flags'].split(',')
+ for flag in OSD_FLAGS:
+ self.metrics['osd_flag_{}'.format(flag)].set(
+ int(flag in osd_flags)
+ )
+
+ osd_devices = self.get('osd_map_crush')['devices']
+ servers = self.get_service_list()
+ for osd in osd_map['osds']:
+ # id can be used to link osd metrics and metadata
+ id_ = osd['osd']
+ # collect osd metadata
+ p_addr = osd['public_addr'].rsplit(':', 1)[0]
+ c_addr = osd['cluster_addr'].rsplit(':', 1)[0]
+ if p_addr == "-" or c_addr == "-":
+ self.log.info(
+ "Missing address metadata for osd {0}, skipping occupation"
+ " and metadata records for this osd".format(id_)
+ )
+ continue
+
+ dev_class = None
+ for osd_device in osd_devices:
+ if osd_device['id'] == id_:
+ dev_class = osd_device.get('class', '')
+ break
+
+ if dev_class is None:
+ self.log.info(
+ "OSD {0} is missing from CRUSH map, skipping output".format(
+ id_))
+ continue
+
+ host_version = servers.get((str(id_), 'osd'), ('', ''))
+
+ # collect disk occupation metadata
+ osd_metadata = self.get_metadata("osd", str(id_))
+ if osd_metadata is None:
+ continue
+
+ obj_store = osd_metadata.get('osd_objectstore', '')
+ f_iface = osd_metadata.get('front_iface', '')
+ b_iface = osd_metadata.get('back_iface', '')
+
+ self.metrics['osd_metadata'].set(1, (
+ b_iface,
+ 'osd.{}'.format(id_),
+ c_addr,
+ dev_class,
+ f_iface,
+ host_version[0],
+ obj_store,
+ p_addr,
+ host_version[1]
+ ))
+
+ # collect osd status
+ for state in OSD_STATUS:
+ status = osd[state]
+ self.metrics['osd_{}'.format(state)].set(status, (
+ 'osd.{}'.format(id_),
+ ))
+
+ osd_dev_node = None
+ if obj_store == "filestore":
+ # collect filestore backend device
+ osd_dev_node = osd_metadata.get(
+ 'backend_filestore_dev_node', None)
+ # collect filestore journal device
+ osd_wal_dev_node = osd_metadata.get('osd_journal', '')
+ osd_db_dev_node = ''
+ elif obj_store == "bluestore":
+ # collect bluestore backend device
+ osd_dev_node = osd_metadata.get(
+ 'bluestore_bdev_dev_node', None)
+ # collect bluestore wal backend
+ osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
+ # collect bluestore db backend
+ osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
+ if osd_dev_node and osd_dev_node == "unknown":
+ osd_dev_node = None
+
+ osd_hostname = osd_metadata.get('hostname', None)
+ if osd_dev_node and osd_hostname:
+ self.log.debug("Got dev for osd {0}: {1}/{2}".format(
+ id_, osd_hostname, osd_dev_node))
+ self.metrics['disk_occupation'].set(1, (
+ "osd.{0}".format(id_),
+ osd_dev_node,
+ osd_db_dev_node,
+ osd_wal_dev_node,
+ osd_hostname
+ ))
+ else:
+ self.log.info("Missing dev node metadata for osd {0}, skipping "
+ "occupation record for this osd".format(id_))
+
+ pool_meta = []
+ for pool in osd_map['pools']:
+ self.metrics['pool_metadata'].set(
+ 1, (pool['pool'], pool['pool_name']))
+
+ # Populate other servers metadata
+ for key, value in servers.items():
+ service_id, service_type = key
+ if service_type == 'rgw':
+ hostname, version = value
+ self.metrics['rgw_metadata'].set(
+ 1,
+ ('{}.{}'.format(service_type, service_id), hostname, version)
+ )
+ elif service_type == 'rbd-mirror':
+ mirror_metadata = self.get_metadata('rbd-mirror', service_id)
+ if mirror_metadata is None:
+ continue
+ mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
+ service_id)
+ self.metrics['rbd_mirror_metadata'].set(
+ 1, (mirror_metadata.get(k, '')
+ for k in RBD_MIRROR_METADATA)
+ )
+
+ def get_num_objects(self):
+ pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
+ for obj in NUM_OBJECTS:
+ stat = 'num_objects_{}'.format(obj)
+ self.metrics[stat].set(pg_sum[stat])
+
+ def get_rbd_stats(self):
+ # Per RBD image stats is collected by registering a dynamic osd perf
+ # stats query that tells OSDs to group stats for requests associated
+ # with RBD objects by pool, namespace, and image id, which are
+ # extracted from the request object names or other attributes.
+ # The RBD object names have the following prefixes:
+ # - rbd_data.{image_id}. (data stored in the same pool as metadata)
+ # - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
+ # - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
+ # The pool_id in the object name is the id of the pool with the image
+ # metdata, and should be used in the image spec. If there is no pool_id
+ # in the object name, the image pool is the pool where the object is
+ # located.
+
+ # Parse rbd_stats_pools option, which is a comma or space separated
+ # list of pool[/namespace] entries. If no namespace is specifed the
+ # stats are collected for every namespace in the pool. The wildcard
+ # '*' can be used to indicate all pools or namespaces
+ pools_string = self.get_localized_module_option('rbd_stats_pools', '')
+ pool_keys = []
+ for x in re.split('[\s,]+', pools_string):
+ if not x:
+ continue
+
+ s = x.split('/', 2)
+ pool_name = s[0]
+ namespace_name = None
+ if len(s) == 2:
+ namespace_name = s[1]
+
+ if pool_name == "*":
+ # collect for all pools
+ osd_map = self.get('osd_map')
+ for pool in osd_map['pools']:
+ if 'rbd' not in pool.get('application_metadata', {}):
+ continue
+ pool_keys.append((pool['pool_name'], namespace_name))
+ else:
+ pool_keys.append((pool_name, namespace_name))
+
+ pools = {}
+ for pool_key in pool_keys:
+ pool_name = pool_key[0]
+ namespace_name = pool_key[1]
+ if not namespace_name or namespace_name == "*":
+ # empty set means collect for all namespaces
+ pools[pool_name] = set()
+ continue
+
+ if pool_name not in pools:
+ pools[pool_name] = set()
+ elif not pools[pool_name]:
+ continue
+ pools[pool_name].add(namespace_name)
+
+ rbd_stats_pools = {}
+ for pool_id in list(self.rbd_stats['pools']):
+ name = self.rbd_stats['pools'][pool_id]['name']
+ if name not in pools:
+ del self.rbd_stats['pools'][pool_id]
+ else:
+ rbd_stats_pools[name] = \
+ self.rbd_stats['pools'][pool_id]['ns_names']
+
+ pools_refreshed = False
+ if pools:
+ next_refresh = self.rbd_stats['pools_refresh_time'] + \
+ self.get_localized_module_option(
+ 'rbd_stats_pools_refresh_interval', 300)
+ if rbd_stats_pools != pools or time.time() >= next_refresh:
+ self.refresh_rbd_stats_pools(pools)
+ pools_refreshed = True
+
+ pool_ids = list(self.rbd_stats['pools'])
+ pool_ids.sort()
+ pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'
+
+ nspace_names = []
+ for pool_id, pool in self.rbd_stats['pools'].items():
+ if pool['ns_names']:
+ nspace_names.extend(pool['ns_names'])
+ else:
+ nspace_names = []
+ break
+ if nspace_names:
+ namespace_regex = '^(' + \
+ "|".join([re.escape(x)
+ for x in set(nspace_names)]) + ')$'
+ else:
+ namespace_regex = '^(.*)$'
+
+ if 'query' in self.rbd_stats and \
+ (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex'] or
+ namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex']):
+ self.remove_osd_perf_query(self.rbd_stats['query_id'])
+ del self.rbd_stats['query_id']
+ del self.rbd_stats['query']
+
+ if not self.rbd_stats['pools']:
+ return
+
+ counters_info = self.rbd_stats['counters_info']
+
+ if 'query_id' not in self.rbd_stats:
+ query = {
+ 'key_descriptor': [
+ {'type': 'pool_id', 'regex': pool_id_regex},
+ {'type': 'namespace', 'regex': namespace_regex},
+ {'type': 'object_name',
+ 'regex': '^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
+ ],
+ 'performance_counter_descriptors': list(counters_info),
+ }
+ query_id = self.add_osd_perf_query(query)
+ if query_id is None:
+ self.log.error('failed to add query %s' % query)
+ return
+ self.rbd_stats['query'] = query
+ self.rbd_stats['query_id'] = query_id
+
+ res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
+ for c in res['counters']:
+ # if the pool id is not found in the object name use id of the
+ # pool where the object is located
+ if c['k'][2][0]:
+ pool_id = int(c['k'][2][0])
+ else:
+ pool_id = int(c['k'][0][0])
+ if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
+ self.refresh_rbd_stats_pools(pools)
+ pools_refreshed = True
+ if pool_id not in self.rbd_stats['pools']:
+ continue
+ pool = self.rbd_stats['pools'][pool_id]
+ nspace_name = c['k'][1][0]
+ if nspace_name not in pool['images']:
+ continue
+ image_id = c['k'][2][1]
+ if image_id not in pool['images'][nspace_name] and \
+ not pools_refreshed:
+ self.refresh_rbd_stats_pools(pools)
+ pool = self.rbd_stats['pools'][pool_id]
+ pools_refreshed = True
+ if image_id not in pool['images'][nspace_name]:
+ continue
+ counters = pool['images'][nspace_name][image_id]['c']
+ for i in range(len(c['c'])):
+ counters[i][0] += c['c'][i][0]
+ counters[i][1] += c['c'][i][1]
+
+ label_names = ("pool", "namespace", "image")
+ for pool_id, pool in self.rbd_stats['pools'].items():
+ pool_name = pool['name']
+ for nspace_name, images in pool['images'].items():
+ for image_id in images:
+ image_name = images[image_id]['n']
+ counters = images[image_id]['c']
+ i = 0
+ for key in counters_info:
+ counter_info = counters_info[key]
+ stattype = self._stattype_to_str(counter_info['type'])
+ labels = (pool_name, nspace_name, image_name)
+ if counter_info['type'] == self.PERFCOUNTER_COUNTER:
+ path = 'rbd_' + key
+ if path not in self.metrics:
+ self.metrics[path] = Metric(
+ stattype,
+ path,
+ counter_info['desc'],
+ label_names,
+ )
+ self.metrics[path].set(counters[i][0], labels)
+ elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
+ path = 'rbd_' + key + '_sum'
+ if path not in self.metrics:
+ self.metrics[path] = Metric(
+ stattype,
+ path,
+ counter_info['desc'] + ' Total',
+ label_names,
+ )
+ self.metrics[path].set(counters[i][0], labels)
+ path = 'rbd_' + key + '_count'
+ if path not in self.metrics:
+ self.metrics[path] = Metric(
+ 'counter',
+ path,
+ counter_info['desc'] + ' Count',
+ label_names,
+ )
+ self.metrics[path].set(counters[i][1], labels)
+ i += 1
+
+ def refresh_rbd_stats_pools(self, pools):
+ self.log.debug('refreshing rbd pools %s' % (pools))
+
+ rbd = RBD()
+ counters_info = self.rbd_stats['counters_info']
+ for pool_name, cfg_ns_names in pools.items():
+ try:
+ pool_id = self.rados.pool_lookup(pool_name)
+ with self.rados.open_ioctx(pool_name) as ioctx:
+ if pool_id not in self.rbd_stats['pools']:
+ self.rbd_stats['pools'][pool_id] = {'images': {}}
+ pool = self.rbd_stats['pools'][pool_id]
+ pool['name'] = pool_name
+ pool['ns_names'] = cfg_ns_names
+ if cfg_ns_names:
+ nspace_names = list(cfg_ns_names)
+ else:
+ nspace_names = [''] + rbd.namespace_list(ioctx)
+ for nspace_name in pool['images']:
+ if nspace_name not in nspace_names:
+ del pool['images'][nspace_name]
+ for nspace_name in nspace_names:
+ if (nspace_name and
+ not rbd.namespace_exists(ioctx, nspace_name)):
+ self.log.debug('unknown namespace %s for pool %s' %
+ (nspace_name, pool_name))
+ continue
+ ioctx.set_namespace(nspace_name)
+ if nspace_name not in pool['images']:
+ pool['images'][nspace_name] = {}
+ namespace = pool['images'][nspace_name]
+ images = {}
+ for image_meta in RBD().list2(ioctx):
+ image = {'n': image_meta['name']}
+ image_id = image_meta['id']
+ if image_id in namespace:
+ image['c'] = namespace[image_id]['c']
+ else:
+ image['c'] = [[0, 0] for x in counters_info]
+ images[image_id] = image
+ pool['images'][nspace_name] = images
+ except Exception as e:
+ self.log.error('failed listing pool %s: %s' % (pool_name, e))
+ self.rbd_stats['pools_refresh_time'] = time.time()
+
+ def shutdown_rbd_stats(self):
+ if 'query_id' in self.rbd_stats:
+ self.remove_osd_perf_query(self.rbd_stats['query_id'])
+ del self.rbd_stats['query_id']
+ del self.rbd_stats['query']
+ self.rbd_stats['pools'].clear()
+
+ def collect(self):
+ # Clear the metrics before scraping
+ for k in self.metrics.keys():
+ self.metrics[k].clear()
+
+ _start_time = time.time()
+
+ self.get_health()
+ self.get_df()
+ self.get_pool_stats()
+ self.get_fs()
+ self.get_osd_stats()
+ self.get_quorum_status()
+ self.get_mgr_status()
+ self.get_metadata_and_osd_status()
+ self.get_pg_status()
+ self.get_num_objects()
+
+ for daemon, counters in self.get_all_perf_counters().items():
+ for path, counter_info in counters.items():
+ # Skip histograms, they are represented by long running avgs
+ stattype = self._stattype_to_str(counter_info['type'])
+ if not stattype or stattype == 'histogram':
+ self.log.debug('ignoring %s, type %s' % (path, stattype))
+ continue
+
+ path, label_names, labels = self._perfpath_to_path_labels(
+ daemon, path)
+
+ # Get the value of the counter
+ value = self._perfvalue_to_value(
+ counter_info['type'], counter_info['value'])
+
+ # Represent the long running avgs as sum/count pairs
+ if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
+ _path = path + '_sum'
+ if _path not in self.metrics:
+ self.metrics[_path] = Metric(
+ stattype,
+ _path,
+ counter_info['description'] + ' Total',
+ label_names,
+ )
+ self.metrics[_path].set(value, labels)
+
+ _path = path + '_count'
+ if _path not in self.metrics:
+ self.metrics[_path] = Metric(
+ 'counter',
+ _path,
+ counter_info['description'] + ' Count',
+ label_names,
+ )
+ self.metrics[_path].set(counter_info['count'], labels,)
+ else:
+ if path not in self.metrics:
+ self.metrics[path] = Metric(
+ stattype,
+ path,
+ counter_info['description'],
+ label_names,
+ )
+ self.metrics[path].set(value, labels)
+
+ self.get_rbd_stats()
+
+ _end_time = time.time()
+ self.metrics['scrape_duration_seconds'].set(_end_time - _start_time)
+
+ # Return formatted metrics and clear no longer used data
+ _metrics = [m.str_expfmt() for m in self.metrics.values()]
+ for k in self.metrics.keys():
+ self.metrics[k].clear()
+
+ return ''.join(_metrics) + '\n'
+
+ def get_file_sd_config(self):
+ servers = self.list_servers()
+ targets = []
+ for server in servers:
+ hostname = server.get('hostname', '')
+ for service in server.get('services', []):
+ if service['type'] != 'mgr':
+ continue
+ id_ = service['id']
+ # get port for prometheus module at mgr with id_
+ # TODO use get_config_prefix or get_config here once
+ # https://github.com/ceph/ceph/pull/20458 is merged
+ result = CommandResult("")
+ _global_instance.send_command(
+ result, "mon", '',
+ json.dumps({
+ "prefix": "config-key get",
+ 'key': "config/mgr/mgr/prometheus/{}/server_port".format(id_),
+ }),
+ "")
+ r, outb, outs = result.wait()
+ if r != 0:
+ _global_instance.log.error("Failed to retrieve port for mgr {}: {}".format(id_, outs))
+ targets.append('{}:{}'.format(hostname, DEFAULT_PORT))
+ else:
+ port = json.loads(outb)
+ targets.append('{}:{}'.format(hostname, port))
+
+ ret = [
+ {
+ "targets": targets,
+ "labels": {}
+ }
+ ]
+ return 0, json.dumps(ret), ""
+
+ def self_test(self):
+ self.collect()
+ self.get_file_sd_config()
+
+ def handle_command(self, inbuf, cmd):
+ if cmd['prefix'] == 'prometheus file_sd_config':
+ return self.get_file_sd_config()
+ else:
+ return (-errno.EINVAL, '',
+ "Command not found '{0}'".format(cmd['prefix']))
+
+ def serve(self):
+
+ class Root(object):
+
+ # collapse everything to '/'
+ def _cp_dispatch(self, vpath):
+ cherrypy.request.path = ''
+ return self
+
+ @cherrypy.expose
+ def index(self):
+ return '''<!DOCTYPE html>
+<html>
+ <head><title>Ceph Exporter</title></head>
+ <body>
+ <h1>Ceph Exporter</h1>
+ <p><a href='/metrics'>Metrics</a></p>
+ </body>
+</html>'''
+
+ @cherrypy.expose
+ def metrics(self):
+ # Lock the function execution
+ with _global_instance.collect_lock:
+ return self._metrics(_global_instance)
+
+ @staticmethod
+ def _metrics(instance):
+ # Return cached data if available
+ if not instance.collect_cache:
+ raise cherrypy.HTTPError(503, 'No cached data available yet')
+
+ def respond():
+ cherrypy.response.headers['Content-Type'] = 'text/plain'
+ return instance.collect_cache
+
+ if instance.collect_time < instance.scrape_interval:
+ # Respond if cache isn't stale
+ return respond()
+
+ if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
+ # Respond even if cache is stale
+ instance.log.info(
+ 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
+ 'returning metrics from stale cache.'.format(
+ instance.collect_time,
+ instance.collect_time - instance.scrape_interval
+ )
+ )
+ return respond()
+
+ if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
+ # Fail if cache is stale
+ msg = (
+ 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
+ 'returning "service unavailable".'.format(
+ instance.collect_time,
+ instance.collect_time - instance.scrape_interval,
+ )
+ )
+ instance.log.error(msg)
+ raise cherrypy.HTTPError(503, msg)
+
+ # Make the cache timeout for collecting configurable
+ self.scrape_interval = float(self.get_localized_module_option('scrape_interval', 15.0))
+
+ self.stale_cache_strategy = self.get_localized_module_option('stale_cache_strategy', 'log')
+ if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
+ self.STALE_CACHE_RETURN]:
+ self.stale_cache_strategy = self.STALE_CACHE_FAIL
+
+ server_addr = self.get_localized_module_option(
+ 'server_addr', get_default_addr())
+ server_port = self.get_localized_module_option(
+ 'server_port', DEFAULT_PORT)
+ self.log.info(
+ "server_addr: %s server_port: %s" %
+ (server_addr, server_port)
+ )
+
+ self.metrics_thread.start()
+
+ # Publish the URI that others may use to access the service we're
+ # about to start serving
+ self.set_uri('http://{0}:{1}/'.format(
+ socket.getfqdn() if server_addr in ['::', '0.0.0.0'] else server_addr,
+ server_port
+ ))
+
+ cherrypy.config.update({
+ 'server.socket_host': server_addr,
+ 'server.socket_port': int(server_port),
+ 'engine.autoreload.on': False
+ })
+ cherrypy.tree.mount(Root(), "/")
+ self.log.info('Starting engine...')
+ cherrypy.engine.start()
+ self.log.info('Engine started.')
+ # wait for the shutdown event
+ self.shutdown_event.wait()
+ self.shutdown_event.clear()
+ # tell metrics collection thread to stop collecting new metrics
+ self.metrics_thread.stop()
+ cherrypy.engine.stop()
+ self.log.info('Engine stopped.')
+ self.shutdown_rbd_stats()
+ # wait for the metrics collection thread to stop
+ self.metrics_thread.join()
+
+ def shutdown(self):
+ self.log.info('Stopping engine...')
+ self.shutdown_event.set()
+
+
+class StandbyModule(MgrStandbyModule):
+ def __init__(self, *args, **kwargs):
+ super(StandbyModule, self).__init__(*args, **kwargs)
+ self.shutdown_event = threading.Event()
+
+ def serve(self):
+ server_addr = self.get_localized_module_option(
+ 'server_addr', get_default_addr())
+ server_port = self.get_localized_module_option(
+ 'server_port', DEFAULT_PORT)
+ self.log.info("server_addr: %s server_port: %s" %
+ (server_addr, server_port))
+ cherrypy.config.update({
+ 'server.socket_host': server_addr,
+ 'server.socket_port': int(server_port),
+ 'engine.autoreload.on': False
+ })
+
+ module = self
+
+ class Root(object):
+ @cherrypy.expose
+ def index(self):
+ active_uri = module.get_active_uri()
+ return '''<!DOCTYPE html>
+<html>
+ <head><title>Ceph Exporter</title></head>
+ <body>
+ <h1>Ceph Exporter</h1>
+ <p><a href='{}metrics'>Metrics</a></p>
+ </body>
+</html>'''.format(active_uri)
+
+ @cherrypy.expose
+ def metrics(self):
+ cherrypy.response.headers['Content-Type'] = 'text/plain'
+ return ''
+
+ cherrypy.tree.mount(Root(), '/', {})
+ self.log.info('Starting engine...')
+ cherrypy.engine.start()
+ self.log.info('Engine started.')
+ # Wait for shutdown event
+ self.shutdown_event.wait()
+ self.shutdown_event.clear()
+ cherrypy.engine.stop()
+ self.log.info('Engine stopped.')
+
+ def shutdown(self):
+ self.log.info("Stopping engine...")
+ self.shutdown_event.set()
+ self.log.info("Stopped engine")