summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/influx/module.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/influx/module.py')
-rw-r--r--src/pybind/mgr/influx/module.py481
1 file changed, 481 insertions, 0 deletions
diff --git a/src/pybind/mgr/influx/module.py b/src/pybind/mgr/influx/module.py
new file mode 100644
index 000000000..6818783b3
--- /dev/null
+++ b/src/pybind/mgr/influx/module.py
@@ -0,0 +1,481 @@
+from contextlib import contextmanager
+from datetime import datetime
+from threading import Event, Thread
+from itertools import chain
+import queue
+import json
+import errno
+import time
+from typing import cast, Any, Dict, Iterator, List, Optional, Tuple, Union
+
+from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue
+
+try:
+ from influxdb import InfluxDBClient
+ from influxdb.exceptions import InfluxDBClientError
+ from requests.exceptions import RequestException
+except ImportError:
+ InfluxDBClient = None
+
+
class Module(MgrModule):
    """Periodically forward Ceph cluster statistics to an InfluxDB server.

    serve() gathers pool df stats, PG summaries and daemon perf counters,
    chunks them into batches and pushes the batches onto a bounded queue
    that a pool of worker threads drains into InfluxDB.
    """

    # Options are surfaced via `ceph config set mgr mgr/influx/<name> <value>`
    # and loaded into self.config by init_module_config().
    MODULE_OPTIONS = [
        Option(name='hostname',
               default=None,
               desc='InfluxDB server hostname'),
        Option(name='port',
               type='int',
               default=8086,
               desc='InfluxDB server port'),
        Option(name='database',
               default='ceph',
               desc=('InfluxDB database name. You will need to create this '
                     'database and grant write privileges to the configured '
                     'username or the username must have admin privileges to '
                     'create it.')),
        Option(name='username',
               default=None,
               desc='username of InfluxDB server user'),
        Option(name='password',
               default=None,
               desc='password of InfluxDB server user'),
        Option(name='interval',
               type='secs',
               min=5,
               default=30,
               desc='Time between reports to InfluxDB. Default 30 seconds.'),
        # 'ssl' and 'verify_ssl' are stored as the strings "true"/"false"
        # and converted to bool in init_module_config().
        Option(name='ssl',
               default='false',
               desc='Use https connection for InfluxDB server. Use "true" or "false".'),
        Option(name='verify_ssl',
               default='true',
               desc='Verify https cert for InfluxDB server. Use "true" or "false".'),
        Option(name='threads',
               type='int',
               min=1,
               max=32,
               default=5,
               desc='How many worker threads should be spawned for sending data to InfluxDB.'),
        Option(name='batch_size',
               type='int',
               default=5000,
               desc='How big batches of data points should be when sending to InfluxDB.'),
    ]
