diff options
Diffstat (limited to '')
-rw-r--r-- | src/pybind/mgr/telegraf/__init__.py | 1 | ||||
-rw-r--r-- | src/pybind/mgr/telegraf/basesocket.py | 49 | ||||
-rw-r--r-- | src/pybind/mgr/telegraf/module.py | 283 | ||||
-rw-r--r-- | src/pybind/mgr/telegraf/protocol.py | 50 | ||||
-rw-r--r-- | src/pybind/mgr/telegraf/utils.py | 26 |
5 files changed, 409 insertions, 0 deletions
diff --git a/src/pybind/mgr/telegraf/__init__.py b/src/pybind/mgr/telegraf/__init__.py new file mode 100644 index 000000000..8f210ac92 --- /dev/null +++ b/src/pybind/mgr/telegraf/__init__.py @@ -0,0 +1 @@ +from .module import Module diff --git a/src/pybind/mgr/telegraf/basesocket.py b/src/pybind/mgr/telegraf/basesocket.py new file mode 100644 index 000000000..5caea3be7 --- /dev/null +++ b/src/pybind/mgr/telegraf/basesocket.py @@ -0,0 +1,49 @@ +import socket +from urllib.parse import ParseResult +from typing import Any, Dict, Optional, Tuple, Union + + +class BaseSocket(object): + schemes = { + 'unixgram': (socket.AF_UNIX, socket.SOCK_DGRAM), + 'unix': (socket.AF_UNIX, socket.SOCK_STREAM), + 'tcp': (socket.AF_INET, socket.SOCK_STREAM), + 'tcp6': (socket.AF_INET6, socket.SOCK_STREAM), + 'udp': (socket.AF_INET, socket.SOCK_DGRAM), + 'udp6': (socket.AF_INET6, socket.SOCK_DGRAM), + } + + def __init__(self, url: ParseResult) -> None: + self.url = url + + try: + socket_family, socket_type = self.schemes[self.url.scheme] + except KeyError: + raise RuntimeError('Unsupported socket type: %s', self.url.scheme) + + self.sock = socket.socket(family=socket_family, type=socket_type) + if self.sock.family == socket.AF_UNIX: + self.address: Union[str, Tuple[str, int]] = self.url.path + else: + assert self.url.hostname + assert self.url.port + self.address = (self.url.hostname, self.url.port) + + def connect(self) -> None: + return self.sock.connect(self.address) + + def close(self) -> None: + self.sock.close() + + def send(self, data: str, flags: int = 0) -> int: + return self.sock.send(data.encode('utf-8') + b'\n', flags) + + def __del__(self) -> None: + self.sock.close() + + def __enter__(self) -> 'BaseSocket': + self.connect() + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + self.close() diff --git a/src/pybind/mgr/telegraf/module.py b/src/pybind/mgr/telegraf/module.py new file mode 100644 index 000000000..541ddba4f --- /dev/null +++ b/src/pybind/mgr/telegraf/module.py @@ -0,0 +1,283 @@ +import errno +import json +import itertools +import socket +import time +from threading import Event + +from telegraf.basesocket import BaseSocket +from telegraf.protocol import Line +from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, PG_STATES + +from typing import cast, Any, Dict, Iterable, Optional, Tuple +from urllib.parse import urlparse + + +class Module(MgrModule): + MODULE_OPTIONS = [ + Option(name='address', + default='unixgram:///tmp/telegraf.sock'), + Option(name='interval', + type='secs', + default=15)] + + ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2} + + @property + def config_keys(self) -> Dict[str, OptionValue]: + return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS) + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super(Module, self).__init__(*args, **kwargs) + self.event = Event() + self.run = True + self.fsid: Optional[str] = None + self.config: Dict[str, OptionValue] = dict() + + def get_fsid(self) -> str: + if not self.fsid: + self.fsid = self.get('mon_map')['fsid'] + assert self.fsid is not None + return self.fsid + + def get_pool_stats(self) -> Iterable[Dict[str, Any]]: + df = self.get('df') + + df_types = [ + 'bytes_used', + 'kb_used', + 'dirty', + 'rd', + 'rd_bytes', + 'stored_raw', + 'wr', + 'wr_bytes', + 'objects', + 'max_avail', + 'quota_objects', + 'quota_bytes' + ] + + for df_type in df_types: + for pool in df['pools']: + yield { + 'measurement': 'ceph_pool_stats', + 'tags': { + 'pool_name': pool['name'], + 'pool_id': pool['id'], + 'type_instance': df_type, + 'fsid': self.get_fsid() + }, + 'value': pool['stats'][df_type], + } + + def get_daemon_stats(self) -> Iterable[Dict[str, Any]]: + for daemon, counters in self.get_unlabeled_perf_counters().items(): + svc_type, svc_id = daemon.split('.', 1) + metadata = self.get_metadata(svc_type, svc_id) + if not metadata: + continue + + for path, counter_info in counters.items(): + if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM: + continue + + yield { + 'measurement': 'ceph_daemon_stats', + 'tags': { + 'ceph_daemon': daemon, + 'type_instance': path, + 'host': metadata['hostname'], + 'fsid': self.get_fsid() + }, + 'value': counter_info['value'] + } + + def get_pg_stats(self) -> Dict[str, int]: + stats = dict() + + pg_status = self.get('pg_status') + for key in ['bytes_total', 'data_bytes', 'bytes_used', 'bytes_avail', + 'num_pgs', 'num_objects', 'num_pools']: + stats[key] = pg_status[key] + + for state in PG_STATES: + stats['num_pgs_{0}'.format(state)] = 0 + + stats['num_pgs'] = pg_status['num_pgs'] + for state in pg_status['pgs_by_state']: + states = state['state_name'].split('+') + for s in PG_STATES: + key = 'num_pgs_{0}'.format(s) + if s in states: + stats[key] += state['count'] + + return stats + + def get_cluster_stats(self) -> Iterable[Dict[str, Any]]: + stats = dict() + + health = json.loads(self.get('health')['json']) + stats['health'] = self.ceph_health_mapping.get(health['status']) + + mon_status = json.loads(self.get('mon_status')['json']) + stats['num_mon'] = len(mon_status['monmap']['mons']) + + stats['mon_election_epoch'] = mon_status['election_epoch'] + stats['mon_outside_quorum'] = len(mon_status['outside_quorum']) + stats['mon_quorum'] = len(mon_status['quorum']) + + osd_map = self.get('osd_map') + stats['num_osd'] = len(osd_map['osds']) + stats['num_pg_temp'] = len(osd_map['pg_temp']) + stats['osd_epoch'] = osd_map['epoch'] + + mgr_map = self.get('mgr_map') + stats['mgr_available'] = int(mgr_map['available']) + stats['num_mgr_standby'] = len(mgr_map['standbys']) + stats['mgr_epoch'] = mgr_map['epoch'] + + num_up = 0 + num_in = 0 + for osd in osd_map['osds']: + if osd['up'] == 1: + num_up += 1 + + if osd['in'] == 1: + num_in += 1 + + stats['num_osd_up'] = num_up + stats['num_osd_in'] = num_in + + fs_map = self.get('fs_map') + stats['num_mds_standby'] = len(fs_map['standbys']) + stats['num_fs'] = len(fs_map['filesystems']) + stats['mds_epoch'] = fs_map['epoch'] + + num_mds_up = 0 + for fs in fs_map['filesystems']: + num_mds_up += len(fs['mdsmap']['up']) + + stats['num_mds_up'] = num_mds_up + stats['num_mds'] = num_mds_up + cast(int, stats['num_mds_standby']) + + stats.update(self.get_pg_stats()) + + for key, value in stats.items(): + assert value is not None + yield { + 'measurement': 'ceph_cluster_stats', + 'tags': { + 'type_instance': key, + 'fsid': self.get_fsid() + }, + 'value': int(value) + } + + def set_config_option(self, option: str, value: str) -> None: + if option not in self.config_keys.keys(): + raise RuntimeError('{0} is a unknown configuration ' + 'option'.format(option)) + + if option == 'interval': + try: + interval = int(value) + except (ValueError, TypeError): + raise RuntimeError('invalid {0} configured. Please specify ' + 'a valid integer'.format(option)) + if interval < 5: + raise RuntimeError('interval should be set to at least 5 seconds') + self.config[option] = interval + else: + self.config[option] = value + + def init_module_config(self) -> None: + self.config['address'] = \ + self.get_module_option("address", default=self.config_keys['address']) + interval = self.get_module_option("interval", + default=self.config_keys['interval']) + assert interval + self.config['interval'] = int(interval) + + def now(self) -> int: + return int(round(time.time() * 1000000000)) + + def gather_measurements(self) -> Iterable[Dict[str, Any]]: + return itertools.chain( + self.get_pool_stats(), + self.get_daemon_stats(), + self.get_cluster_stats() + ) + + def send_to_telegraf(self) -> None: + url = urlparse(cast(str, self.config['address'])) + + sock = BaseSocket(url) + self.log.debug('Sending data to Telegraf at %s', sock.address) + now = self.now() + try: + with sock as s: + for measurement in self.gather_measurements(): + self.log.debug(measurement) + line = Line(measurement['measurement'], + measurement['value'], + measurement['tags'], now) + self.log.debug(line.to_line_protocol()) + s.send(line.to_line_protocol()) + except (socket.error, RuntimeError, IOError, OSError): + self.log.exception('Failed to send statistics to Telegraf:') + except FileNotFoundError: + self.log.exception('Failed to open Telegraf at: %s', url.geturl()) + + def shutdown(self) -> None: + self.log.info('Stopping Telegraf module') + self.run = False + self.event.set() + + @CLIReadCommand('telegraf config-show') + def config_show(self) -> Tuple[int, str, str]: + """ + Show current configuration + """ + return 0, json.dumps(self.config), '' + + @CLICommand('telegraf config-set') + def config_set(self, key: str, value: str) -> Tuple[int, str, str]: + """ + Set a configuration value + """ + if not value: + return -errno.EINVAL, '', 'Value should not be empty or None' + self.log.debug('Setting configuration option %s to %s', key, value) + self.set_config_option(key, value) + self.set_module_option(key, value) + return 0, 'Configuration option {0} updated'.format(key), '' + + @CLICommand('telegraf send') + def send(self) -> Tuple[int, str, str]: + """ + Force sending data to Telegraf + """ + self.send_to_telegraf() + return 0, 'Sending data to Telegraf', '' + + def self_test(self) -> None: + measurements = list(self.gather_measurements()) + if len(measurements) == 0: + raise RuntimeError('No measurements found') + + def serve(self) -> None: + self.log.info('Starting Telegraf module') + self.init_module_config() + self.run = True + + self.log.debug('Waiting 10 seconds before starting') + self.event.wait(10) + + while self.run: + start = self.now() + self.send_to_telegraf() + runtime = (self.now() - start) / 1000000 + self.log.debug('Sending data to Telegraf took %d ms', runtime) + self.log.debug("Sleeping for %d seconds", self.config['interval']) + self.event.wait(cast(int, self.config['interval'])) diff --git a/src/pybind/mgr/telegraf/protocol.py b/src/pybind/mgr/telegraf/protocol.py new file mode 100644 index 000000000..7cf8bbe9e --- /dev/null +++ b/src/pybind/mgr/telegraf/protocol.py @@ -0,0 +1,50 @@ +from typing import Dict, Optional, Union + +from telegraf.utils import format_string, format_value, ValueType + + +class Line(object): + def __init__(self, + measurement: ValueType, + values: Union[Dict[str, ValueType], ValueType], + tags: Optional[Dict[str, str]] = None, + timestamp: Optional[int] = None) -> None: + self.measurement = measurement + self.values = values + self.tags = tags + self.timestamp = timestamp + + def get_output_measurement(self) -> str: + return format_string(self.measurement) + + def get_output_values(self) -> str: + if not isinstance(self.values, dict): + metric_values = {'value': self.values} + else: + metric_values = self.values + + sorted_values = sorted(metric_values.items()) + sorted_values = [(k, v) for k, v in sorted_values if v is not None] + + return ','.join('{0}={1}'.format(format_string(k), format_value(v)) for k, v in sorted_values) + + def get_output_tags(self) -> str: + if not self.tags: + self.tags = dict() + + sorted_tags = sorted(self.tags.items()) + + return ','.join('{0}={1}'.format(format_string(k), format_string(v)) for k, v in sorted_tags) + + def get_output_timestamp(self) -> str: + return ' {0}'.format(self.timestamp) if self.timestamp else '' + + def to_line_protocol(self) -> str: + tags = self.get_output_tags() + + return '{0}{1} {2}{3}'.format( + self.get_output_measurement(), + "," + tags if tags else '', + self.get_output_values(), + self.get_output_timestamp() + ) diff --git a/src/pybind/mgr/telegraf/utils.py b/src/pybind/mgr/telegraf/utils.py new file mode 100644 index 000000000..783e9edc7 --- /dev/null +++ b/src/pybind/mgr/telegraf/utils.py @@ -0,0 +1,26 @@ +from typing import Union + +ValueType = Union[str, bool, int, float] + + +def format_string(key: ValueType) -> str: + if isinstance(key, str): + return key.replace(',', r'\,') \ + .replace(' ', r'\ ') \ + .replace('=', r'\=') + else: + return str(key) + + +def format_value(value: ValueType) -> str: + if isinstance(value, str): + value = value.replace('"', '\"') + return f'"{value}"' + elif isinstance(value, bool): + return str(value) + elif isinstance(value, int): + return f"{value}i" + elif isinstance(value, float): + return str(value) + else: + raise ValueError() |