From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/pybind/mgr/zabbix/module.py | 476 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 476 insertions(+) create mode 100644 src/pybind/mgr/zabbix/module.py (limited to 'src/pybind/mgr/zabbix/module.py') diff --git a/src/pybind/mgr/zabbix/module.py b/src/pybind/mgr/zabbix/module.py new file mode 100644 index 000000000..638b68856 --- /dev/null +++ b/src/pybind/mgr/zabbix/module.py @@ -0,0 +1,476 @@ +""" +Zabbix module for ceph-mgr + +Collect statistics from Ceph cluster and every X seconds send data to a Zabbix +server using the zabbix_sender executable. +""" +import logging +import json +import errno +import re +from subprocess import Popen, PIPE +from threading import Event +from mgr_module import CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue +from typing import cast, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union + + +def avg(data: Sequence[Union[int, float]]) -> float: + if len(data): + return sum(data) / float(len(data)) + else: + return 0 + + +class ZabbixSender(object): + def __init__(self, sender: str, host: str, port: int, log: logging.Logger) -> None: + self.sender = sender + self.host = host + self.port = port + self.log = log + + def send(self, hostname: str, data: Mapping[str, Union[int, float, str]]) -> None: + if len(data) == 0: + return + + cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s', + hostname, '-vv', '-i', '-'] + + self.log.debug('Executing: %s', cmd) + + proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, encoding='utf-8') + + for key, value in data.items(): + assert proc.stdin + proc.stdin.write('{0} ceph.{1} {2}\n'.format(hostname, key, value)) + + stdout, stderr = proc.communicate() + if proc.returncode != 0: + raise RuntimeError('%s exited non-zero: %s' % (self.sender, + stderr)) + + self.log.debug('Zabbix Sender: %s', stdout.rstrip()) + + +class Module(MgrModule): + run = False + config: Dict[str, OptionValue] = {} + ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2} + _zabbix_hosts: List[Dict[str, Union[str, int]]] = list() + + @property + def config_keys(self) -> Dict[str, OptionValue]: + return dict((o['name'], o.get('default', None)) + for o in self.MODULE_OPTIONS) + + MODULE_OPTIONS = [ + Option( + name='zabbix_sender', + default='/usr/bin/zabbix_sender'), + Option( + name='zabbix_host', + type='str', + default=None), + Option( + name='zabbix_port', + type='int', + default=10051), + Option( + name='identifier', + default=""), + Option( + name='interval', + type='secs', + default=60), + Option( + name='discovery_interval', + type='uint', + default=100) + ] + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super(Module, self).__init__(*args, **kwargs) + self.event = Event() + + def init_module_config(self) -> None: + self.fsid = self.get('mon_map')['fsid'] + self.log.debug('Found Ceph fsid %s', self.fsid) + + for key, default in self.config_keys.items(): + self.set_config_option(key, self.get_module_option(key, default)) + + if self.config['zabbix_host']: + self._parse_zabbix_hosts() + + def set_config_option(self, option: str, value: OptionValue) -> bool: + if option not in self.config_keys.keys(): + raise RuntimeError('{0} is a unknown configuration ' + 'option'.format(option)) + + if option in ['zabbix_port', 'interval', 'discovery_interval']: + try: + int_value = int(value) # type: ignore + except (ValueError, TypeError): + raise RuntimeError('invalid {0} configured. Please specify ' + 'a valid integer'.format(option)) + + if option == 'interval' and int_value < 10: + raise RuntimeError('interval should be set to at least 10 seconds') + + if option == 'discovery_interval' and int_value < 10: + raise RuntimeError( + "discovery_interval should not be more frequent " + "than once in 10 regular data collection" + ) + + self.log.debug('Setting in-memory config option %s to: %s', option, + value) + self.config[option] = value + return True + + def _parse_zabbix_hosts(self) -> None: + self._zabbix_hosts = list() + servers = cast(str, self.config['zabbix_host']).split(",") + for server in servers: + uri = re.match("(?:(?:\[?)([a-z0-9-\.]+|[a-f0-9:\.]+)(?:\]?))(?:((?::))([0-9]{1,5}))?$", server) + if uri: + zabbix_host, sep, opt_zabbix_port = uri.groups() + if sep == ':': + zabbix_port = int(opt_zabbix_port) + else: + zabbix_port = cast(int, self.config['zabbix_port']) + self._zabbix_hosts.append({'zabbix_host': zabbix_host, 'zabbix_port': zabbix_port}) + else: + self.log.error('Zabbix host "%s" is not valid', server) + + self.log.error('Parsed Zabbix hosts: %s', self._zabbix_hosts) + + def get_pg_stats(self) -> Dict[str, int]: + stats = dict() + + pg_states = ['active', 'peering', 'clean', 'scrubbing', 'undersized', + 'backfilling', 'recovering', 'degraded', 'inconsistent', + 'remapped', 'backfill_toofull', 'backfill_wait', + 'recovery_wait'] + + for state in pg_states: + stats['num_pg_{0}'.format(state)] = 0 + + pg_status = self.get('pg_status') + + stats['num_pg'] = pg_status['num_pgs'] + + for state in pg_status['pgs_by_state']: + states = state['state_name'].split('+') + for s in pg_states: + key = 'num_pg_{0}'.format(s) + if s in states: + stats[key] += state['count'] + + return stats + + def get_data(self) -> Dict[str, Union[int, float]]: + data = dict() + + health = json.loads(self.get('health')['json']) + # 'status' is luminous+, 'overall_status' is legacy mode. + data['overall_status'] = health.get('status', + health.get('overall_status')) + data['overall_status_int'] = \ + self.ceph_health_mapping.get(data['overall_status']) + + mon_status = json.loads(self.get('mon_status')['json']) + data['num_mon'] = len(mon_status['monmap']['mons']) + + df = self.get('df') + data['num_pools'] = len(df['pools']) + data['total_used_bytes'] = df['stats']['total_used_bytes'] + data['total_bytes'] = df['stats']['total_bytes'] + data['total_avail_bytes'] = df['stats']['total_avail_bytes'] + + wr_ops = 0 + rd_ops = 0 + wr_bytes = 0 + rd_bytes = 0 + + for pool in df['pools']: + wr_ops += pool['stats']['wr'] + rd_ops += pool['stats']['rd'] + wr_bytes += pool['stats']['wr_bytes'] + rd_bytes += pool['stats']['rd_bytes'] + data['[{0},rd_bytes]'.format(pool['name'])] = pool['stats']['rd_bytes'] + data['[{0},wr_bytes]'.format(pool['name'])] = pool['stats']['wr_bytes'] + data['[{0},rd_ops]'.format(pool['name'])] = pool['stats']['rd'] + data['[{0},wr_ops]'.format(pool['name'])] = pool['stats']['wr'] + data['[{0},bytes_used]'.format(pool['name'])] = pool['stats']['bytes_used'] + data['[{0},stored_raw]'.format(pool['name'])] = pool['stats']['stored_raw'] + data['[{0},percent_used]'.format(pool['name'])] = pool['stats']['percent_used'] * 100 + + data['wr_ops'] = wr_ops + data['rd_ops'] = rd_ops + data['wr_bytes'] = wr_bytes + data['rd_bytes'] = rd_bytes + + osd_map = self.get('osd_map') + data['num_osd'] = len(osd_map['osds']) + data['osd_nearfull_ratio'] = osd_map['nearfull_ratio'] + data['osd_full_ratio'] = osd_map['full_ratio'] + data['osd_backfillfull_ratio'] = osd_map['backfillfull_ratio'] + + data['num_pg_temp'] = len(osd_map['pg_temp']) + + num_up = 0 + num_in = 0 + for osd in osd_map['osds']: + data['[osd.{0},up]'.format(int(osd['osd']))] = osd['up'] + if osd['up'] == 1: + num_up += 1 + + data['[osd.{0},in]'.format(int(osd['osd']))] = osd['in'] + if osd['in'] == 1: + num_in += 1 + + data['num_osd_up'] = num_up + data['num_osd_in'] = num_in + + osd_fill = list() + osd_pgs = list() + osd_apply_latency_ns = list() + osd_commit_latency_ns = list() + + osd_stats = self.get('osd_stats') + for osd in osd_stats['osd_stats']: + try: + osd_fill.append((float(osd['kb_used']) / float(osd['kb'])) * 100) + data['[osd.{0},osd_fill]'.format(osd['osd'])] = ( + float(osd['kb_used']) / float(osd['kb'])) * 100 + except ZeroDivisionError: + continue + osd_pgs.append(osd['num_pgs']) + osd_apply_latency_ns.append(osd['perf_stat']['apply_latency_ns']) + osd_commit_latency_ns.append(osd['perf_stat']['commit_latency_ns']) + data['[osd.{0},num_pgs]'.format(osd['osd'])] = osd['num_pgs'] + data[ + '[osd.{0},osd_latency_apply]'.format(osd['osd']) + ] = osd['perf_stat']['apply_latency_ns'] / 1000000.0 # ns -> ms + data[ + '[osd.{0},osd_latency_commit]'.format(osd['osd']) + ] = osd['perf_stat']['commit_latency_ns'] / 1000000.0 # ns -> ms + + try: + data['osd_max_fill'] = max(osd_fill) + data['osd_min_fill'] = min(osd_fill) + data['osd_avg_fill'] = avg(osd_fill) + data['osd_max_pgs'] = max(osd_pgs) + data['osd_min_pgs'] = min(osd_pgs) + data['osd_avg_pgs'] = avg(osd_pgs) + except ValueError: + pass + + try: + data['osd_latency_apply_max'] = max(osd_apply_latency_ns) / 1000000.0 # ns -> ms + data['osd_latency_apply_min'] = min(osd_apply_latency_ns) / 1000000.0 # ns -> ms + data['osd_latency_apply_avg'] = avg(osd_apply_latency_ns) / 1000000.0 # ns -> ms + + data['osd_latency_commit_max'] = max(osd_commit_latency_ns) / 1000000.0 # ns -> ms + data['osd_latency_commit_min'] = min(osd_commit_latency_ns) / 1000000.0 # ns -> ms + data['osd_latency_commit_avg'] = avg(osd_commit_latency_ns) / 1000000.0 # ns -> ms + except ValueError: + pass + + data.update(self.get_pg_stats()) + + return data + + def send(self, data: Mapping[str, Union[int, float, str]]) -> bool: + identifier = cast(Optional[str], self.config['identifier']) + if identifier is None or len(identifier) == 0: + identifier = 'ceph-{0}'.format(self.fsid) + + if not self.config['zabbix_host'] or not self._zabbix_hosts: + self.log.error('Zabbix server not set, please configure using: ' + 'ceph zabbix config-set zabbix_host ') + self.set_health_checks({ + 'MGR_ZABBIX_NO_SERVER': { + 'severity': 'warning', + 'summary': 'No Zabbix server configured', + 'detail': ['Configuration value zabbix_host not configured'] + } + }) + return False + + result = True + + for server in self._zabbix_hosts: + self.log.info( + 'Sending data to Zabbix server %s, port %s as host/identifier %s', + server['zabbix_host'], server['zabbix_port'], identifier) + self.log.debug(data) + + try: + zabbix = ZabbixSender(cast(str, self.config['zabbix_sender']), + cast(str, server['zabbix_host']), + cast(int, server['zabbix_port']), self.log) + zabbix.send(identifier, data) + except Exception as exc: + self.log.exception('Failed to send.') + self.set_health_checks({ + 'MGR_ZABBIX_SEND_FAILED': { + 'severity': 'warning', + 'summary': 'Failed to send data to Zabbix', + 'detail': [str(exc)] + } + }) + result = False + + self.set_health_checks(dict()) + return result + + def discovery(self) -> bool: + osd_map = self.get('osd_map') + osd_map_crush = self.get('osd_map_crush') + + # Discovering ceph pools + pool_discovery = { + pool['pool_name']: step['item_name'] + for pool in osd_map['pools'] + for rule in osd_map_crush['rules'] if rule['rule_id'] == pool['crush_rule'] + for step in rule['steps'] if step['op'] == "take" + } + pools_discovery_data = {"data": [ + { + "{#POOL}": pool, + "{#CRUSH_RULE}": rule + } + for pool, rule in pool_discovery.items() + ]} + + # Discovering OSDs + # Getting hosts for found crush rules + osd_roots = { + step['item_name']: [ + item['id'] + for item in root_bucket['items'] + ] + for rule in osd_map_crush['rules'] + for step in rule['steps'] if step['op'] == "take" + for root_bucket in osd_map_crush['buckets'] + if root_bucket['id'] == step['item'] + } + # Getting osds for hosts with map to crush_rule + osd_discovery = { + item['id']: crush_rule + for crush_rule, roots in osd_roots.items() + for root in roots + for bucket in osd_map_crush['buckets'] + if bucket['id'] == root + for item in bucket['items'] + } + osd_discovery_data = {"data": [ + { + "{#OSD}": osd, + "{#CRUSH_RULE}": rule + } + for osd, rule in osd_discovery.items() + ]} + # Preparing recieved data for sending + data = { + "zabbix.pool.discovery": json.dumps(pools_discovery_data), + "zabbix.osd.discovery": json.dumps(osd_discovery_data) + } + return bool(self.send(data)) + + @CLIReadCommand('zabbix config-show') + def config_show(self) -> Tuple[int, str, str]: + """ + Show current configuration + """ + return 0, json.dumps(self.config, indent=4, sort_keys=True), '' + + @CLIWriteCommand('zabbix config-set') + def config_set(self, key: str, value: str) -> Tuple[int, str, str]: + """ + Set a configuration value + """ + if not value: + return -errno.EINVAL, '', 'Value should not be empty or None' + + self.log.debug('Setting configuration option %s to %s', key, value) + if self.set_config_option(key, value): + self.set_module_option(key, value) + if key == 'zabbix_host' or key == 'zabbix_port': + self._parse_zabbix_hosts() + return 0, 'Configuration option {0} updated'.format(key), '' + return 1,\ + 'Failed to update configuration option {0}'.format(key), '' + + @CLIReadCommand('zabbix send') + def do_send(self) -> Tuple[int, str, str]: + """ + Force sending data to Zabbix + """ + data = self.get_data() + if self.send(data): + return 0, 'Sending data to Zabbix', '' + + return 1, 'Failed to send data to Zabbix', '' + + @CLIReadCommand('zabbix discovery') + def do_discovery(self) -> Tuple[int, str, str]: + """ + Discovering Zabbix data + """ + if self.discovery(): + return 0, 'Sending discovery data to Zabbix', '' + + return 1, 'Failed to send discovery data to Zabbix', '' + + def shutdown(self) -> None: + self.log.info('Stopping zabbix') + self.run = False + self.event.set() + + def serve(self) -> None: + self.log.info('Zabbix module starting up') + self.run = True + + self.init_module_config() + + discovery_interval = self.config['discovery_interval'] + # We are sending discovery once plugin is loaded + discovery_counter = cast(int, discovery_interval) + while self.run: + self.log.debug('Waking up for new iteration') + + if discovery_counter == discovery_interval: + try: + self.discovery() + except Exception: + # Shouldn't happen, but let's log it and retry next interval, + # rather than dying completely. + self.log.exception("Unexpected error during discovery():") + finally: + discovery_counter = 0 + + try: + data = self.get_data() + self.send(data) + except Exception: + # Shouldn't happen, but let's log it and retry next interval, + # rather than dying completely. + self.log.exception("Unexpected error during send():") + + interval = cast(float, self.config['interval']) + self.log.debug('Sleeping for %d seconds', interval) + discovery_counter += 1 + self.event.wait(interval) + + def self_test(self) -> None: + data = self.get_data() + + if data['overall_status'] not in self.ceph_health_mapping: + raise RuntimeError('No valid overall_status found in data') + + int(data['overall_status_int']) + + if data['num_mon'] < 1: + raise RuntimeError('num_mon is smaller than 1') -- cgit v1.2.3