+
+ @property
+ def config_keys(self) -> Dict[str, OptionValue]:
+ return dict((o['name'], o.get('default', None))
+ for o in self.MODULE_OPTIONS)
+
    # Legacy command descriptor table; the same commands are also declared
    # below with the CLICommand/CLIReadCommand/CLIWriteCommand decorators.
    COMMANDS = [
        {
            "cmd": "influx config-set name=key,type=CephString "
            "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "influx config-show",
            "desc": "Show current configuration",
            "perm": "r"
        },
        {
            "cmd": "influx send",
            "desc": "Force sending data to Influx",
            "perm": "rw"
        }
    ]
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super(Module, self).__init__(*args, **kwargs)
+ self.event = Event()
+ self.run = True
+ self.config: Dict[str, OptionValue] = dict()
+ self.workers: List[Thread] = list()
+ self.queue: 'queue.Queue[Optional[List[Dict[str, str]]]]' = queue.Queue(maxsize=100)
+ self.health_checks: Dict[str, Dict[str, Any]] = dict()
+
+ def get_fsid(self) -> str:
+ return self.get('mon_map')['fsid']
+
+ @staticmethod
+ def can_run() -> Tuple[bool, str]:
+ if InfluxDBClient is not None:
+ return True, ""
+ else:
+ return False, "influxdb python module not found"
+
+ @staticmethod
+ def get_timestamp() -> str:
+ return datetime.utcnow().isoformat() + 'Z'
+
+ @staticmethod
+ def chunk(l: Iterator[Dict[str, str]], n: int) -> Iterator[List[Dict[str, str]]]:
+ try:
+ while True:
+ xs = []
+ for _ in range(n):
+ xs.append(next(l))
+ yield xs
+ except StopIteration:
+ yield xs
+
+ def queue_worker(self) -> None:
+ while True:
+ try:
+ points = self.queue.get()
+ if not points:
+ self.log.debug('Worker shutting down')
+ break
+
+ start = time.time()
+ with self.get_influx_client() as client:
+ client.write_points(points, time_precision='ms')
+ runtime = time.time() - start
+ self.log.debug('Writing points %d to Influx took %.3f seconds',
+ len(points), runtime)
+ except RequestException as e:
+ hostname = self.config['hostname']
+ port = self.config['port']
+ self.log.exception(f"Failed to connect to Influx host {hostname}:{port}")
+ self.health_checks.update({
+ 'MGR_INFLUX_SEND_FAILED': {
+ 'severity': 'warning',
+ 'summary': 'Failed to send data to InfluxDB server '
+ f'at {hostname}:{port} due to an connection error',
+ 'detail': [str(e)]
+ }
+ })
+ except InfluxDBClientError as e:
+ self.health_checks.update({
+ 'MGR_INFLUX_SEND_FAILED': {
+ 'severity': 'warning',
+ 'summary': 'Failed to send data to InfluxDB',
+ 'detail': [str(e)]
+ }
+ })
+ self.log.exception('Failed to send data to InfluxDB')
+ except queue.Empty:
+ continue
+ except:
+ self.log.exception('Unhandled Exception while sending to Influx')
+ finally:
+ self.queue.task_done()
+
+ def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
+ data = self.get_counter(daemon_type, daemon_name, stat)[stat]
+ if data:
+ return data[-1][1]
+
+ return 0
+
+ def get_df_stats(self, now) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
+ df = self.get("df")
+ data = []
+ pool_info = {}
+
+ df_types = [
+ 'stored',
+ 'kb_used',
+ 'dirty',
+ 'rd',
+ 'rd_bytes',
+ 'stored_raw',
+ 'wr',
+ 'wr_bytes',
+ 'objects',
+ 'max_avail',
+ 'quota_objects',
+ 'quota_bytes'
+ ]
+
+ for df_type in df_types:
+ for pool in df['pools']:
+ point = {
+ "measurement": "ceph_pool_stats",
+ "tags": {
+ "pool_name": pool['name'],
+ "pool_id": pool['id'],
+ "type_instance": df_type,
+ "fsid": self.get_fsid()
+ },
+ "time": now,
+ "fields": {
+ "value": pool['stats'][df_type],
+ }
+ }
+ data.append(point)
+ pool_info.update({str(pool['id']):pool['name']})
+ return data, pool_info
+
+ def get_pg_summary_osd(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
+ pg_sum = self.get('pg_summary')
+ osd_sum = pg_sum['by_osd']
+ for osd_id, stats in osd_sum.items():
+ metadata = self.get_metadata('osd', "%s" % osd_id)
+ if not metadata:
+ continue
+
+ for stat in stats:
+ yield {
+ "measurement": "ceph_pg_summary_osd",
+ "tags": {
+ "ceph_daemon": "osd." + str(osd_id),
+ "type_instance": stat,
+ "host": metadata['hostname']
+ },
+ "time" : now,
+ "fields" : {
+ "value": stats[stat]
+ }
+ }
+
+ def get_pg_summary_pool(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
+ pool_sum = self.get('pg_summary')['by_pool']
+ for pool_id, stats in pool_sum.items():
+ try:
+ pool_name = pool_info[pool_id]
+ except KeyError:
+ self.log.error('Unable to find pool name for pool {}'.format(pool_id))
+ continue
+ for stat in stats:
+ yield {
+ "measurement": "ceph_pg_summary_pool",
+ "tags": {
+ "pool_name" : pool_name,
+ "pool_id" : pool_id,
+ "type_instance" : stat,
+ },
+ "time" : now,
+ "fields": {
+ "value" : stats[stat],
+ }
+ }
+
+ def get_daemon_stats(self, now: str) -> Iterator[Dict[str, Any]]:
+ for daemon, counters in self.get_unlabeled_perf_counters().items():
+ svc_type, svc_id = daemon.split(".", 1)
+ metadata = self.get_metadata(svc_type, svc_id)
+ if metadata is not None:
+ hostname = metadata['hostname']
+ else:
+ hostname = 'N/A'
+
+ for path, counter_info in counters.items():
+ if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
+ continue
+
+ value = counter_info['value']
+
+ yield {
+ "measurement": "ceph_daemon_stats",
+ "tags": {
+ "ceph_daemon": daemon,
+ "type_instance": path,
+ "host": hostname,
+ "fsid": self.get_fsid()
+ },
+ "time": now,
+ "fields": {
+ "value": value
+ }
+ }
+
+ def init_module_config(self) -> None:
+ self.config['hostname'] = \
+ self.get_module_option("hostname", default=self.config_keys['hostname'])
+ self.config['port'] = \
+ cast(int, self.get_module_option("port", default=self.config_keys['port']))
+ self.config['database'] = \
+ self.get_module_option("database", default=self.config_keys['database'])
+ self.config['username'] = \
+ self.get_module_option("username", default=self.config_keys['username'])
+ self.config['password'] = \
+ self.get_module_option("password", default=self.config_keys['password'])
+ self.config['interval'] = \
+ cast(int, self.get_module_option("interval",
+ default=self.config_keys['interval']))
+ self.config['threads'] = \
+ cast(int, self.get_module_option("threads",
+ default=self.config_keys['threads']))
+ self.config['batch_size'] = \
+ cast(int, self.get_module_option("batch_size",
+ default=self.config_keys['batch_size']))
+ ssl = cast(str, self.get_module_option("ssl", default=self.config_keys['ssl']))
+ self.config['ssl'] = ssl.lower() == 'true'
+ verify_ssl = \
+ cast(str, self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl']))
+ self.config['verify_ssl'] = verify_ssl.lower() == 'true'
+
+ def gather_statistics(self) -> Iterator[Dict[str, str]]:
+ now = self.get_timestamp()
+ df_stats, pools = self.get_df_stats(now)
+ return chain(df_stats, self.get_daemon_stats(now),
+ self.get_pg_summary_osd(pools, now),
+ self.get_pg_summary_pool(pools, now))
+
+ @contextmanager
+ def get_influx_client(self) -> Iterator['InfluxDBClient']:
+ client = InfluxDBClient(self.config['hostname'],
+ self.config['port'],
+ self.config['username'],
+ self.config['password'],
+ self.config['database'],
+ self.config['ssl'],
+ self.config['verify_ssl'])
+ try:
+ yield client
+ finally:
+ try:
+ client.close()
+ except AttributeError:
+ # influxdb older than v5.0.0
+ pass
+
+ def send_to_influx(self) -> bool:
+ if not self.config['hostname']:
+ self.log.error("No Influx server configured, please set one using: "
+ "ceph influx config-set hostname <hostname>")
+
+ self.set_health_checks({
+ 'MGR_INFLUX_NO_SERVER': {
+ 'severity': 'warning',
+ 'summary': 'No InfluxDB server configured',
+ 'detail': ['Configuration option hostname not set']
+ }
+ })
+ return False
+
+ self.health_checks = dict()
+
+ self.log.debug("Sending data to Influx host: %s",
+ self.config['hostname'])
+ try:
+ with self.get_influx_client() as client:
+ databases = client.get_list_database()
+ if {'name': self.config['database']} not in databases:
+ self.log.info("Database '%s' not found, trying to create "
+ "(requires admin privs). You can also create "
+ "manually and grant write privs to user "
+ "'%s'", self.config['database'],
+ self.config['database'])
+ client.create_database(self.config['database'])
+ client.create_retention_policy(name='8_weeks',
+ duration='8w',
+ replication='1',
+ default=True,
+ database=self.config['database'])
+
+ self.log.debug('Gathering statistics')
+ points = self.gather_statistics()
+ for chunk in self.chunk(points, cast(int, self.config['batch_size'])):
+ self.queue.put(chunk, block=False)
+
+ self.log.debug('Queue currently contains %d items',
+ self.queue.qsize())
+ return True
+ except queue.Full:
+ self.health_checks.update({
+ 'MGR_INFLUX_QUEUE_FULL': {
+ 'severity': 'warning',
+ 'summary': 'Failed to chunk to InfluxDB Queue',
+ 'detail': ['Queue is full. InfluxDB might be slow with '
+ 'processing data']
+ }
+ })
+ self.log.error('Queue is full, failed to add chunk')
+ return False
+ except (RequestException, InfluxDBClientError) as e:
+ self.health_checks.update({
+ 'MGR_INFLUX_DB_LIST_FAILED': {
+ 'severity': 'warning',
+ 'summary': 'Failed to list/create InfluxDB database',
+ 'detail': [str(e)]
+ }
+ })
+ self.log.exception('Failed to list/create InfluxDB database')
+ return False
+ finally:
+ self.set_health_checks(self.health_checks)
+
+ def shutdown(self) -> None:
+ self.log.info('Stopping influx module')
+ self.run = False
+ self.event.set()
+ self.log.debug('Shutting down queue workers')
+
+ for _ in self.workers:
+ self.queue.put([])
+
+ self.queue.join()
+
+ for worker in self.workers:
+ worker.join()
+
+ def self_test(self) -> Optional[str]:
+ now = self.get_timestamp()
+ daemon_stats = list(self.get_daemon_stats(now))
+ assert len(daemon_stats)
+ df_stats, pools = self.get_df_stats(now)
+
+ result = {
+ 'daemon_stats': daemon_stats,
+ 'df_stats': df_stats
+ }
+
+ return json.dumps(result, indent=2, sort_keys=True)
+
+ @CLIReadCommand('influx config-show')
+ def config_show(self) -> Tuple[int, str, str]:
+ """
+ Show current configuration
+ """
+ return 0, json.dumps(self.config, sort_keys=True), ''
+
+ @CLIWriteCommand('influx config-set')
+ def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
+ if not value:
+ return -errno.EINVAL, '', 'Value should not be empty'
+
+ self.log.debug('Setting configuration option %s to %s', key, value)
+ try:
+ self.set_module_option(key, value)
+ self.config[key] = self.get_module_option(key)
+ return 0, 'Configuration option {0} updated'.format(key), ''
+ except ValueError as e:
+ return -errno.EINVAL, '', str(e)
+
+ @CLICommand('influx send')
+ def send(self) -> Tuple[int, str, str]:
+ """
+ Force sending data to Influx
+ """
+ self.send_to_influx()
+ return 0, 'Sending data to Influx', ''
+
+ def serve(self) -> None:
+ if InfluxDBClient is None:
+ self.log.error("Cannot transmit statistics: influxdb python "
+ "module not found. Did you install it?")
+ return
+
+ self.log.info('Starting influx module')
+ self.init_module_config()
+ self.run = True
+
+ self.log.debug('Starting %d queue worker threads',
+ self.config['threads'])
+ for i in range(cast(int, self.config['threads'])):
+ worker = Thread(target=self.queue_worker, args=())
+ worker.setDaemon(True)
+ worker.start()
+ self.workers.append(worker)
+
+ while self.run:
+ start = time.time()
+ self.send_to_influx()
+ runtime = time.time() - start
+ self.log.debug('Finished sending data to Influx in %.3f seconds',
+ runtime)
+ self.log.debug("Sleeping for %d seconds", self.config['interval'])
+ self.event.wait(cast(float, self.config['interval']))