diff options
author | Federico Ceratto <federico.ceratto@gmail.com> | 2017-04-30 16:09:37 +0000 |
---|---|---|
committer | Federico Ceratto <federico.ceratto@gmail.com> | 2017-04-30 16:09:37 +0000 |
commit | 51f689a8e17ff3929acd2dbf39e936d2cd3ac723 (patch) | |
tree | 92e54f543171b69dcbc639be09d11221cf96ba28 /python.d/postgres.chart.py | |
parent | New upstream version 1.5.0+dfsg (diff) | |
download | netdata-51f689a8e17ff3929acd2dbf39e936d2cd3ac723.tar.xz netdata-51f689a8e17ff3929acd2dbf39e936d2cd3ac723.zip |
New upstream version 1.6.0+dfsgupstream/1.6.0+dfsg
Diffstat (limited to 'python.d/postgres.chart.py')
-rw-r--r-- | python.d/postgres.chart.py | 526 |
1 files changed, 273 insertions, 253 deletions
diff --git a/python.d/postgres.chart.py b/python.d/postgres.chart.py index 919b6f8e..d359bb4f 100644 --- a/python.d/postgres.chart.py +++ b/python.d/postgres.chart.py @@ -2,12 +2,16 @@ # Description: example netdata python.d module # Authors: facetoe, dangtranhoang -import re from copy import deepcopy -import psycopg2 -from psycopg2 import extensions -from psycopg2.extras import DictCursor +try: + import psycopg2 + from psycopg2 import extensions + from psycopg2.extras import DictCursor + from psycopg2 import OperationalError + PSYCOPG2 = True +except ImportError: + PSYCOPG2 = False from base import SimpleService @@ -16,39 +20,70 @@ update_every = 1 priority = 90000 retries = 60 -ARCHIVE = """ +METRICS = dict( + DATABASE=['connections', + 'xact_commit', + 'xact_rollback', + 'blks_read', + 'blks_hit', + 'tup_returned', + 'tup_fetched', + 'tup_inserted', + 'tup_updated', + 'tup_deleted', + 'conflicts', + 'size'], + BACKENDS=['backends_active', + 'backends_idle'], + INDEX_STATS=['index_count', + 'index_size'], + TABLE_STATS=['table_size', + 'table_count'], + ARCHIVE=['ready_count', + 'done_count', + 'file_count'], + BGWRITER=['writer_scheduled', + 'writer_requested'], + LOCKS=['ExclusiveLock', + 'RowShareLock', + 'SIReadLock', + 'ShareUpdateExclusiveLock', + 'AccessExclusiveLock', + 'AccessShareLock', + 'ShareRowExclusiveLock', + 'ShareLock', + 'RowExclusiveLock'] +) + +QUERIES = dict( + ARCHIVE=""" SELECT CAST(COUNT(*) AS INT) AS file_count, CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count, CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count FROM pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file); -""" - -BACKENDS = """ +""", + BACKENDS=""" SELECT count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active, (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle -FROM - pg_stat_activity; -""" - -TABLE_STATS = """ +FROM pg_stat_activity; +""", + TABLE_STATS=""" SELECT - ((sum(relpages) * 8) * 1024) AS size_relations, - count(1) AS relations + ((sum(relpages) * 8) * 1024) AS table_size, + count(1) AS table_count FROM pg_class WHERE relkind IN ('r', 't'); -""" - -INDEX_STATS = """ +""", + INDEX_STATS=""" SELECT - ((sum(relpages) * 8) * 1024) AS size_indexes, - count(1) AS indexes + ((sum(relpages) * 8) * 1024) AS index_size, + count(1) AS index_count FROM pg_class -WHERE relkind = 'i';""" - -DATABASE = """ +WHERE relkind = 'i';""", + DATABASE=""" SELECT datname AS database_name, sum(numbackends) AS connections, @@ -61,93 +96,108 @@ SELECT sum(tup_inserted) AS tup_inserted, sum(tup_updated) AS tup_updated, sum(tup_deleted) AS tup_deleted, - sum(conflicts) AS conflicts + sum(conflicts) AS conflicts, + pg_database_size(datname) AS size FROM pg_stat_database WHERE NOT datname ~* '^template\d+' GROUP BY database_name; -""" - -BGWRITER = 'SELECT * FROM pg_stat_bgwriter;' -DATABASE_LOCKS = """ +""", + BGWRITER=""" +SELECT + checkpoints_timed AS writer_scheduled, + checkpoints_req AS writer_requested +FROM pg_stat_bgwriter;""", + LOCKS=""" SELECT pg_database.datname as database_name, mode, - count(mode) AS count + count(mode) AS locks_count FROM pg_locks INNER JOIN pg_database ON pg_database.oid = pg_locks.database GROUP BY datname, mode ORDER BY datname, mode; -""" -REPLICATION = """ -SELECT - client_hostname, - client_addr, - state, - sent_offset - ( - replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag -FROM ( - SELECT - client_addr, client_hostname, state, - ('x' || lpad(split_part(sent_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog, - ('x' || lpad(split_part(replay_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog, - ('x' || lpad(split_part(sent_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset, - ('x' || lpad(split_part(replay_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset - FROM pg_stat_replication -) AS s; -""" - -LOCK_TYPES = [ - 'ExclusiveLock', - 'RowShareLock', - 'SIReadLock', - 'ShareUpdateExclusiveLock', - 'AccessExclusiveLock', - 'AccessShareLock', - 'ShareRowExclusiveLock', - 'ShareLock', - 'RowExclusiveLock' -] - -ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write', +""", + FIND_DATABASES=""" +SELECT datname FROM pg_stat_database WHERE NOT datname ~* '^template\d+' +""", + IF_SUPERUSER=""" +SELECT current_setting('is_superuser') = 'on' AS is_superuser; + """) + +# REPLICATION = """ +# SELECT +# client_hostname, +# client_addr, +# state, +# sent_offset - ( +# replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag +# FROM ( +# SELECT +# client_addr, client_hostname, state, +# ('x' || lpad(split_part(sent_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog, +# ('x' || lpad(split_part(replay_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog, +# ('x' || lpad(split_part(sent_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset, +# ('x' || lpad(split_part(replay_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset +# FROM pg_stat_replication +# ) AS s; +# """ + + +QUERY_STATS = { + QUERIES['DATABASE']: METRICS['DATABASE'], + QUERIES['BACKENDS']: METRICS['BACKENDS'], + QUERIES['ARCHIVE']: METRICS['ARCHIVE'], + QUERIES['LOCKS']: METRICS['LOCKS'] +} + +ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write', 'database_size', 'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer'] CHARTS = { 'db_stat_transactions': { - 'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions', 'line'], + 'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions', + 'line'], 'lines': [ - ['db_stat_xact_commit', 'committed', 'incremental'], - ['db_stat_xact_rollback', 'rolled back', 'incremental'] + ['xact_commit', 'committed', 'incremental'], + ['xact_rollback', 'rolled back', 'incremental'] ]}, 'db_stat_connections': { - 'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections', 'line'], + 'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections', + 'line'], 'lines': [ - ['db_stat_connections', 'connections', 'absolute'] + ['connections', 'connections', 'absolute'] ]}, 'db_stat_tuple_read': { 'options': [None, 'Tuple reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_tuple_read', 'line'], 'lines': [ - ['db_stat_blks_read', 'disk', 'incremental'], - ['db_stat_blks_hit', 'cache', 'incremental'] + ['blks_read', 'disk', 'incremental'], + ['blks_hit', 'cache', 'incremental'] ]}, 'db_stat_tuple_returned': { - 'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned', 'line'], + 'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned', + 'line'], 'lines': [ - ['db_stat_tup_returned', 'sequential', 'incremental'], - ['db_stat_tup_fetched', 'bitmap', 'incremental'] + ['tup_returned', 'sequential', 'incremental'], + ['tup_fetched', 'bitmap', 'incremental'] ]}, 'db_stat_tuple_write': { 'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'], 'lines': [ - ['db_stat_tup_inserted', 'inserted', 'incremental'], - ['db_stat_tup_updated', 'updated', 'incremental'], - ['db_stat_tup_deleted', 'deleted', 'incremental'], - ['db_stat_conflicts', 'conflicts', 'incremental'] + ['tup_inserted', 'inserted', 'incremental'], + ['tup_updated', 'updated', 'incremental'], + ['tup_deleted', 'deleted', 'incremental'], + ['conflicts', 'conflicts', 'incremental'] + ]}, + 'database_size': { + 'options': [None, 'Database size', 'MB', 'database size', 'postgres.db_size', 'stacked'], + 'lines': [ ]}, 'backend_process': { - 'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process', 'line'], + 'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process', + 'line'], 'lines': [ - ['backend_process_active', 'active', 'absolute'], - ['backend_process_idle', 'idle', 'absolute'] + ['backends_active', 'active', 'absolute'], + ['backends_idle', 'idle', 'absolute'] ]}, 'index_count': { 'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'], @@ -172,15 +222,15 @@ CHARTS = { 'wal': { 'options': [None, 'Write-Ahead Logging Statistics', 'files/s', 'write ahead log', 'postgres.wal', 'line'], 'lines': [ - ['wal_total', 'total', 'incremental'], - ['wal_ready', 'ready', 'incremental'], - ['wal_done', 'done', 'incremental'] + ['file_count', 'total', 'incremental'], + ['ready_count', 'ready', 'incremental'], + ['done_count', 'done', 'incremental'] ]}, 'background_writer': { 'options': [None, 'Checkpoints', 'writes/s', 'background writer', 'postgres.background_writer', 'line'], 'lines': [ - ['background_writer_scheduled', 'scheduled', 'incremental'], - ['background_writer_requested', 'requested', 'incremental'] + ['writer_scheduled', 'scheduled', 'incremental'], + ['writer_requested', 'requested', 'incremental'] ]} } @@ -188,199 +238,169 @@ CHARTS = { class Service(SimpleService): def __init__(self, configuration=None, name=None): super(self.__class__, self).__init__(configuration=configuration, name=name) - self.order = ORDER - self.definitions = CHARTS - self.table_stats = configuration.pop('table_stats', True) - self.index_stats = configuration.pop('index_stats', True) + self.order = ORDER[:] + self.definitions = deepcopy(CHARTS) + self.table_stats = configuration.pop('table_stats', False) + self.index_stats = configuration.pop('index_stats', False) self.configuration = configuration - self.connection = None + self.connection = False self.is_superuser = False - self.data = {} - self.databases = set() + self.data = dict() + self.locks_zeroed = dict() + self.databases = list() def _connect(self): params = dict(user='postgres', database=None, password=None, - host='localhost', + host=None, port=5432) params.update(self.configuration) if not self.connection: - self.connection = psycopg2.connect(**params) - self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT) - self.connection.set_session(readonly=True) + try: + self.connection = psycopg2.connect(**params) + self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT) + self.connection.set_session(readonly=True) + except OperationalError as error: + return False, str(error) + return True, True def check(self): + if not PSYCOPG2: + self.error('\'python-psycopg2\' module is needed to use postgres.chart.py') + return False + result, error = self._connect() + if not result: + conf = dict([(k, (lambda k, v: v if k != 'password' else '*****')(k, v)) for k, v in self.configuration.items()]) + self.error('Failed to connect to %s. Error: %s' % (str(conf), error)) + return False try: - self._connect() cursor = self.connection.cursor() - self._discover_databases(cursor) - self._check_if_superuser(cursor) + self.databases = discover_databases_(cursor, QUERIES['FIND_DATABASES']) + is_superuser = check_if_superuser_(cursor, QUERIES['IF_SUPERUSER']) cursor.close() - self._create_definitions() + self.locks_zeroed = populate_lock_types(self.databases) + self.add_additional_queries_(is_superuser) + self.create_dynamic_charts_() return True - except Exception as e: - self.error(str(e)) + except Exception as error: + self.error(str(error)) return False - def _discover_databases(self, cursor): - cursor.execute(""" - SELECT datname - FROM pg_stat_database - WHERE NOT datname ~* '^template\d+' - """) - self.databases = set(r[0] for r in cursor) - - def _check_if_superuser(self, cursor): - cursor.execute(""" - SELECT current_setting('is_superuser') = 'on' AS is_superuser; - """) - self.is_superuser = cursor.fetchone()[0] - - def _create_definitions(self): - for database_name in self.databases: - for chart_template_name in list(CHARTS): - if chart_template_name.startswith('db_stat'): - self._add_database_stat_chart(chart_template_name, database_name) - self._add_database_lock_chart(database_name) - - def _add_database_stat_chart(self, chart_template_name, database_name): - chart_template = CHARTS[chart_template_name] - chart_name = "{0}_{1}".format(database_name, chart_template_name) - if chart_name not in self.order: - self.order.insert(0, chart_name) - name, title, units, family, context, chart_type = chart_template['options'] - self.definitions[chart_name] = { - 'options': [ - name, - title + ': ' + database_name, - units, - 'db ' + database_name, - context, - chart_type - ] - } - - self.definitions[chart_name]['lines'] = [] - for line in deepcopy(chart_template['lines']): - line[0] = "{0}_{1}".format(database_name, line[0]) - self.definitions[chart_name]['lines'].append(line) - - def _add_database_lock_chart(self, database_name): - chart_name = "{0}_locks".format(database_name) - if chart_name not in self.order: - self.order.insert(-1, chart_name) - self.definitions[chart_name] = dict( - options= - [ - None, - 'Locks on db: ' + database_name, - 'locks', - 'db ' + database_name, - 'postgres.db_locks', - 'line' - ], - lines=[] - ) - - for lock_type in LOCK_TYPES: - lock_id = "{0}_{1}".format(database_name, lock_type) - label = re.sub("([a-z])([A-Z])", "\g<1> \g<2>", lock_type) - self.definitions[chart_name]['lines'].append([lock_id, label, 'absolute']) - - def _get_data(self): - self._connect() - - cursor = self.connection.cursor(cursor_factory=DictCursor) - self.add_stats(cursor) - - cursor.close() - return self.data - - def add_stats(self, cursor): - self.add_database_stats(cursor) - self.add_backend_stats(cursor) + def add_additional_queries_(self, is_superuser): if self.index_stats: - self.add_index_stats(cursor) + QUERY_STATS[QUERIES['INDEX_STATS']] = METRICS['INDEX_STATS'] if self.table_stats: - self.add_table_stats(cursor) - self.add_lock_stats(cursor) - self.add_bgwriter_stats(cursor) + QUERY_STATS[QUERIES['TABLE_STATS']] = METRICS['TABLE_STATS'] + if is_superuser: + QUERY_STATS[QUERIES['BGWRITER']] = METRICS['BGWRITER'] - # self.add_replication_stats(cursor) + def create_dynamic_charts_(self): - if self.is_superuser: - self.add_wal_stats(cursor) + for database_name in self.databases[::-1]: + self.definitions['database_size']['lines'].append([database_name + '_size', + database_name, 'absolute', 1, 1024 * 1024]) + for chart_name in [name for name in CHARTS if name.startswith('db_stat')]: + add_database_stat_chart_(order=self.order, definitions=self.definitions, + name=chart_name, database_name=database_name) - def add_database_stats(self, cursor): - cursor.execute(DATABASE) - for row in cursor: - database_name = row.get('database_name') - self.data["{0}_{1}".format(database_name, 'db_stat_xact_commit')] = int(row.get('xact_commit', 0)) - self.data["{0}_{1}".format(database_name, 'db_stat_xact_rollback')] = int(row.get('xact_rollback', 0)) - self.data["{0}_{1}".format(database_name, 'db_stat_blks_read')] = int(row.get('blks_read', 0)) - self.data["{0}_{1}".format(database_name, 'db_stat_blks_hit')] = int(row.get('blks_hit', 0)) - self.data["{0}_{1}".format(database_name, 'db_stat_tup_returned')] = int(row.get('tup_returned', 0)) - self.data["{0}_{1}".format(database_name, 'db_stat_tup_fetched')] = int(row.get('tup_fetched', 0)) - self.data["{0}_{1}".format(database_name, 'db_stat_tup_inserted')] = int(row.get('tup_inserted', 0)) - self.data["{0}_{1}".format(database_name, 'db_stat_tup_updated')] = int(row.get('tup_updated', 0)) - self.data["{0}_{1}".format(database_name, 'db_stat_tup_deleted')] = int(row.get('tup_deleted', 0)) - self.data["{0}_{1}".format(database_name, 'db_stat_conflicts')] = int(row.get('conflicts', 0)) - self.data["{0}_{1}".format(database_name, 'db_stat_connections')] = int(row.get('connections', 0)) - - def add_backend_stats(self, cursor): - cursor.execute(BACKENDS) - temp = cursor.fetchone() - - self.data['backend_process_active'] = int(temp.get('backends_active', 0)) - self.data['backend_process_idle'] = int(temp.get('backends_idle', 0)) - - def add_index_stats(self, cursor): - cursor.execute(INDEX_STATS) - temp = cursor.fetchone() - self.data['index_count'] = int(temp.get('indexes', 0)) - self.data['index_size'] = int(temp.get('size_indexes', 0)) - - def add_table_stats(self, cursor): - cursor.execute(TABLE_STATS) - temp = cursor.fetchone() - self.data['table_count'] = int(temp.get('relations', 0)) - self.data['table_size'] = int(temp.get('size_relations', 0)) - - def add_lock_stats(self, cursor): - cursor.execute(DATABASE_LOCKS) - - # zero out all current lock values - for database_name in self.databases: - for lock_type in LOCK_TYPES: - self.data["{0}_{1}".format(database_name, lock_type)] = 0 - - # populate those that have current locks + add_database_lock_chart_(order=self.order, definitions=self.definitions, database_name=database_name) + + def _get_data(self): + result, error = self._connect() + if result: + cursor = self.connection.cursor(cursor_factory=DictCursor) + try: + self.data.update(self.locks_zeroed) + for query, metrics in QUERY_STATS.items(): + self.query_stats_(cursor, query, metrics) + + except OperationalError: + self.connection = False + cursor.close() + return None + else: + cursor.close() + return self.data + else: + return None + + def query_stats_(self, cursor, query, metrics): + cursor.execute(query) for row in cursor: - database_name, lock_type, lock_count = row - self.data["{0}_{1}".format(database_name, lock_type)] = lock_count - - def add_wal_stats(self, cursor): - cursor.execute(ARCHIVE) - temp = cursor.fetchone() - self.data['wal_total'] = int(temp.get('file_count', 0)) - self.data['wal_ready'] = int(temp.get('ready_count', 0)) - self.data['wal_done'] = int(temp.get('done_count', 0)) - - def add_bgwriter_stats(self, cursor): - cursor.execute(BGWRITER) - temp = cursor.fetchone() - self.data['background_writer_scheduled'] = temp.get('checkpoints_timed', 0) - self.data['background_writer_requested'] = temp.get('checkpoints_requests', 0) - -''' - def add_replication_stats(self, cursor): - cursor.execute(REPLICATION) - temp = cursor.fetchall() - for row in temp: - self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'), - 'byte_lag', - int(row.get('byte_lag', 0))) -''' + for metric in metrics: + dimension_id = '_'.join([row['database_name'], metric]) if 'database_name' in row else metric + if metric in row: + self.data[dimension_id] = int(row[metric]) + elif 'locks_count' in row: + self.data[dimension_id] = row['locks_count'] if metric == row['mode'] else 0 + + +def discover_databases_(cursor, query): + cursor.execute(query) + result = list() + for db in [database[0] for database in cursor]: + if db not in result: + result.append(db) + return result + + +def check_if_superuser_(cursor, query): + cursor.execute(query) + return cursor.fetchone()[0] + + +def populate_lock_types(databases): + result = dict() + for database in databases: + for lock_type in METRICS['LOCKS']: + key = '_'.join([database, lock_type]) + result[key] = 0 + + return result + + +def add_database_lock_chart_(order, definitions, database_name): + def create_lines(database): + result = list() + for lock_type in METRICS['LOCKS']: + dimension_id = '_'.join([database, lock_type]) + result.append([dimension_id, lock_type, 'absolute']) + return result + + chart_name = database_name + '_locks' + order.insert(-1, chart_name) + definitions[chart_name] = { + 'options': + [None, 'Locks on db: ' + database_name, 'locks', 'db ' + database_name, 'postgres.db_locks', 'line'], + 'lines': create_lines(database_name) + } + + +def add_database_stat_chart_(order, definitions, name, database_name): + def create_lines(database, lines): + result = list() + for line in lines: + new_line = ['_'.join([database, line[0]])] + line[1:] + result.append(new_line) + return result + + chart_template = CHARTS[name] + chart_name = '_'.join([database_name, name]) + order.insert(0, chart_name) + name, title, units, family, context, chart_type = chart_template['options'] + definitions[chart_name] = { + 'options': [name, title + ': ' + database_name, units, 'db ' + database_name, context, chart_type], + 'lines': create_lines(database_name, chart_template['lines'])} + + +# +# def add_replication_stats(self, cursor): +# cursor.execute(REPLICATION) +# temp = cursor.fetchall() +# for row in temp: +# self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'), +# 'byte_lag', +# int(row.get('byte_lag', 0))) |