summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/telegraf
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/telegraf/__init__.py1
-rw-r--r--src/pybind/mgr/telegraf/basesocket.py49
-rw-r--r--src/pybind/mgr/telegraf/module.py283
-rw-r--r--src/pybind/mgr/telegraf/protocol.py50
-rw-r--r--src/pybind/mgr/telegraf/utils.py26
5 files changed, 409 insertions, 0 deletions
diff --git a/src/pybind/mgr/telegraf/__init__.py b/src/pybind/mgr/telegraf/__init__.py
new file mode 100644
index 000000000..8f210ac92
--- /dev/null
+++ b/src/pybind/mgr/telegraf/__init__.py
@@ -0,0 +1 @@
+from .module import Module
diff --git a/src/pybind/mgr/telegraf/basesocket.py b/src/pybind/mgr/telegraf/basesocket.py
new file mode 100644
index 000000000..5caea3be7
--- /dev/null
+++ b/src/pybind/mgr/telegraf/basesocket.py
@@ -0,0 +1,49 @@
+import socket
+from urllib.parse import ParseResult
+from typing import Any, Dict, Optional, Tuple, Union
+
+
+class BaseSocket(object):
+ schemes = {
+ 'unixgram': (socket.AF_UNIX, socket.SOCK_DGRAM),
+ 'unix': (socket.AF_UNIX, socket.SOCK_STREAM),
+ 'tcp': (socket.AF_INET, socket.SOCK_STREAM),
+ 'tcp6': (socket.AF_INET6, socket.SOCK_STREAM),
+ 'udp': (socket.AF_INET, socket.SOCK_DGRAM),
+ 'udp6': (socket.AF_INET6, socket.SOCK_DGRAM),
+ }
+
+ def __init__(self, url: ParseResult) -> None:
+ self.url = url
+
+ try:
+ socket_family, socket_type = self.schemes[self.url.scheme]
+ except KeyError:
+ raise RuntimeError('Unsupported socket type: %s', self.url.scheme)
+
+ self.sock = socket.socket(family=socket_family, type=socket_type)
+ if self.sock.family == socket.AF_UNIX:
+ self.address: Union[str, Tuple[str, int]] = self.url.path
+ else:
+ assert self.url.hostname
+ assert self.url.port
+ self.address = (self.url.hostname, self.url.port)
+
+ def connect(self) -> None:
+ return self.sock.connect(self.address)
+
+ def close(self) -> None:
+ self.sock.close()
+
+ def send(self, data: str, flags: int = 0) -> int:
+ return self.sock.send(data.encode('utf-8') + b'\n', flags)
+
+ def __del__(self) -> None:
+ self.sock.close()
+
+ def __enter__(self) -> 'BaseSocket':
+ self.connect()
+ return self
+
+ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+ self.close()
diff --git a/src/pybind/mgr/telegraf/module.py b/src/pybind/mgr/telegraf/module.py
new file mode 100644
index 000000000..541ddba4f
--- /dev/null
+++ b/src/pybind/mgr/telegraf/module.py
@@ -0,0 +1,283 @@
+import errno
+import json
+import itertools
+import socket
+import time
+from threading import Event
+
+from telegraf.basesocket import BaseSocket
+from telegraf.protocol import Line
+from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, PG_STATES
+
+from typing import cast, Any, Dict, Iterable, Optional, Tuple
+from urllib.parse import urlparse
+
+
+class Module(MgrModule):
+ MODULE_OPTIONS = [
+ Option(name='address',
+ default='unixgram:///tmp/telegraf.sock'),
+ Option(name='interval',
+ type='secs',
+ default=15)]
+
+ ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
+
+ @property
+ def config_keys(self) -> Dict[str, OptionValue]:
+ return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super(Module, self).__init__(*args, **kwargs)
+ self.event = Event()
+ self.run = True
+ self.fsid: Optional[str] = None
+ self.config: Dict[str, OptionValue] = dict()
+
+ def get_fsid(self) -> str:
+ if not self.fsid:
+ self.fsid = self.get('mon_map')['fsid']
+ assert self.fsid is not None
+ return self.fsid
+
+ def get_pool_stats(self) -> Iterable[Dict[str, Any]]:
+ df = self.get('df')
+
+ df_types = [
+ 'bytes_used',
+ 'kb_used',
+ 'dirty',
+ 'rd',
+ 'rd_bytes',
+ 'stored_raw',
+ 'wr',
+ 'wr_bytes',
+ 'objects',
+ 'max_avail',
+ 'quota_objects',
+ 'quota_bytes'
+ ]
+
+ for df_type in df_types:
+ for pool in df['pools']:
+ yield {
+ 'measurement': 'ceph_pool_stats',
+ 'tags': {
+ 'pool_name': pool['name'],
+ 'pool_id': pool['id'],
+ 'type_instance': df_type,
+ 'fsid': self.get_fsid()
+ },
+ 'value': pool['stats'][df_type],
+ }
+
+ def get_daemon_stats(self) -> Iterable[Dict[str, Any]]:
+ for daemon, counters in self.get_unlabeled_perf_counters().items():
+ svc_type, svc_id = daemon.split('.', 1)
+ metadata = self.get_metadata(svc_type, svc_id)
+ if not metadata:
+ continue
+
+ for path, counter_info in counters.items():
+ if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
+ continue
+
+ yield {
+ 'measurement': 'ceph_daemon_stats',
+ 'tags': {
+ 'ceph_daemon': daemon,
+ 'type_instance': path,
+ 'host': metadata['hostname'],
+ 'fsid': self.get_fsid()
+ },
+ 'value': counter_info['value']
+ }
+
+ def get_pg_stats(self) -> Dict[str, int]:
+ stats = dict()
+
+ pg_status = self.get('pg_status')
+ for key in ['bytes_total', 'data_bytes', 'bytes_used', 'bytes_avail',
+ 'num_pgs', 'num_objects', 'num_pools']:
+ stats[key] = pg_status[key]
+
+ for state in PG_STATES:
+ stats['num_pgs_{0}'.format(state)] = 0
+
+ stats['num_pgs'] = pg_status['num_pgs']
+ for state in pg_status['pgs_by_state']:
+ states = state['state_name'].split('+')
+ for s in PG_STATES:
+ key = 'num_pgs_{0}'.format(s)
+ if s in states:
+ stats[key] += state['count']
+
+ return stats
+
+ def get_cluster_stats(self) -> Iterable[Dict[str, Any]]:
+ stats = dict()
+
+ health = json.loads(self.get('health')['json'])
+ stats['health'] = self.ceph_health_mapping.get(health['status'])
+
+ mon_status = json.loads(self.get('mon_status')['json'])
+ stats['num_mon'] = len(mon_status['monmap']['mons'])
+
+ stats['mon_election_epoch'] = mon_status['election_epoch']
+ stats['mon_outside_quorum'] = len(mon_status['outside_quorum'])
+ stats['mon_quorum'] = len(mon_status['quorum'])
+
+ osd_map = self.get('osd_map')
+ stats['num_osd'] = len(osd_map['osds'])
+ stats['num_pg_temp'] = len(osd_map['pg_temp'])
+ stats['osd_epoch'] = osd_map['epoch']
+
+ mgr_map = self.get('mgr_map')
+ stats['mgr_available'] = int(mgr_map['available'])
+ stats['num_mgr_standby'] = len(mgr_map['standbys'])
+ stats['mgr_epoch'] = mgr_map['epoch']
+
+ num_up = 0
+ num_in = 0
+ for osd in osd_map['osds']:
+ if osd['up'] == 1:
+ num_up += 1
+
+ if osd['in'] == 1:
+ num_in += 1
+
+ stats['num_osd_up'] = num_up
+ stats['num_osd_in'] = num_in
+
+ fs_map = self.get('fs_map')
+ stats['num_mds_standby'] = len(fs_map['standbys'])
+ stats['num_fs'] = len(fs_map['filesystems'])
+ stats['mds_epoch'] = fs_map['epoch']
+
+ num_mds_up = 0
+ for fs in fs_map['filesystems']:
+ num_mds_up += len(fs['mdsmap']['up'])
+
+ stats['num_mds_up'] = num_mds_up
+ stats['num_mds'] = num_mds_up + cast(int, stats['num_mds_standby'])
+
+ stats.update(self.get_pg_stats())
+
+ for key, value in stats.items():
+ assert value is not None
+ yield {
+ 'measurement': 'ceph_cluster_stats',
+ 'tags': {
+ 'type_instance': key,
+ 'fsid': self.get_fsid()
+ },
+ 'value': int(value)
+ }
+
+ def set_config_option(self, option: str, value: str) -> None:
+ if option not in self.config_keys.keys():
+ raise RuntimeError('{0} is a unknown configuration '
+ 'option'.format(option))
+
+ if option == 'interval':
+ try:
+ interval = int(value)
+ except (ValueError, TypeError):
+ raise RuntimeError('invalid {0} configured. Please specify '
+ 'a valid integer'.format(option))
+ if interval < 5:
+ raise RuntimeError('interval should be set to at least 5 seconds')
+ self.config[option] = interval
+ else:
+ self.config[option] = value
+
+ def init_module_config(self) -> None:
+ self.config['address'] = \
+ self.get_module_option("address", default=self.config_keys['address'])
+ interval = self.get_module_option("interval",
+ default=self.config_keys['interval'])
+ assert interval
+ self.config['interval'] = int(interval)
+
+ def now(self) -> int:
+ return int(round(time.time() * 1000000000))
+
+ def gather_measurements(self) -> Iterable[Dict[str, Any]]:
+ return itertools.chain(
+ self.get_pool_stats(),
+ self.get_daemon_stats(),
+ self.get_cluster_stats()
+ )
+
+ def send_to_telegraf(self) -> None:
+ url = urlparse(cast(str, self.config['address']))
+
+ sock = BaseSocket(url)
+ self.log.debug('Sending data to Telegraf at %s', sock.address)
+ now = self.now()
+ try:
+ with sock as s:
+ for measurement in self.gather_measurements():
+ self.log.debug(measurement)
+ line = Line(measurement['measurement'],
+ measurement['value'],
+ measurement['tags'], now)
+ self.log.debug(line.to_line_protocol())
+ s.send(line.to_line_protocol())
+ except (socket.error, RuntimeError, IOError, OSError):
+ self.log.exception('Failed to send statistics to Telegraf:')
+ except FileNotFoundError:
+ self.log.exception('Failed to open Telegraf at: %s', url.geturl())
+
+ def shutdown(self) -> None:
+ self.log.info('Stopping Telegraf module')
+ self.run = False
+ self.event.set()
+
+ @CLIReadCommand('telegraf config-show')
+ def config_show(self) -> Tuple[int, str, str]:
+ """
+ Show current configuration
+ """
+ return 0, json.dumps(self.config), ''
+
+ @CLICommand('telegraf config-set')
+ def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
+ """
+ Set a configuration value
+ """
+ if not value:
+ return -errno.EINVAL, '', 'Value should not be empty or None'
+ self.log.debug('Setting configuration option %s to %s', key, value)
+ self.set_config_option(key, value)
+ self.set_module_option(key, value)
+ return 0, 'Configuration option {0} updated'.format(key), ''
+
+ @CLICommand('telegraf send')
+ def send(self) -> Tuple[int, str, str]:
+ """
+ Force sending data to Telegraf
+ """
+ self.send_to_telegraf()
+ return 0, 'Sending data to Telegraf', ''
+
+ def self_test(self) -> None:
+ measurements = list(self.gather_measurements())
+ if len(measurements) == 0:
+ raise RuntimeError('No measurements found')
+
+ def serve(self) -> None:
+ self.log.info('Starting Telegraf module')
+ self.init_module_config()
+ self.run = True
+
+ self.log.debug('Waiting 10 seconds before starting')
+ self.event.wait(10)
+
+ while self.run:
+ start = self.now()
+ self.send_to_telegraf()
+ runtime = (self.now() - start) / 1000000
+ self.log.debug('Sending data to Telegraf took %d ms', runtime)
+ self.log.debug("Sleeping for %d seconds", self.config['interval'])
+ self.event.wait(cast(int, self.config['interval']))
diff --git a/src/pybind/mgr/telegraf/protocol.py b/src/pybind/mgr/telegraf/protocol.py
new file mode 100644
index 000000000..7cf8bbe9e
--- /dev/null
+++ b/src/pybind/mgr/telegraf/protocol.py
@@ -0,0 +1,50 @@
+from typing import Dict, Optional, Union
+
+from telegraf.utils import format_string, format_value, ValueType
+
+
+class Line(object):
+ def __init__(self,
+ measurement: ValueType,
+ values: Union[Dict[str, ValueType], ValueType],
+ tags: Optional[Dict[str, str]] = None,
+ timestamp: Optional[int] = None) -> None:
+ self.measurement = measurement
+ self.values = values
+ self.tags = tags
+ self.timestamp = timestamp
+
+ def get_output_measurement(self) -> str:
+ return format_string(self.measurement)
+
+ def get_output_values(self) -> str:
+ if not isinstance(self.values, dict):
+ metric_values = {'value': self.values}
+ else:
+ metric_values = self.values
+
+ sorted_values = sorted(metric_values.items())
+ sorted_values = [(k, v) for k, v in sorted_values if v is not None]
+
+ return ','.join('{0}={1}'.format(format_string(k), format_value(v)) for k, v in sorted_values)
+
+ def get_output_tags(self) -> str:
+ if not self.tags:
+ self.tags = dict()
+
+ sorted_tags = sorted(self.tags.items())
+
+ return ','.join('{0}={1}'.format(format_string(k), format_string(v)) for k, v in sorted_tags)
+
+ def get_output_timestamp(self) -> str:
+ return ' {0}'.format(self.timestamp) if self.timestamp else ''
+
+ def to_line_protocol(self) -> str:
+ tags = self.get_output_tags()
+
+ return '{0}{1} {2}{3}'.format(
+ self.get_output_measurement(),
+ "," + tags if tags else '',
+ self.get_output_values(),
+ self.get_output_timestamp()
+ )
diff --git a/src/pybind/mgr/telegraf/utils.py b/src/pybind/mgr/telegraf/utils.py
new file mode 100644
index 000000000..783e9edc7
--- /dev/null
+++ b/src/pybind/mgr/telegraf/utils.py
@@ -0,0 +1,26 @@
+from typing import Union
+
+ValueType = Union[str, bool, int, float]
+
+
+def format_string(key: ValueType) -> str:
+ if isinstance(key, str):
+ return key.replace(',', r'\,') \
+ .replace(' ', r'\ ') \
+ .replace('=', r'\=')
+ else:
+ return str(key)
+
+
+def format_value(value: ValueType) -> str:
+ if isinstance(value, str):
+ value = value.replace('"', '\"')
+ return f'"{value}"'
+ elif isinstance(value, bool):
+ return str(value)
+ elif isinstance(value, int):
+ return f"{value}i"
+ elif isinstance(value, float):
+ return str(value)
+ else:
+ raise ValueError()