summaryrefslogtreecommitdiffstats
path: root/collectors/python.d.plugin/postgres/postgres.chart.py
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/python.d.plugin/postgres/postgres.chart.py')
-rw-r--r--collectors/python.d.plugin/postgres/postgres.chart.py699
1 files changed, 484 insertions, 215 deletions
diff --git a/collectors/python.d.plugin/postgres/postgres.chart.py b/collectors/python.d.plugin/postgres/postgres.chart.py
index 7f43877c3..e988eec36 100644
--- a/collectors/python.d.plugin/postgres/postgres.chart.py
+++ b/collectors/python.d.plugin/postgres/postgres.chart.py
@@ -16,13 +16,34 @@ except ImportError:
from bases.FrameworkServices.SimpleService import SimpleService
-# default module values
-update_every = 1
-priority = 60000
-retries = 60
+
+DEFAULT_PORT = 5432
+DEFAULT_USER = 'postgres'
+DEFAULT_CONNECT_TIMEOUT = 2 # seconds
+DEFAULT_STATEMENT_TIMEOUT = 5000 # ms
+
+
+WAL = 'WAL'
+ARCHIVE = 'ARCHIVE'
+BACKENDS = 'BACKENDS'
+TABLE_STATS = 'TABLE_STATS'
+INDEX_STATS = 'INDEX_STATS'
+DATABASE = 'DATABASE'
+BGWRITER = 'BGWRITER'
+LOCKS = 'LOCKS'
+DATABASES = 'DATABASES'
+STANDBY = 'STANDBY'
+REPLICATION_SLOT = 'REPLICATION_SLOT'
+STANDBY_DELTA = 'STANDBY_DELTA'
+REPSLOT_FILES = 'REPSLOT_FILES'
+IF_SUPERUSER = 'IF_SUPERUSER'
+SERVER_VERSION = 'SERVER_VERSION'
+AUTOVACUUM = 'AUTOVACUUM'
+DIFF_LSN = 'DIFF_LSN'
+WAL_WRITES = 'WAL_WRITES'
METRICS = {
- 'DATABASE': [
+ DATABASE: [
'connections',
'xact_commit',
'xact_rollback',
@@ -38,32 +59,32 @@ METRICS = {
'temp_bytes',
'size'
],
- 'BACKENDS': [
+ BACKENDS: [
'backends_active',
'backends_idle'
],
- 'INDEX_STATS': [
+ INDEX_STATS: [
'index_count',
'index_size'
],
- 'TABLE_STATS': [
+ TABLE_STATS: [
'table_size',
'table_count'
],
- 'WAL': [
+ WAL: [
'written_wal',
'recycled_wal',
'total_wal'
],
- 'WAL_WRITES': [
+ WAL_WRITES: [
'wal_writes'
],
- 'ARCHIVE': [
+ ARCHIVE: [
'ready_count',
'done_count',
'file_count'
],
- 'BGWRITER': [
+ BGWRITER: [
'checkpoint_scheduled',
'checkpoint_requested',
'buffers_checkpoint',
@@ -73,7 +94,7 @@ METRICS = {
'buffers_alloc',
'buffers_backend_fsync'
],
- 'LOCKS': [
+ LOCKS: [
'ExclusiveLock',
'RowShareLock',
'SIReadLock',
@@ -84,27 +105,61 @@ METRICS = {
'ShareLock',
'RowExclusiveLock'
],
- 'AUTOVACUUM': [
+ AUTOVACUUM: [
'analyze',
'vacuum_analyze',
'vacuum',
'vacuum_freeze',
'brin_summarize'
],
- 'STANDBY_DELTA': [
+ STANDBY_DELTA: [
'sent_delta',
'write_delta',
'flush_delta',
'replay_delta'
],
- 'REPSLOT_FILES': [
+ REPSLOT_FILES: [
'replslot_wal_keep',
'replslot_files'
]
}
-QUERIES = {
- 'WAL': """
+NO_VERSION = 0
+DEFAULT = 'DEFAULT'
+V96 = 'V96'
+V10 = 'V10'
+V11 = 'V11'
+
+
+QUERY_WAL = {
+ DEFAULT: """
+SELECT
+ count(*) as total_wal,
+ count(*) FILTER (WHERE type = 'recycled') AS recycled_wal,
+ count(*) FILTER (WHERE type = 'written') AS written_wal
+FROM
+ (SELECT
+ wal.name,
+ pg_walfile_name(
+ CASE pg_is_in_recovery()
+ WHEN true THEN NULL
+ ELSE pg_current_wal_lsn()
+ END ),
+ CASE
+ WHEN wal.name > pg_walfile_name(
+ CASE pg_is_in_recovery()
+ WHEN true THEN NULL
+ ELSE pg_current_wal_lsn()
+ END ) THEN 'recycled'
+ ELSE 'written'
+ END AS type
+ FROM pg_catalog.pg_ls_dir('pg_wal') AS wal(name)
+ WHERE name ~ '^[0-9A-F]{24}$'
+ ORDER BY
+ (pg_stat_file('pg_wal/'||name)).modification,
+ wal.name DESC) sub;
+""",
+ V96: """
SELECT
count(*) as total_wal,
count(*) FILTER (WHERE type = 'recycled') AS recycled_wal,
@@ -112,34 +167,49 @@ SELECT
FROM
(SELECT
wal.name,
- pg_{0}file_name(
+ pg_xlogfile_name(
CASE pg_is_in_recovery()
WHEN true THEN NULL
- ELSE pg_current_{0}_{1}()
+ ELSE pg_current_xlog_location()
END ),
CASE
- WHEN wal.name > pg_{0}file_name(
+ WHEN wal.name > pg_xlogfile_name(
CASE pg_is_in_recovery()
WHEN true THEN NULL
- ELSE pg_current_{0}_{1}()
+ ELSE pg_current_xlog_location()
END ) THEN 'recycled'
ELSE 'written'
END AS type
- FROM pg_catalog.pg_ls_dir('pg_{0}') AS wal(name)
- WHERE name ~ '^[0-9A-F]{{24}}$'
+ FROM pg_catalog.pg_ls_dir('pg_xlog') AS wal(name)
+ WHERE name ~ '^[0-9A-F]{24}$'
ORDER BY
- (pg_stat_file('pg_{0}/'||name)).modification,
+ (pg_stat_file('pg_xlog/'||name)).modification,
wal.name DESC) sub;
""",
- 'ARCHIVE': """
+}
+
+QUERY_ARCHIVE = {
+ DEFAULT: """
+SELECT
+ CAST(COUNT(*) AS INT) AS file_count,
+ CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)),0) AS INT) AS ready_count,
+ CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)),0) AS INT) AS done_count
+FROM
+ pg_catalog.pg_ls_dir('pg_wal/archive_status') AS archive_files (archive_file);
+""",
+ V96: """
SELECT
CAST(COUNT(*) AS INT) AS file_count,
CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)),0) AS INT) AS ready_count,
CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)),0) AS INT) AS done_count
FROM
- pg_catalog.pg_ls_dir('pg_{0}/archive_status') AS archive_files (archive_file);
+ pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
+
""",
- 'BACKENDS': """
+}
+
+QUERY_BACKEND = {
+ DEFAULT: """
SELECT
count(*) - (SELECT count(*)
FROM pg_stat_activity
@@ -151,21 +221,30 @@ SELECT
AS backends_idle
FROM pg_stat_activity;
""",
- 'TABLE_STATS': """
+}
+
+QUERY_TABLE_STATS = {
+ DEFAULT: """
SELECT
((sum(relpages) * 8) * 1024) AS table_size,
count(1) AS table_count
FROM pg_class
WHERE relkind IN ('r', 't');
""",
- 'INDEX_STATS': """
+}
+
+QUERY_INDEX_STATS = {
+ DEFAULT: """
SELECT
((sum(relpages) * 8) * 1024) AS index_size,
count(1) AS index_count
FROM pg_class
WHERE relkind = 'i';
""",
- 'DATABASE': """
+}
+
+QUERY_DATABASE = {
+ DEFAULT: """
SELECT
datname AS database_name,
numbackends AS connections,
@@ -185,7 +264,10 @@ SELECT
FROM pg_stat_database
WHERE datname IN %(databases)s ;
""",
- 'BGWRITER': """
+}
+
+QUERY_BGWRITER = {
+ DEFAULT: """
SELECT
checkpoints_timed AS checkpoint_scheduled,
checkpoints_req AS checkpoint_requested,
@@ -197,7 +279,10 @@ SELECT
buffers_backend_fsync
FROM pg_stat_bgwriter;
""",
- 'LOCKS': """
+}
+
+QUERY_LOCKS = {
+ DEFAULT: """
SELECT
pg_database.datname as database_name,
mode,
@@ -208,7 +293,10 @@ INNER JOIN pg_database
GROUP BY datname, mode
ORDER BY datname, mode;
""",
- 'FIND_DATABASES': """
+}
+
+QUERY_DATABASES = {
+ DEFAULT: """
SELECT
datname
FROM pg_stat_database
@@ -217,48 +305,129 @@ WHERE
(SELECT current_user), datname, 'connect')
AND NOT datname ~* '^template\d ';
""",
- 'FIND_STANDBY': """
+}
+
+QUERY_STANDBY = {
+ DEFAULT: """
SELECT
application_name
FROM pg_stat_replication
WHERE application_name IS NOT NULL
GROUP BY application_name;
""",
- 'FIND_REPLICATION_SLOT': """
+}
+
+QUERY_REPLICATION_SLOT = {
+ DEFAULT: """
SELECT slot_name
FROM pg_replication_slots;
+"""
+}
+
+QUERY_STANDBY_DELTA = {
+ DEFAULT: """
+SELECT
+ application_name,
+ pg_wal_lsn_diff(
+ CASE pg_is_in_recovery()
+ WHEN true THEN pg_last_wal_receive_lsn()
+ ELSE pg_current_wal_lsn()
+ END,
+ sent_lsn) AS sent_delta,
+ pg_wal_lsn_diff(
+ CASE pg_is_in_recovery()
+ WHEN true THEN pg_last_wal_receive_lsn()
+ ELSE pg_current_wal_lsn()
+ END,
+ write_lsn) AS write_delta,
+ pg_wal_lsn_diff(
+ CASE pg_is_in_recovery()
+ WHEN true THEN pg_last_wal_receive_lsn()
+ ELSE pg_current_wal_lsn()
+ END,
+ flush_lsn) AS flush_delta,
+ pg_wal_lsn_diff(
+ CASE pg_is_in_recovery()
+ WHEN true THEN pg_last_wal_receive_lsn()
+ ELSE pg_current_wal_lsn()
+ END,
+ replay_lsn) AS replay_delta
+FROM pg_stat_replication
+WHERE application_name IS NOT NULL;
""",
- 'STANDBY_DELTA': """
+ V96: """
SELECT
application_name,
- pg_{0}_{1}_diff(
+ pg_xlog_location_diff(
CASE pg_is_in_recovery()
- WHEN true THEN pg_last_{0}_receive_{1}()
- ELSE pg_current_{0}_{1}()
+ WHEN true THEN pg_last_xlog_receive_location()
+ ELSE pg_current_xlog_location()
END,
- sent_{1}) AS sent_delta,
- pg_{0}_{1}_diff(
+ sent_location) AS sent_delta,
+ pg_xlog_location_diff(
CASE pg_is_in_recovery()
- WHEN true THEN pg_last_{0}_receive_{1}()
- ELSE pg_current_{0}_{1}()
+ WHEN true THEN pg_last_xlog_receive_location()
+ ELSE pg_current_xlog_location()
END,
- write_{1}) AS write_delta,
- pg_{0}_{1}_diff(
+ write_location) AS write_delta,
+ pg_xlog_location_diff(
CASE pg_is_in_recovery()
- WHEN true THEN pg_last_{0}_receive_{1}()
- ELSE pg_current_{0}_{1}()
+ WHEN true THEN pg_last_xlog_receive_location()
+ ELSE pg_current_xlog_location()
END,
- flush_{1}) AS flush_delta,
- pg_{0}_{1}_diff(
+ flush_location) AS flush_delta,
+ pg_xlog_location_diff(
CASE pg_is_in_recovery()
- WHEN true THEN pg_last_{0}_receive_{1}()
- ELSE pg_current_{0}_{1}()
+ WHEN true THEN pg_last_xlog_receive_location()
+ ELSE pg_current_xlog_location()
END,
- replay_{1}) AS replay_delta
+ replay_location) AS replay_delta
FROM pg_stat_replication
WHERE application_name IS NOT NULL;
""",
- 'REPSLOT_FILES': """
+}
+
+QUERY_REPSLOT_FILES = {
+ DEFAULT: """
+WITH wal_size AS (
+ SELECT
+ setting::int AS val
+ FROM pg_settings
+ WHERE name = 'wal_segment_size'
+ )
+SELECT
+ slot_name,
+ slot_type,
+ replslot_wal_keep,
+ count(slot_file) AS replslot_files
+FROM
+ (SELECT
+ slot.slot_name,
+ CASE
+ WHEN slot_file <> 'state' THEN 1
+ END AS slot_file ,
+ slot_type,
+ COALESCE (
+ floor(
+ (pg_wal_lsn_diff(pg_current_wal_lsn (),slot.restart_lsn)
+ - (pg_walfile_name_offset (restart_lsn)).file_offset) / (s.val)
+ ),0) AS replslot_wal_keep
+ FROM pg_replication_slots slot
+ LEFT JOIN (
+ SELECT
+ slot2.slot_name,
+ pg_ls_dir('pg_replslot/' || slot2.slot_name) AS slot_file
+ FROM pg_replication_slots slot2
+ ) files (slot_name, slot_file)
+ ON slot.slot_name = files.slot_name
+ CROSS JOIN wal_size s
+ ) AS d
+GROUP BY
+ slot_name,
+ slot_type,
+ replslot_wal_keep;
+""",
+ V10: """
WITH wal_size AS (
SELECT
current_setting('wal_block_size')::INT * setting::INT AS val
@@ -297,13 +466,22 @@ GROUP BY
slot_type,
replslot_wal_keep;
""",
- 'IF_SUPERUSER': """
+}
+
+QUERY_SUPERUSER = {
+ DEFAULT: """
SELECT current_setting('is_superuser') = 'on' AS is_superuser;
""",
- 'DETECT_SERVER_VERSION': """
+}
+
+QUERY_SHOW_VERSION = {
+ DEFAULT: """
SHOW server_version_num;
""",
- 'AUTOVACUUM': """
+}
+
+QUERY_AUTOVACUUM = {
+ DEFAULT: """
SELECT
count(*) FILTER (WHERE query LIKE 'autovacuum: ANALYZE%%') AS analyze,
count(*) FILTER (WHERE query LIKE 'autovacuum: VACUUM ANALYZE%%') AS vacuum_analyze,
@@ -315,23 +493,78 @@ SELECT
FROM pg_stat_activity
WHERE query NOT LIKE '%%pg_stat_activity%%';
""",
- 'DIFF_LSN': """
+}
+
+QUERY_DIFF_LSN = {
+ DEFAULT: """
SELECT
- pg_{0}_{1}_diff(
+ pg_wal_lsn_diff(
CASE pg_is_in_recovery()
- WHEN true THEN pg_last_{0}_receive_{1}()
- ELSE pg_current_{0}_{1}()
+ WHEN true THEN pg_last_wal_receive_lsn()
+ ELSE pg_current_wal_lsn()
END,
'0/0') as wal_writes ;
-"""
+""",
+ V96: """
+SELECT
+ pg_xlog_location_diff(
+ CASE pg_is_in_recovery()
+ WHEN true THEN pg_last_xlog_receive_location()
+ ELSE pg_current_xlog_location()
+ END,
+ '0/0') as wal_writes ;
+""",
}
-QUERY_STATS = {
- QUERIES['DATABASE']: METRICS['DATABASE'],
- QUERIES['BACKENDS']: METRICS['BACKENDS'],
- QUERIES['LOCKS']: METRICS['LOCKS']
-}
+def query_factory(name, version=NO_VERSION):
+ if name == BACKENDS:
+ return QUERY_BACKEND[DEFAULT]
+ elif name == TABLE_STATS:
+ return QUERY_TABLE_STATS[DEFAULT]
+ elif name == INDEX_STATS:
+ return QUERY_INDEX_STATS[DEFAULT]
+ elif name == DATABASE:
+ return QUERY_DATABASE[DEFAULT]
+ elif name == BGWRITER:
+ return QUERY_BGWRITER[DEFAULT]
+ elif name == LOCKS:
+ return QUERY_LOCKS[DEFAULT]
+ elif name == DATABASES:
+ return QUERY_DATABASES[DEFAULT]
+ elif name == STANDBY:
+ return QUERY_STANDBY[DEFAULT]
+ elif name == REPLICATION_SLOT:
+ return QUERY_REPLICATION_SLOT[DEFAULT]
+ elif name == IF_SUPERUSER:
+ return QUERY_SUPERUSER[DEFAULT]
+ elif name == SERVER_VERSION:
+ return QUERY_SHOW_VERSION[DEFAULT]
+ elif name == AUTOVACUUM:
+ return QUERY_AUTOVACUUM[DEFAULT]
+ elif name == WAL:
+ if version < 100000:
+ return QUERY_WAL[V96]
+ return QUERY_WAL[DEFAULT]
+ elif name == ARCHIVE:
+ if version < 100000:
+ return QUERY_ARCHIVE[V96]
+ return QUERY_ARCHIVE[DEFAULT]
+ elif name == STANDBY_DELTA:
+ if version < 100000:
+ return QUERY_STANDBY_DELTA[V96]
+ return QUERY_STANDBY_DELTA[DEFAULT]
+ elif name == REPSLOT_FILES:
+ if version < 110000:
+ return QUERY_REPSLOT_FILES[V10]
+ return QUERY_REPSLOT_FILES[DEFAULT]
+ elif name == DIFF_LSN:
+ if version < 100000:
+ return QUERY_DIFF_LSN[V96]
+ return QUERY_DIFF_LSN[DEFAULT]
+
+ raise ValueError('unknown query')
+
ORDER = [
'db_stat_temp_files',
@@ -403,7 +636,7 @@ CHARTS = {
]
},
'db_stat_temp_bytes': {
- 'options': [None, 'Temp files written to disk', 'KB/s', 'db statistics', 'postgres.db_stat_temp_bytes',
+ 'options': [None, 'Temp files written to disk', 'KiB/s', 'db statistics', 'postgres.db_stat_temp_bytes',
'line'],
'lines': [
['temp_bytes', 'size', 'incremental', 1, 1024]
@@ -417,7 +650,7 @@ CHARTS = {
]
},
'database_size': {
- 'options': [None, 'Database size', 'MB', 'database size', 'postgres.db_size', 'stacked'],
+ 'options': [None, 'Database size', 'MiB', 'database size', 'postgres.db_size', 'stacked'],
'lines': [
]
},
@@ -436,7 +669,7 @@ CHARTS = {
]
},
'index_size': {
- 'options': [None, 'Indexes size', 'MB', 'indexes', 'postgres.index_size', 'line'],
+ 'options': [None, 'Indexes size', 'MiB', 'indexes', 'postgres.index_size', 'line'],
'lines': [
['index_size', 'size', 'absolute', 1, 1024 * 1024]
]
@@ -448,7 +681,7 @@ CHARTS = {
]
},
'table_size': {
- 'options': [None, 'Tables size', 'MB', 'tables', 'postgres.table_size', 'line'],
+ 'options': [None, 'Tables size', 'MiB', 'tables', 'postgres.table_size', 'line'],
'lines': [
['table_size', 'size', 'absolute', 1, 1024 * 1024]
]
@@ -462,7 +695,7 @@ CHARTS = {
]
},
'wal_writes': {
- 'options': [None, 'Write-Ahead Logs', 'kilobytes/s', 'wal_writes', 'postgres.wal_writes', 'line'],
+ 'options': [None, 'Write-Ahead Logs', 'KiB/s', 'wal_writes', 'postgres.wal_writes', 'line'],
'lines': [
['wal_writes', 'writes', 'incremental', 1, 1024]
]
@@ -483,20 +716,20 @@ CHARTS = {
]
},
'stat_bgwriter_alloc': {
- 'options': [None, 'Buffers allocated', 'kilobytes/s', 'bgwriter', 'postgres.stat_bgwriter_alloc', 'line'],
+ 'options': [None, 'Buffers allocated', 'KiB/s', 'bgwriter', 'postgres.stat_bgwriter_alloc', 'line'],
'lines': [
['buffers_alloc', 'alloc', 'incremental', 1, 1024]
]
},
'stat_bgwriter_checkpoint': {
- 'options': [None, 'Buffers written during checkpoints', 'kilobytes/s', 'bgwriter',
+ 'options': [None, 'Buffers written during checkpoints', 'KiB/s', 'bgwriter',
'postgres.stat_bgwriter_checkpoint', 'line'],
'lines': [
['buffers_checkpoint', 'checkpoint', 'incremental', 1, 1024]
]
},
'stat_bgwriter_backend': {
- 'options': [None, 'Buffers written directly by a backend', 'kilobytes/s', 'bgwriter',
+ 'options': [None, 'Buffers written directly by a backend', 'KiB/s', 'bgwriter',
'postgres.stat_bgwriter_backend', 'line'],
'lines': [
['buffers_backend', 'backend', 'incremental', 1, 1024]
@@ -509,7 +742,7 @@ CHARTS = {
]
},
'stat_bgwriter_bgwriter': {
- 'options': [None, 'Buffers written by the background writer', 'kilobytes/s', 'bgwriter',
+ 'options': [None, 'Buffers written by the background writer', 'KiB/s', 'bgwriter',
'postgres.bgwriter_bgwriter', 'line'],
'lines': [
['buffers_clean', 'clean', 'incremental', 1, 1024]
@@ -533,7 +766,7 @@ CHARTS = {
]
},
'standby_delta': {
- 'options': [None, 'Standby delta', 'kilobytes', 'replication delta', 'postgres.standby_delta', 'line'],
+ 'options': [None, 'Standby delta', 'KiB', 'replication delta', 'postgres.standby_delta', 'line'],
'lines': [
['sent_delta', 'sent delta', 'absolute', 1, 1024],
['write_delta', 'write delta', 'absolute', 1, 1024],
@@ -554,186 +787,218 @@ CHARTS = {
class Service(SimpleService):
def __init__(self, configuration=None, name=None):
SimpleService.__init__(self, configuration=configuration, name=name)
- self.order = ORDER[:]
+ self.order = list(ORDER)
self.definitions = deepcopy(CHARTS)
- self.table_stats = configuration.pop('table_stats', False)
- self.index_stats = configuration.pop('index_stats', False)
- self.database_poll = configuration.pop('database_poll', None)
+ self.do_table_stats = configuration.pop('table_stats', False)
+ self.do_index_stats = configuration.pop('index_stats', False)
+ self.databases_to_poll = configuration.pop('database_poll', None)
+ self.statement_timeout = configuration.pop('statement_timeout', DEFAULT_STATEMENT_TIMEOUT)
self.configuration = configuration
- self.connection = False
+ self.conn = None
self.server_version = None
- self.data = dict()
- self.locks_zeroed = dict()
+ self.is_superuser = False
+ self.alive = False
self.databases = list()
self.secondaries = list()
self.replication_slots = list()
- self.queries = QUERY_STATS.copy()
-
- def _connect(self):
- params = dict(user='postgres',
- database=None,
- password=None,
- host=None,
- port=5432)
- params.update(self.configuration)
-
- if not self.connection:
- try:
- self.connection = psycopg2.connect(**params)
- self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
- self.connection.set_session(readonly=True)
- except OperationalError as error:
- return False, str(error)
- return True, True
+ self.queries = dict()
+ self.data = dict()
+
+ def reconnect(self):
+ return self.connect()
+
+ def connect(self):
+ if self.conn:
+ self.conn.close()
+ self.conn = None
+
+ try:
+ params = dict(
+ host=None,
+ port=DEFAULT_PORT,
+ database=None,
+ user=DEFAULT_USER,
+ password=None,
+ connect_timeout=DEFAULT_CONNECT_TIMEOUT,
+ options='-c statement_timeout={0}'.format(self.statement_timeout),
+ )
+ params.update(self.configuration)
+
+ self.conn = psycopg2.connect(**params)
+ self.conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+ self.conn.set_session(readonly=True)
+ except OperationalError as error:
+ self.error(error)
+ self.alive = False
+ else:
+ self.alive = True
+
+ return self.alive
def check(self):
if not PSYCOPG2:
- self.error('\'python-psycopg2\' module is needed to use postgres.chart.py')
+ self.error("'python-psycopg2' package is needed to use postgres module")
return False
- result, error = self._connect()
- if not result:
- conf = dict((k, (lambda k, v: v if k != 'password' else '*****')(k, v))
- for k, v in self.configuration.items())
- self.error('Failed to connect to %s. Error: %s' % (str(conf), error))
+
+ if not self.connect():
+ self.error('failed to connect to {0}'.format(hide_password(self.configuration)))
return False
+
try:
- cursor = self.connection.cursor()
- self.databases = discover_databases_(cursor, QUERIES['FIND_DATABASES'])
- is_superuser = check_if_superuser_(cursor, QUERIES['IF_SUPERUSER'])
- self.secondaries = discover_secondaries_(cursor, QUERIES['FIND_STANDBY'])
- self.server_version = detect_server_version(cursor, QUERIES['DETECT_SERVER_VERSION'])
- if self.server_version >= 94000:
- self.replication_slots = discover_replication_slots_(cursor, QUERIES['FIND_REPLICATION_SLOT'])
- cursor.close()
-
- if self.database_poll and isinstance(self.database_poll, str):
- self.databases = [dbase for dbase in self.databases if dbase in self.database_poll.split()] \
- or self.databases
-
- self.locks_zeroed = populate_lock_types(self.databases)
- self.add_additional_queries_(is_superuser)
- self.create_dynamic_charts_()
- return True
+ self.check_queries()
except Exception as error:
- self.error(str(error))
+ self.error(error)
return False
- def add_additional_queries_(self, is_superuser):
+ self.populate_queries()
+ self.create_dynamic_charts()
- if self.server_version >= 100000:
- wal = 'wal'
- lsn = 'lsn'
- else:
- wal = 'xlog'
- lsn = 'location'
- self.queries[QUERIES['BGWRITER']] = METRICS['BGWRITER']
- self.queries[QUERIES['DIFF_LSN'].format(wal, lsn)] = METRICS['WAL_WRITES']
- self.queries[QUERIES['STANDBY_DELTA'].format(wal, lsn)] = METRICS['STANDBY_DELTA']
-
- if self.index_stats:
- self.queries[QUERIES['INDEX_STATS']] = METRICS['INDEX_STATS']
- if self.table_stats:
- self.queries[QUERIES['TABLE_STATS']] = METRICS['TABLE_STATS']
- if is_superuser:
- self.queries[QUERIES['ARCHIVE'].format(wal)] = METRICS['ARCHIVE']
- if self.server_version >= 90400:
- self.queries[QUERIES['WAL'].format(wal, lsn)] = METRICS['WAL']
- if self.server_version >= 100000:
- self.queries[QUERIES['REPSLOT_FILES']] = METRICS['REPSLOT_FILES']
- if self.server_version >= 90400:
- self.queries[QUERIES['AUTOVACUUM']] = METRICS['AUTOVACUUM']
+ return True
- def create_dynamic_charts_(self):
+ def get_data(self):
+ if not self.alive and not self.reconnect():
+ return None
- for database_name in self.databases[::-1]:
- self.definitions['database_size']['lines'].append(
- [database_name + '_size', database_name, 'absolute', 1, 1024 * 1024])
- for chart_name in [name for name in self.order if name.startswith('db_stat')]:
- add_database_stat_chart_(order=self.order, definitions=self.definitions,
- name=chart_name, database_name=database_name)
+ try:
+ cursor = self.conn.cursor(cursor_factory=DictCursor)
- add_database_lock_chart_(order=self.order, definitions=self.definitions, database_name=database_name)
+ self.data.update(zero_lock_types(self.databases))
- for application_name in self.secondaries[::-1]:
- add_replication_delta_chart_(
- order=self.order,
- definitions=self.definitions,
- name='standby_delta',
- application_name=application_name)
+ for query, metrics in self.queries.items():
+ self.query_stats(cursor, query, metrics)
- for slot_name in self.replication_slots[::-1]:
- add_replication_slot_chart_(
- order=self.order,
- definitions=self.definitions,
- name='replication_slot',
- slot_name=slot_name)
-
- def _get_data(self):
- result, _ = self._connect()
- if result:
- cursor = self.connection.cursor(cursor_factory=DictCursor)
- try:
- self.data.update(self.locks_zeroed)
- for query, metrics in self.queries.items():
- self.query_stats_(cursor, query, metrics)
-
- except OperationalError:
- self.connection = False
- cursor.close()
- return None
- else:
- cursor.close()
- return self.data
- else:
+ except OperationalError:
+ self.alive = False
return None
- def query_stats_(self, cursor, query, metrics):
+ cursor.close()
+
+ return self.data
+
+ def query_stats(self, cursor, query, metrics):
cursor.execute(query, dict(databases=tuple(self.databases)))
+
for row in cursor:
for metric in metrics:
+ # databases
if 'database_name' in row:
dimension_id = '_'.join([row['database_name'], metric])
+ # secondaries
elif 'application_name' in row:
dimension_id = '_'.join([row['application_name'], metric])
+ # replication slots
elif 'slot_name' in row:
dimension_id = '_'.join([row['slot_name'], metric])
+ # other
else:
dimension_id = metric
+
if metric in row:
if row[metric] is not None:
self.data[dimension_id] = int(row[metric])
elif 'locks_count' in row:
- self.data[dimension_id] = row['locks_count'] if metric == row['mode'] else 0
+ if metric == row['mode']:
+ self.data[dimension_id] = row['locks_count']
+ def check_queries(self):
+ cursor = self.conn.cursor()
-def discover_databases_(cursor, query):
- cursor.execute(query)
- result = list()
- for db in [database[0] for database in cursor]:
- if db not in result:
- result.append(db)
- return result
+ self.server_version = detect_server_version(cursor, query_factory(SERVER_VERSION))
+ self.debug('server version: {0}'.format(self.server_version))
+ self.is_superuser = check_if_superuser(cursor, query_factory(IF_SUPERUSER))
+ self.debug('superuser: {0}'.format(self.is_superuser))
-def discover_secondaries_(cursor, query):
- cursor.execute(query)
- result = list()
- for sc in [standby[0] for standby in cursor]:
- if sc not in result:
- result.append(sc)
- return result
+ self.databases = discover(cursor, query_factory(DATABASES))
+ self.debug('discovered databases {0}'.format(self.databases))
+ if self.databases_to_poll:
+ to_poll = self.databases_to_poll.split()
+ self.databases = [db for db in self.databases if db in to_poll] or self.databases
+
+ self.secondaries = discover(cursor, query_factory(STANDBY))
+ self.debug('discovered secondaries: {0}'.format(self.secondaries))
+
+ if self.server_version >= 94000:
+ self.replication_slots = discover(cursor, query_factory(REPLICATION_SLOT))
+ self.debug('discovered replication slots: {0}'.format(self.replication_slots))
+
+ cursor.close()
+
+ def populate_queries(self):
+ self.queries[query_factory(DATABASE)] = METRICS[DATABASE]
+ self.queries[query_factory(BACKENDS)] = METRICS[BACKENDS]
+ self.queries[query_factory(LOCKS)] = METRICS[LOCKS]
+ self.queries[query_factory(BGWRITER)] = METRICS[BGWRITER]
+ self.queries[query_factory(DIFF_LSN, self.server_version)] = METRICS[WAL_WRITES]
+ self.queries[query_factory(STANDBY_DELTA, self.server_version)] = METRICS[STANDBY_DELTA]
+
+ if self.do_index_stats:
+ self.queries[query_factory(INDEX_STATS)] = METRICS[INDEX_STATS]
+ if self.do_table_stats:
+ self.queries[query_factory(TABLE_STATS)] = METRICS[TABLE_STATS]
+
+ if self.is_superuser:
+ self.queries[query_factory(ARCHIVE, self.server_version)] = METRICS[ARCHIVE]
+
+ if self.server_version >= 90400:
+ self.queries[query_factory(WAL, self.server_version)] = METRICS[WAL]
+
+ if self.server_version >= 100000:
+ self.queries[query_factory(REPSLOT_FILES, self.server_version)] = METRICS[REPSLOT_FILES]
+
+ if self.server_version >= 90400:
+ self.queries[query_factory(AUTOVACUUM)] = METRICS[AUTOVACUUM]
+
+ def create_dynamic_charts(self):
+ for database_name in self.databases[::-1]:
+ dim = [
+ database_name + '_size',
+ database_name,
+ 'absolute',
+ 1,
+ 1024 * 1024,
+ ]
+ self.definitions['database_size']['lines'].append(dim)
+ for chart_name in [name for name in self.order if name.startswith('db_stat')]:
+ add_database_stat_chart(
+ order=self.order,
+ definitions=self.definitions,
+ name=chart_name,
+ database_name=database_name,
+ )
+ add_database_lock_chart(
+ order=self.order,
+ definitions=self.definitions,
+ database_name=database_name,
+ )
+
+ for application_name in self.secondaries[::-1]:
+ add_replication_delta_chart(
+ order=self.order,
+ definitions=self.definitions,
+ name='standby_delta',
+ application_name=application_name,
+ )
+
+ for slot_name in self.replication_slots[::-1]:
+ add_replication_slot_chart(
+ order=self.order,
+ definitions=self.definitions,
+ name='replication_slot',
+ slot_name=slot_name,
+ )
-def discover_replication_slots_(cursor, query):
+def discover(cursor, query):
cursor.execute(query)
result = list()
- for slot in [replication_slot[0] for replication_slot in cursor]:
- if slot not in result:
- result.append(slot)
+ for v in [value[0] for value in cursor]:
+ if v not in result:
+ result.append(v)
return result
-def check_if_superuser_(cursor, query):
+def check_if_superuser(cursor, query):
cursor.execute(query)
return cursor.fetchone()[0]
@@ -743,7 +1008,7 @@ def detect_server_version(cursor, query):
return int(cursor.fetchone()[0])
-def populate_lock_types(databases):
+def zero_lock_types(databases):
result = dict()
for database in databases:
for lock_type in METRICS['LOCKS']:
@@ -753,7 +1018,11 @@ def populate_lock_types(databases):
return result
-def add_database_lock_chart_(order, definitions, database_name):
+def hide_password(config):
+ return dict((k, v if k != 'password' else '*****') for k, v in config.items())
+
+
+def add_database_lock_chart(order, definitions, database_name):
def create_lines(database):
result = list()
for lock_type in METRICS['LOCKS']:
@@ -770,7 +1039,7 @@ def add_database_lock_chart_(order, definitions, database_name):
}
-def add_database_stat_chart_(order, definitions, name, database_name):
+def add_database_stat_chart(order, definitions, name, database_name):
def create_lines(database, lines):
result = list()
for line in lines:
@@ -787,7 +1056,7 @@ def add_database_stat_chart_(order, definitions, name, database_name):
'lines': create_lines(database_name, chart_template['lines'])}
-def add_replication_delta_chart_(order, definitions, name, application_name):
+def add_replication_delta_chart(order, definitions, name, application_name):
def create_lines(standby, lines):
result = list()
for line in lines:
@@ -799,13 +1068,13 @@ def add_replication_delta_chart_(order, definitions, name, application_name):
chart_name = '_'.join([application_name, name])
position = order.index('database_size')
order.insert(position, chart_name)
- name, title, units, family, context, chart_type = chart_template['options']
+ name, title, units, _, context, chart_type = chart_template['options']
definitions[chart_name] = {
'options': [name, title + ': ' + application_name, units, 'replication delta', context, chart_type],
'lines': create_lines(application_name, chart_template['lines'])}
-def add_replication_slot_chart_(order, definitions, name, slot_name):
+def add_replication_slot_chart(order, definitions, name, slot_name):
def create_lines(slot, lines):
result = list()
for line in lines:
@@ -817,7 +1086,7 @@ def add_replication_slot_chart_(order, definitions, name, slot_name):
chart_name = '_'.join([slot_name, name])
position = order.index('database_size')
order.insert(position, chart_name)
- name, title, units, family, context, chart_type = chart_template['options']
+ name, title, units, _, context, chart_type = chart_template['options']
definitions[chart_name] = {
'options': [name, title + ': ' + slot_name, units, 'replication slot files', context, chart_type],
'lines': create_lines(slot_name, chart_template['lines'])}