From 8a7b72f7cd1ccd547a03eb4243294e741d661d3f Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 8 Feb 2019 08:30:37 +0100 Subject: Adding upstream version 1.12.0. Signed-off-by: Daniel Baumann --- collectors/python.d.plugin/postgres/README.md | 2 + .../python.d.plugin/postgres/postgres.chart.py | 699 ++++++++++++++------- collectors/python.d.plugin/postgres/postgres.conf | 22 +- 3 files changed, 497 insertions(+), 226 deletions(-) (limited to 'collectors/python.d.plugin/postgres') diff --git a/collectors/python.d.plugin/postgres/README.md b/collectors/python.d.plugin/postgres/README.md index e7b108d36..9939a0c48 100644 --- a/collectors/python.d.plugin/postgres/README.md +++ b/collectors/python.d.plugin/postgres/README.md @@ -66,3 +66,5 @@ tcp: When no configuration file is found, module tries to connect to TCP/IP socket: `localhost:5432`. --- + +[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fcollectors%2Fpython.d.plugin%2Fpostgres%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]() diff --git a/collectors/python.d.plugin/postgres/postgres.chart.py b/collectors/python.d.plugin/postgres/postgres.chart.py index 7f43877c3..e988eec36 100644 --- a/collectors/python.d.plugin/postgres/postgres.chart.py +++ b/collectors/python.d.plugin/postgres/postgres.chart.py @@ -16,13 +16,34 @@ except ImportError: from bases.FrameworkServices.SimpleService import SimpleService -# default module values -update_every = 1 -priority = 60000 -retries = 60 + +DEFAULT_PORT = 5432 +DEFAULT_USER = 'postgres' +DEFAULT_CONNECT_TIMEOUT = 2 # seconds +DEFAULT_STATEMENT_TIMEOUT = 5000 # ms + + +WAL = 'WAL' +ARCHIVE = 'ARCHIVE' +BACKENDS = 'BACKENDS' +TABLE_STATS = 'TABLE_STATS' +INDEX_STATS = 'INDEX_STATS' +DATABASE = 'DATABASE' +BGWRITER = 'BGWRITER' +LOCKS = 'LOCKS' +DATABASES = 'DATABASES' +STANDBY = 'STANDBY' +REPLICATION_SLOT = 
'REPLICATION_SLOT' +STANDBY_DELTA = 'STANDBY_DELTA' +REPSLOT_FILES = 'REPSLOT_FILES' +IF_SUPERUSER = 'IF_SUPERUSER' +SERVER_VERSION = 'SERVER_VERSION' +AUTOVACUUM = 'AUTOVACUUM' +DIFF_LSN = 'DIFF_LSN' +WAL_WRITES = 'WAL_WRITES' METRICS = { - 'DATABASE': [ + DATABASE: [ 'connections', 'xact_commit', 'xact_rollback', @@ -38,32 +59,32 @@ METRICS = { 'temp_bytes', 'size' ], - 'BACKENDS': [ + BACKENDS: [ 'backends_active', 'backends_idle' ], - 'INDEX_STATS': [ + INDEX_STATS: [ 'index_count', 'index_size' ], - 'TABLE_STATS': [ + TABLE_STATS: [ 'table_size', 'table_count' ], - 'WAL': [ + WAL: [ 'written_wal', 'recycled_wal', 'total_wal' ], - 'WAL_WRITES': [ + WAL_WRITES: [ 'wal_writes' ], - 'ARCHIVE': [ + ARCHIVE: [ 'ready_count', 'done_count', 'file_count' ], - 'BGWRITER': [ + BGWRITER: [ 'checkpoint_scheduled', 'checkpoint_requested', 'buffers_checkpoint', @@ -73,7 +94,7 @@ METRICS = { 'buffers_alloc', 'buffers_backend_fsync' ], - 'LOCKS': [ + LOCKS: [ 'ExclusiveLock', 'RowShareLock', 'SIReadLock', @@ -84,27 +105,61 @@ METRICS = { 'ShareLock', 'RowExclusiveLock' ], - 'AUTOVACUUM': [ + AUTOVACUUM: [ 'analyze', 'vacuum_analyze', 'vacuum', 'vacuum_freeze', 'brin_summarize' ], - 'STANDBY_DELTA': [ + STANDBY_DELTA: [ 'sent_delta', 'write_delta', 'flush_delta', 'replay_delta' ], - 'REPSLOT_FILES': [ + REPSLOT_FILES: [ 'replslot_wal_keep', 'replslot_files' ] } -QUERIES = { - 'WAL': """ +NO_VERSION = 0 +DEFAULT = 'DEFAULT' +V96 = 'V96' +V10 = 'V10' +V11 = 'V11' + + +QUERY_WAL = { + DEFAULT: """ +SELECT + count(*) as total_wal, + count(*) FILTER (WHERE type = 'recycled') AS recycled_wal, + count(*) FILTER (WHERE type = 'written') AS written_wal +FROM + (SELECT + wal.name, + pg_walfile_name( + CASE pg_is_in_recovery() + WHEN true THEN NULL + ELSE pg_current_wal_lsn() + END ), + CASE + WHEN wal.name > pg_walfile_name( + CASE pg_is_in_recovery() + WHEN true THEN NULL + ELSE pg_current_wal_lsn() + END ) THEN 'recycled' + ELSE 'written' + END AS type + FROM 
pg_catalog.pg_ls_dir('pg_wal') AS wal(name) + WHERE name ~ '^[0-9A-F]{24}$' + ORDER BY + (pg_stat_file('pg_wal/'||name)).modification, + wal.name DESC) sub; +""", + V96: """ SELECT count(*) as total_wal, count(*) FILTER (WHERE type = 'recycled') AS recycled_wal, @@ -112,34 +167,49 @@ SELECT FROM (SELECT wal.name, - pg_{0}file_name( + pg_xlogfile_name( CASE pg_is_in_recovery() WHEN true THEN NULL - ELSE pg_current_{0}_{1}() + ELSE pg_current_xlog_location() END ), CASE - WHEN wal.name > pg_{0}file_name( + WHEN wal.name > pg_xlogfile_name( CASE pg_is_in_recovery() WHEN true THEN NULL - ELSE pg_current_{0}_{1}() + ELSE pg_current_xlog_location() END ) THEN 'recycled' ELSE 'written' END AS type - FROM pg_catalog.pg_ls_dir('pg_{0}') AS wal(name) - WHERE name ~ '^[0-9A-F]{{24}}$' + FROM pg_catalog.pg_ls_dir('pg_xlog') AS wal(name) + WHERE name ~ '^[0-9A-F]{24}$' ORDER BY - (pg_stat_file('pg_{0}/'||name)).modification, + (pg_stat_file('pg_xlog/'||name)).modification, wal.name DESC) sub; """, - 'ARCHIVE': """ +} + +QUERY_ARCHIVE = { + DEFAULT: """ +SELECT + CAST(COUNT(*) AS INT) AS file_count, + CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)),0) AS INT) AS ready_count, + CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)),0) AS INT) AS done_count +FROM + pg_catalog.pg_ls_dir('pg_wal/archive_status') AS archive_files (archive_file); +""", + V96: """ SELECT CAST(COUNT(*) AS INT) AS file_count, CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)),0) AS INT) AS ready_count, CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)),0) AS INT) AS done_count FROM - pg_catalog.pg_ls_dir('pg_{0}/archive_status') AS archive_files (archive_file); + pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file); + """, - 'BACKENDS': """ +} + +QUERY_BACKEND = { + DEFAULT: """ SELECT count(*) - (SELECT count(*) FROM pg_stat_activity @@ -151,21 +221,30 @@ SELECT AS backends_idle FROM pg_stat_activity; """, - 'TABLE_STATS': """ +} + 
+QUERY_TABLE_STATS = { + DEFAULT: """ SELECT ((sum(relpages) * 8) * 1024) AS table_size, count(1) AS table_count FROM pg_class WHERE relkind IN ('r', 't'); """, - 'INDEX_STATS': """ +} + +QUERY_INDEX_STATS = { + DEFAULT: """ SELECT ((sum(relpages) * 8) * 1024) AS index_size, count(1) AS index_count FROM pg_class WHERE relkind = 'i'; """, - 'DATABASE': """ +} + +QUERY_DATABASE = { + DEFAULT: """ SELECT datname AS database_name, numbackends AS connections, @@ -185,7 +264,10 @@ SELECT FROM pg_stat_database WHERE datname IN %(databases)s ; """, - 'BGWRITER': """ +} + +QUERY_BGWRITER = { + DEFAULT: """ SELECT checkpoints_timed AS checkpoint_scheduled, checkpoints_req AS checkpoint_requested, @@ -197,7 +279,10 @@ SELECT buffers_backend_fsync FROM pg_stat_bgwriter; """, - 'LOCKS': """ +} + +QUERY_LOCKS = { + DEFAULT: """ SELECT pg_database.datname as database_name, mode, @@ -208,7 +293,10 @@ INNER JOIN pg_database GROUP BY datname, mode ORDER BY datname, mode; """, - 'FIND_DATABASES': """ +} + +QUERY_DATABASES = { + DEFAULT: """ SELECT datname FROM pg_stat_database @@ -217,48 +305,129 @@ WHERE (SELECT current_user), datname, 'connect') AND NOT datname ~* '^template\d '; """, - 'FIND_STANDBY': """ +} + +QUERY_STANDBY = { + DEFAULT: """ SELECT application_name FROM pg_stat_replication WHERE application_name IS NOT NULL GROUP BY application_name; """, - 'FIND_REPLICATION_SLOT': """ +} + +QUERY_REPLICATION_SLOT = { + DEFAULT: """ SELECT slot_name FROM pg_replication_slots; +""" +} + +QUERY_STANDBY_DELTA = { + DEFAULT: """ +SELECT + application_name, + pg_wal_lsn_diff( + CASE pg_is_in_recovery() + WHEN true THEN pg_last_wal_receive_lsn() + ELSE pg_current_wal_lsn() + END, + sent_lsn) AS sent_delta, + pg_wal_lsn_diff( + CASE pg_is_in_recovery() + WHEN true THEN pg_last_wal_receive_lsn() + ELSE pg_current_wal_lsn() + END, + write_lsn) AS write_delta, + pg_wal_lsn_diff( + CASE pg_is_in_recovery() + WHEN true THEN pg_last_wal_receive_lsn() + ELSE pg_current_wal_lsn() + END, + 
flush_lsn) AS flush_delta, + pg_wal_lsn_diff( + CASE pg_is_in_recovery() + WHEN true THEN pg_last_wal_receive_lsn() + ELSE pg_current_wal_lsn() + END, + replay_lsn) AS replay_delta +FROM pg_stat_replication +WHERE application_name IS NOT NULL; """, - 'STANDBY_DELTA': """ + V96: """ SELECT application_name, - pg_{0}_{1}_diff( + pg_xlog_location_diff( CASE pg_is_in_recovery() - WHEN true THEN pg_last_{0}_receive_{1}() - ELSE pg_current_{0}_{1}() + WHEN true THEN pg_last_xlog_receive_location() + ELSE pg_current_xlog_location() END, - sent_{1}) AS sent_delta, - pg_{0}_{1}_diff( + sent_location) AS sent_delta, + pg_xlog_location_diff( CASE pg_is_in_recovery() - WHEN true THEN pg_last_{0}_receive_{1}() - ELSE pg_current_{0}_{1}() + WHEN true THEN pg_last_xlog_receive_location() + ELSE pg_current_xlog_location() END, - write_{1}) AS write_delta, - pg_{0}_{1}_diff( + write_location) AS write_delta, + pg_xlog_location_diff( CASE pg_is_in_recovery() - WHEN true THEN pg_last_{0}_receive_{1}() - ELSE pg_current_{0}_{1}() + WHEN true THEN pg_last_xlog_receive_location() + ELSE pg_current_xlog_location() END, - flush_{1}) AS flush_delta, - pg_{0}_{1}_diff( + flush_location) AS flush_delta, + pg_xlog_location_diff( CASE pg_is_in_recovery() - WHEN true THEN pg_last_{0}_receive_{1}() - ELSE pg_current_{0}_{1}() + WHEN true THEN pg_last_xlog_receive_location() + ELSE pg_current_xlog_location() END, - replay_{1}) AS replay_delta + replay_location) AS replay_delta FROM pg_stat_replication WHERE application_name IS NOT NULL; """, - 'REPSLOT_FILES': """ +} + +QUERY_REPSLOT_FILES = { + DEFAULT: """ +WITH wal_size AS ( + SELECT + setting::int AS val + FROM pg_settings + WHERE name = 'wal_segment_size' + ) +SELECT + slot_name, + slot_type, + replslot_wal_keep, + count(slot_file) AS replslot_files +FROM + (SELECT + slot.slot_name, + CASE + WHEN slot_file <> 'state' THEN 1 + END AS slot_file , + slot_type, + COALESCE ( + floor( + (pg_wal_lsn_diff(pg_current_wal_lsn (),slot.restart_lsn) + - 
(pg_walfile_name_offset (restart_lsn)).file_offset) / (s.val) + ),0) AS replslot_wal_keep + FROM pg_replication_slots slot + LEFT JOIN ( + SELECT + slot2.slot_name, + pg_ls_dir('pg_replslot/' || slot2.slot_name) AS slot_file + FROM pg_replication_slots slot2 + ) files (slot_name, slot_file) + ON slot.slot_name = files.slot_name + CROSS JOIN wal_size s + ) AS d +GROUP BY + slot_name, + slot_type, + replslot_wal_keep; +""", + V10: """ WITH wal_size AS ( SELECT current_setting('wal_block_size')::INT * setting::INT AS val @@ -297,13 +466,22 @@ GROUP BY slot_type, replslot_wal_keep; """, - 'IF_SUPERUSER': """ +} + +QUERY_SUPERUSER = { + DEFAULT: """ SELECT current_setting('is_superuser') = 'on' AS is_superuser; """, - 'DETECT_SERVER_VERSION': """ +} + +QUERY_SHOW_VERSION = { + DEFAULT: """ SHOW server_version_num; """, - 'AUTOVACUUM': """ +} + +QUERY_AUTOVACUUM = { + DEFAULT: """ SELECT count(*) FILTER (WHERE query LIKE 'autovacuum: ANALYZE%%') AS analyze, count(*) FILTER (WHERE query LIKE 'autovacuum: VACUUM ANALYZE%%') AS vacuum_analyze, @@ -315,23 +493,78 @@ SELECT FROM pg_stat_activity WHERE query NOT LIKE '%%pg_stat_activity%%'; """, - 'DIFF_LSN': """ +} + +QUERY_DIFF_LSN = { + DEFAULT: """ SELECT - pg_{0}_{1}_diff( + pg_wal_lsn_diff( CASE pg_is_in_recovery() - WHEN true THEN pg_last_{0}_receive_{1}() - ELSE pg_current_{0}_{1}() + WHEN true THEN pg_last_wal_receive_lsn() + ELSE pg_current_wal_lsn() END, '0/0') as wal_writes ; -""" +""", + V96: """ +SELECT + pg_xlog_location_diff( + CASE pg_is_in_recovery() + WHEN true THEN pg_last_xlog_receive_location() + ELSE pg_current_xlog_location() + END, + '0/0') as wal_writes ; +""", } -QUERY_STATS = { - QUERIES['DATABASE']: METRICS['DATABASE'], - QUERIES['BACKENDS']: METRICS['BACKENDS'], - QUERIES['LOCKS']: METRICS['LOCKS'] -} +def query_factory(name, version=NO_VERSION): + if name == BACKENDS: + return QUERY_BACKEND[DEFAULT] + elif name == TABLE_STATS: + return QUERY_TABLE_STATS[DEFAULT] + elif name == INDEX_STATS: + 
return QUERY_INDEX_STATS[DEFAULT] + elif name == DATABASE: + return QUERY_DATABASE[DEFAULT] + elif name == BGWRITER: + return QUERY_BGWRITER[DEFAULT] + elif name == LOCKS: + return QUERY_LOCKS[DEFAULT] + elif name == DATABASES: + return QUERY_DATABASES[DEFAULT] + elif name == STANDBY: + return QUERY_STANDBY[DEFAULT] + elif name == REPLICATION_SLOT: + return QUERY_REPLICATION_SLOT[DEFAULT] + elif name == IF_SUPERUSER: + return QUERY_SUPERUSER[DEFAULT] + elif name == SERVER_VERSION: + return QUERY_SHOW_VERSION[DEFAULT] + elif name == AUTOVACUUM: + return QUERY_AUTOVACUUM[DEFAULT] + elif name == WAL: + if version < 100000: + return QUERY_WAL[V96] + return QUERY_WAL[DEFAULT] + elif name == ARCHIVE: + if version < 100000: + return QUERY_ARCHIVE[V96] + return QUERY_ARCHIVE[DEFAULT] + elif name == STANDBY_DELTA: + if version < 100000: + return QUERY_STANDBY_DELTA[V96] + return QUERY_STANDBY_DELTA[DEFAULT] + elif name == REPSLOT_FILES: + if version < 110000: + return QUERY_REPSLOT_FILES[V10] + return QUERY_REPSLOT_FILES[DEFAULT] + elif name == DIFF_LSN: + if version < 100000: + return QUERY_DIFF_LSN[V96] + return QUERY_DIFF_LSN[DEFAULT] + + raise ValueError('unknown query') + ORDER = [ 'db_stat_temp_files', @@ -403,7 +636,7 @@ CHARTS = { ] }, 'db_stat_temp_bytes': { - 'options': [None, 'Temp files written to disk', 'KB/s', 'db statistics', 'postgres.db_stat_temp_bytes', + 'options': [None, 'Temp files written to disk', 'KiB/s', 'db statistics', 'postgres.db_stat_temp_bytes', 'line'], 'lines': [ ['temp_bytes', 'size', 'incremental', 1, 1024] @@ -417,7 +650,7 @@ CHARTS = { ] }, 'database_size': { - 'options': [None, 'Database size', 'MB', 'database size', 'postgres.db_size', 'stacked'], + 'options': [None, 'Database size', 'MiB', 'database size', 'postgres.db_size', 'stacked'], 'lines': [ ] }, @@ -436,7 +669,7 @@ CHARTS = { ] }, 'index_size': { - 'options': [None, 'Indexes size', 'MB', 'indexes', 'postgres.index_size', 'line'], + 'options': [None, 'Indexes size', 'MiB', 
'indexes', 'postgres.index_size', 'line'], 'lines': [ ['index_size', 'size', 'absolute', 1, 1024 * 1024] ] @@ -448,7 +681,7 @@ CHARTS = { ] }, 'table_size': { - 'options': [None, 'Tables size', 'MB', 'tables', 'postgres.table_size', 'line'], + 'options': [None, 'Tables size', 'MiB', 'tables', 'postgres.table_size', 'line'], 'lines': [ ['table_size', 'size', 'absolute', 1, 1024 * 1024] ] @@ -462,7 +695,7 @@ CHARTS = { ] }, 'wal_writes': { - 'options': [None, 'Write-Ahead Logs', 'kilobytes/s', 'wal_writes', 'postgres.wal_writes', 'line'], + 'options': [None, 'Write-Ahead Logs', 'KiB/s', 'wal_writes', 'postgres.wal_writes', 'line'], 'lines': [ ['wal_writes', 'writes', 'incremental', 1, 1024] ] @@ -483,20 +716,20 @@ CHARTS = { ] }, 'stat_bgwriter_alloc': { - 'options': [None, 'Buffers allocated', 'kilobytes/s', 'bgwriter', 'postgres.stat_bgwriter_alloc', 'line'], + 'options': [None, 'Buffers allocated', 'KiB/s', 'bgwriter', 'postgres.stat_bgwriter_alloc', 'line'], 'lines': [ ['buffers_alloc', 'alloc', 'incremental', 1, 1024] ] }, 'stat_bgwriter_checkpoint': { - 'options': [None, 'Buffers written during checkpoints', 'kilobytes/s', 'bgwriter', + 'options': [None, 'Buffers written during checkpoints', 'KiB/s', 'bgwriter', 'postgres.stat_bgwriter_checkpoint', 'line'], 'lines': [ ['buffers_checkpoint', 'checkpoint', 'incremental', 1, 1024] ] }, 'stat_bgwriter_backend': { - 'options': [None, 'Buffers written directly by a backend', 'kilobytes/s', 'bgwriter', + 'options': [None, 'Buffers written directly by a backend', 'KiB/s', 'bgwriter', 'postgres.stat_bgwriter_backend', 'line'], 'lines': [ ['buffers_backend', 'backend', 'incremental', 1, 1024] @@ -509,7 +742,7 @@ CHARTS = { ] }, 'stat_bgwriter_bgwriter': { - 'options': [None, 'Buffers written by the background writer', 'kilobytes/s', 'bgwriter', + 'options': [None, 'Buffers written by the background writer', 'KiB/s', 'bgwriter', 'postgres.bgwriter_bgwriter', 'line'], 'lines': [ ['buffers_clean', 'clean', 'incremental', 1, 
1024] @@ -533,7 +766,7 @@ CHARTS = { ] }, 'standby_delta': { - 'options': [None, 'Standby delta', 'kilobytes', 'replication delta', 'postgres.standby_delta', 'line'], + 'options': [None, 'Standby delta', 'KiB', 'replication delta', 'postgres.standby_delta', 'line'], 'lines': [ ['sent_delta', 'sent delta', 'absolute', 1, 1024], ['write_delta', 'write delta', 'absolute', 1, 1024], @@ -554,186 +787,218 @@ CHARTS = { class Service(SimpleService): def __init__(self, configuration=None, name=None): SimpleService.__init__(self, configuration=configuration, name=name) - self.order = ORDER[:] + self.order = list(ORDER) self.definitions = deepcopy(CHARTS) - self.table_stats = configuration.pop('table_stats', False) - self.index_stats = configuration.pop('index_stats', False) - self.database_poll = configuration.pop('database_poll', None) + self.do_table_stats = configuration.pop('table_stats', False) + self.do_index_stats = configuration.pop('index_stats', False) + self.databases_to_poll = configuration.pop('database_poll', None) + self.statement_timeout = configuration.pop('statement_timeout', DEFAULT_STATEMENT_TIMEOUT) self.configuration = configuration - self.connection = False + self.conn = None self.server_version = None - self.data = dict() - self.locks_zeroed = dict() + self.is_superuser = False + self.alive = False self.databases = list() self.secondaries = list() self.replication_slots = list() - self.queries = QUERY_STATS.copy() - - def _connect(self): - params = dict(user='postgres', - database=None, - password=None, - host=None, - port=5432) - params.update(self.configuration) - - if not self.connection: - try: - self.connection = psycopg2.connect(**params) - self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT) - self.connection.set_session(readonly=True) - except OperationalError as error: - return False, str(error) - return True, True + self.queries = dict() + self.data = dict() + + def reconnect(self): + return self.connect() + + def 
connect(self): + if self.conn: + self.conn.close() + self.conn = None + + try: + params = dict( + host=None, + port=DEFAULT_PORT, + database=None, + user=DEFAULT_USER, + password=None, + connect_timeout=DEFAULT_CONNECT_TIMEOUT, + options='-c statement_timeout={0}'.format(self.statement_timeout), + ) + params.update(self.configuration) + + self.conn = psycopg2.connect(**params) + self.conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT) + self.conn.set_session(readonly=True) + except OperationalError as error: + self.error(error) + self.alive = False + else: + self.alive = True + + return self.alive def check(self): if not PSYCOPG2: - self.error('\'python-psycopg2\' module is needed to use postgres.chart.py') + self.error("'python-psycopg2' package is needed to use postgres module") return False - result, error = self._connect() - if not result: - conf = dict((k, (lambda k, v: v if k != 'password' else '*****')(k, v)) - for k, v in self.configuration.items()) - self.error('Failed to connect to %s. 
Error: %s' % (str(conf), error)) + + if not self.connect(): + self.error('failed to connect to {0}'.format(hide_password(self.configuration))) return False + try: - cursor = self.connection.cursor() - self.databases = discover_databases_(cursor, QUERIES['FIND_DATABASES']) - is_superuser = check_if_superuser_(cursor, QUERIES['IF_SUPERUSER']) - self.secondaries = discover_secondaries_(cursor, QUERIES['FIND_STANDBY']) - self.server_version = detect_server_version(cursor, QUERIES['DETECT_SERVER_VERSION']) - if self.server_version >= 94000: - self.replication_slots = discover_replication_slots_(cursor, QUERIES['FIND_REPLICATION_SLOT']) - cursor.close() - - if self.database_poll and isinstance(self.database_poll, str): - self.databases = [dbase for dbase in self.databases if dbase in self.database_poll.split()] \ - or self.databases - - self.locks_zeroed = populate_lock_types(self.databases) - self.add_additional_queries_(is_superuser) - self.create_dynamic_charts_() - return True + self.check_queries() except Exception as error: - self.error(str(error)) + self.error(error) return False - def add_additional_queries_(self, is_superuser): + self.populate_queries() + self.create_dynamic_charts() - if self.server_version >= 100000: - wal = 'wal' - lsn = 'lsn' - else: - wal = 'xlog' - lsn = 'location' - self.queries[QUERIES['BGWRITER']] = METRICS['BGWRITER'] - self.queries[QUERIES['DIFF_LSN'].format(wal, lsn)] = METRICS['WAL_WRITES'] - self.queries[QUERIES['STANDBY_DELTA'].format(wal, lsn)] = METRICS['STANDBY_DELTA'] - - if self.index_stats: - self.queries[QUERIES['INDEX_STATS']] = METRICS['INDEX_STATS'] - if self.table_stats: - self.queries[QUERIES['TABLE_STATS']] = METRICS['TABLE_STATS'] - if is_superuser: - self.queries[QUERIES['ARCHIVE'].format(wal)] = METRICS['ARCHIVE'] - if self.server_version >= 90400: - self.queries[QUERIES['WAL'].format(wal, lsn)] = METRICS['WAL'] - if self.server_version >= 100000: - self.queries[QUERIES['REPSLOT_FILES']] = METRICS['REPSLOT_FILES'] 
- if self.server_version >= 90400: - self.queries[QUERIES['AUTOVACUUM']] = METRICS['AUTOVACUUM'] + return True - def create_dynamic_charts_(self): + def get_data(self): + if not self.alive and not self.reconnect(): + return None - for database_name in self.databases[::-1]: - self.definitions['database_size']['lines'].append( - [database_name + '_size', database_name, 'absolute', 1, 1024 * 1024]) - for chart_name in [name for name in self.order if name.startswith('db_stat')]: - add_database_stat_chart_(order=self.order, definitions=self.definitions, - name=chart_name, database_name=database_name) + try: + cursor = self.conn.cursor(cursor_factory=DictCursor) - add_database_lock_chart_(order=self.order, definitions=self.definitions, database_name=database_name) + self.data.update(zero_lock_types(self.databases)) - for application_name in self.secondaries[::-1]: - add_replication_delta_chart_( - order=self.order, - definitions=self.definitions, - name='standby_delta', - application_name=application_name) + for query, metrics in self.queries.items(): + self.query_stats(cursor, query, metrics) - for slot_name in self.replication_slots[::-1]: - add_replication_slot_chart_( - order=self.order, - definitions=self.definitions, - name='replication_slot', - slot_name=slot_name) - - def _get_data(self): - result, _ = self._connect() - if result: - cursor = self.connection.cursor(cursor_factory=DictCursor) - try: - self.data.update(self.locks_zeroed) - for query, metrics in self.queries.items(): - self.query_stats_(cursor, query, metrics) - - except OperationalError: - self.connection = False - cursor.close() - return None - else: - cursor.close() - return self.data - else: + except OperationalError: + self.alive = False return None - def query_stats_(self, cursor, query, metrics): + cursor.close() + + return self.data + + def query_stats(self, cursor, query, metrics): cursor.execute(query, dict(databases=tuple(self.databases))) + for row in cursor: for metric in metrics: + # 
databases if 'database_name' in row: dimension_id = '_'.join([row['database_name'], metric]) + # secondaries elif 'application_name' in row: dimension_id = '_'.join([row['application_name'], metric]) + # replication slots elif 'slot_name' in row: dimension_id = '_'.join([row['slot_name'], metric]) + # other else: dimension_id = metric + if metric in row: if row[metric] is not None: self.data[dimension_id] = int(row[metric]) elif 'locks_count' in row: - self.data[dimension_id] = row['locks_count'] if metric == row['mode'] else 0 + if metric == row['mode']: + self.data[dimension_id] = row['locks_count'] + def check_queries(self): + cursor = self.conn.cursor() -def discover_databases_(cursor, query): - cursor.execute(query) - result = list() - for db in [database[0] for database in cursor]: - if db not in result: - result.append(db) - return result + self.server_version = detect_server_version(cursor, query_factory(SERVER_VERSION)) + self.debug('server version: {0}'.format(self.server_version)) + self.is_superuser = check_if_superuser(cursor, query_factory(IF_SUPERUSER)) + self.debug('superuser: {0}'.format(self.is_superuser)) -def discover_secondaries_(cursor, query): - cursor.execute(query) - result = list() - for sc in [standby[0] for standby in cursor]: - if sc not in result: - result.append(sc) - return result + self.databases = discover(cursor, query_factory(DATABASES)) + self.debug('discovered databases {0}'.format(self.databases)) + if self.databases_to_poll: + to_poll = self.databases_to_poll.split() + self.databases = [db for db in self.databases if db in to_poll] or self.databases + + self.secondaries = discover(cursor, query_factory(STANDBY)) + self.debug('discovered secondaries: {0}'.format(self.secondaries)) + + if self.server_version >= 94000: + self.replication_slots = discover(cursor, query_factory(REPLICATION_SLOT)) + self.debug('discovered replication slots: {0}'.format(self.replication_slots)) + + cursor.close() + + def populate_queries(self): + 
self.queries[query_factory(DATABASE)] = METRICS[DATABASE] + self.queries[query_factory(BACKENDS)] = METRICS[BACKENDS] + self.queries[query_factory(LOCKS)] = METRICS[LOCKS] + self.queries[query_factory(BGWRITER)] = METRICS[BGWRITER] + self.queries[query_factory(DIFF_LSN, self.server_version)] = METRICS[WAL_WRITES] + self.queries[query_factory(STANDBY_DELTA, self.server_version)] = METRICS[STANDBY_DELTA] + + if self.do_index_stats: + self.queries[query_factory(INDEX_STATS)] = METRICS[INDEX_STATS] + if self.do_table_stats: + self.queries[query_factory(TABLE_STATS)] = METRICS[TABLE_STATS] + + if self.is_superuser: + self.queries[query_factory(ARCHIVE, self.server_version)] = METRICS[ARCHIVE] + + if self.server_version >= 90400: + self.queries[query_factory(WAL, self.server_version)] = METRICS[WAL] + + if self.server_version >= 100000: + self.queries[query_factory(REPSLOT_FILES, self.server_version)] = METRICS[REPSLOT_FILES] + + if self.server_version >= 90400: + self.queries[query_factory(AUTOVACUUM)] = METRICS[AUTOVACUUM] + + def create_dynamic_charts(self): + for database_name in self.databases[::-1]: + dim = [ + database_name + '_size', + database_name, + 'absolute', + 1, + 1024 * 1024, + ] + self.definitions['database_size']['lines'].append(dim) + for chart_name in [name for name in self.order if name.startswith('db_stat')]: + add_database_stat_chart( + order=self.order, + definitions=self.definitions, + name=chart_name, + database_name=database_name, + ) + add_database_lock_chart( + order=self.order, + definitions=self.definitions, + database_name=database_name, + ) + + for application_name in self.secondaries[::-1]: + add_replication_delta_chart( + order=self.order, + definitions=self.definitions, + name='standby_delta', + application_name=application_name, + ) + + for slot_name in self.replication_slots[::-1]: + add_replication_slot_chart( + order=self.order, + definitions=self.definitions, + name='replication_slot', + slot_name=slot_name, + ) -def 
discover_replication_slots_(cursor, query): +def discover(cursor, query): cursor.execute(query) result = list() - for slot in [replication_slot[0] for replication_slot in cursor]: - if slot not in result: - result.append(slot) + for v in [value[0] for value in cursor]: + if v not in result: + result.append(v) return result -def check_if_superuser_(cursor, query): +def check_if_superuser(cursor, query): cursor.execute(query) return cursor.fetchone()[0] @@ -743,7 +1008,7 @@ def detect_server_version(cursor, query): return int(cursor.fetchone()[0]) -def populate_lock_types(databases): +def zero_lock_types(databases): result = dict() for database in databases: for lock_type in METRICS['LOCKS']: @@ -753,7 +1018,11 @@ def populate_lock_types(databases): return result -def add_database_lock_chart_(order, definitions, database_name): +def hide_password(config): + return dict((k, v if k != 'password' else '*****') for k, v in config.items()) + + +def add_database_lock_chart(order, definitions, database_name): def create_lines(database): result = list() for lock_type in METRICS['LOCKS']: @@ -770,7 +1039,7 @@ def add_database_lock_chart_(order, definitions, database_name): } -def add_database_stat_chart_(order, definitions, name, database_name): +def add_database_stat_chart(order, definitions, name, database_name): def create_lines(database, lines): result = list() for line in lines: @@ -787,7 +1056,7 @@ def add_database_stat_chart_(order, definitions, name, database_name): 'lines': create_lines(database_name, chart_template['lines'])} -def add_replication_delta_chart_(order, definitions, name, application_name): +def add_replication_delta_chart(order, definitions, name, application_name): def create_lines(standby, lines): result = list() for line in lines: @@ -799,13 +1068,13 @@ def add_replication_delta_chart_(order, definitions, name, application_name): chart_name = '_'.join([application_name, name]) position = order.index('database_size') order.insert(position, 
chart_name) - name, title, units, family, context, chart_type = chart_template['options'] + name, title, units, _, context, chart_type = chart_template['options'] definitions[chart_name] = { 'options': [name, title + ': ' + application_name, units, 'replication delta', context, chart_type], 'lines': create_lines(application_name, chart_template['lines'])} -def add_replication_slot_chart_(order, definitions, name, slot_name): +def add_replication_slot_chart(order, definitions, name, slot_name): def create_lines(slot, lines): result = list() for line in lines: @@ -817,7 +1086,7 @@ def add_replication_slot_chart_(order, definitions, name, slot_name): chart_name = '_'.join([slot_name, name]) position = order.index('database_size') order.insert(position, chart_name) - name, title, units, family, context, chart_type = chart_template['options'] + name, title, units, _, context, chart_type = chart_template['options'] definitions[chart_name] = { 'options': [name, title + ': ' + slot_name, units, 'replication slot files', context, chart_type], 'lines': create_lines(slot_name, chart_template['lines'])} diff --git a/collectors/python.d.plugin/postgres/postgres.conf b/collectors/python.d.plugin/postgres/postgres.conf index b69ca3717..cde698f3c 100644 --- a/collectors/python.d.plugin/postgres/postgres.conf +++ b/collectors/python.d.plugin/postgres/postgres.conf @@ -27,11 +27,9 @@ # If unset, the default for python.d.plugin is used. # priority: 60000 -# retries sets the number of retries to be made in case of failures. -# If unset, the default for python.d.plugin is used. -# Attempts to restore the service are made once every update_every -# and only if the module has collected values in the past. -# retries: 60 +# penalty indicates whether to apply penalty to update_every in case of failures. +# Penalty will increase every 5 failed updates in a row. Maximum penalty is 10 minutes. +# penalty: yes # autodetection_retry sets the job re-check interval in seconds. 
# The job is not deleted if check fails. @@ -58,18 +56,20 @@ # # JOBs sharing a name are mutually exclusive # update_every: 1 # the JOB's data collection frequency # priority: 60000 # the JOB's order on the dashboard -# retries: 60 # the JOB's number of restoration attempts +# penalty: yes # the JOB's penalty # autodetection_retry: 0 # the JOB's re-check interval in seconds # # A single connection is required in order to pull statistics. # # Connections can be configured with the following options: # -# database : 'example_db_name' -# user : 'example_user' -# password : 'example_pass' -# host : 'localhost' -# port : 5432 +# database : 'example_db_name' +# user : 'example_user' +# password : 'example_pass' +# host : 'localhost' +# port : 5432 +# connect_timeout : 2 # in seconds, default is 2 +# statement_timeout : 5000 # in ms, default is 5000 # # Additionally, the following options allow selective disabling of charts # -- cgit v1.2.3