summaryrefslogtreecommitdiffstats
path: root/python.d/postgres.chart.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--python.d/postgres.chart.py526
1 files changed, 273 insertions, 253 deletions
diff --git a/python.d/postgres.chart.py b/python.d/postgres.chart.py
index 919b6f8ee..d359bb4f7 100644
--- a/python.d/postgres.chart.py
+++ b/python.d/postgres.chart.py
@@ -2,12 +2,16 @@
# Description: example netdata python.d module
# Authors: facetoe, dangtranhoang
-import re
from copy import deepcopy
-import psycopg2
-from psycopg2 import extensions
-from psycopg2.extras import DictCursor
+try:
+ import psycopg2
+ from psycopg2 import extensions
+ from psycopg2.extras import DictCursor
+ from psycopg2 import OperationalError
+ PSYCOPG2 = True
+except ImportError:
+ PSYCOPG2 = False
from base import SimpleService
@@ -16,39 +20,70 @@ update_every = 1
priority = 90000
retries = 60
-ARCHIVE = """
+METRICS = dict(
+ DATABASE=['connections',
+ 'xact_commit',
+ 'xact_rollback',
+ 'blks_read',
+ 'blks_hit',
+ 'tup_returned',
+ 'tup_fetched',
+ 'tup_inserted',
+ 'tup_updated',
+ 'tup_deleted',
+ 'conflicts',
+ 'size'],
+ BACKENDS=['backends_active',
+ 'backends_idle'],
+ INDEX_STATS=['index_count',
+ 'index_size'],
+ TABLE_STATS=['table_size',
+ 'table_count'],
+ ARCHIVE=['ready_count',
+ 'done_count',
+ 'file_count'],
+ BGWRITER=['writer_scheduled',
+ 'writer_requested'],
+ LOCKS=['ExclusiveLock',
+ 'RowShareLock',
+ 'SIReadLock',
+ 'ShareUpdateExclusiveLock',
+ 'AccessExclusiveLock',
+ 'AccessShareLock',
+ 'ShareRowExclusiveLock',
+ 'ShareLock',
+ 'RowExclusiveLock']
+)
+
+QUERIES = dict(
+ ARCHIVE="""
SELECT
CAST(COUNT(*) AS INT) AS file_count,
CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
FROM
pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
-"""
-
-BACKENDS = """
+""",
+ BACKENDS="""
SELECT
count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
(SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
-FROM
- pg_stat_activity;
-"""
-
-TABLE_STATS = """
+FROM pg_stat_activity;
+""",
+ TABLE_STATS="""
SELECT
- ((sum(relpages) * 8) * 1024) AS size_relations,
- count(1) AS relations
+ ((sum(relpages) * 8) * 1024) AS table_size,
+ count(1) AS table_count
FROM pg_class
WHERE relkind IN ('r', 't');
-"""
-
-INDEX_STATS = """
+""",
+ INDEX_STATS="""
SELECT
- ((sum(relpages) * 8) * 1024) AS size_indexes,
- count(1) AS indexes
+ ((sum(relpages) * 8) * 1024) AS index_size,
+ count(1) AS index_count
FROM pg_class
-WHERE relkind = 'i';"""
-
-DATABASE = """
+WHERE relkind = 'i';""",
+ DATABASE="""
SELECT
datname AS database_name,
sum(numbackends) AS connections,
@@ -61,93 +96,108 @@ SELECT
sum(tup_inserted) AS tup_inserted,
sum(tup_updated) AS tup_updated,
sum(tup_deleted) AS tup_deleted,
- sum(conflicts) AS conflicts
+ sum(conflicts) AS conflicts,
+ pg_database_size(datname) AS size
FROM pg_stat_database
WHERE NOT datname ~* '^template\d+'
GROUP BY database_name;
-"""
-
-BGWRITER = 'SELECT * FROM pg_stat_bgwriter;'
-DATABASE_LOCKS = """
+""",
+ BGWRITER="""
+SELECT
+ checkpoints_timed AS writer_scheduled,
+ checkpoints_req AS writer_requested
+FROM pg_stat_bgwriter;""",
+ LOCKS="""
SELECT
pg_database.datname as database_name,
mode,
- count(mode) AS count
+ count(mode) AS locks_count
FROM pg_locks
INNER JOIN pg_database ON pg_database.oid = pg_locks.database
GROUP BY datname, mode
ORDER BY datname, mode;
-"""
-REPLICATION = """
-SELECT
- client_hostname,
- client_addr,
- state,
- sent_offset - (
- replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
-FROM (
- SELECT
- client_addr, client_hostname, state,
- ('x' || lpad(split_part(sent_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
- ('x' || lpad(split_part(replay_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
- ('x' || lpad(split_part(sent_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
- ('x' || lpad(split_part(replay_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
- FROM pg_stat_replication
-) AS s;
-"""
-
-LOCK_TYPES = [
- 'ExclusiveLock',
- 'RowShareLock',
- 'SIReadLock',
- 'ShareUpdateExclusiveLock',
- 'AccessExclusiveLock',
- 'AccessShareLock',
- 'ShareRowExclusiveLock',
- 'ShareLock',
- 'RowExclusiveLock'
-]
-
-ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write',
+""",
+ FIND_DATABASES="""
+SELECT datname FROM pg_stat_database WHERE NOT datname ~* '^template\d+'
+""",
+ IF_SUPERUSER="""
+SELECT current_setting('is_superuser') = 'on' AS is_superuser;
+ """)
+
+# REPLICATION = """
+# SELECT
+# client_hostname,
+# client_addr,
+# state,
+# sent_offset - (
+# replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
+# FROM (
+# SELECT
+# client_addr, client_hostname, state,
+# ('x' || lpad(split_part(sent_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
+# ('x' || lpad(split_part(replay_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
+# ('x' || lpad(split_part(sent_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
+# ('x' || lpad(split_part(replay_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
+# FROM pg_stat_replication
+# ) AS s;
+# """
+
+
+QUERY_STATS = {
+ QUERIES['DATABASE']: METRICS['DATABASE'],
+ QUERIES['BACKENDS']: METRICS['BACKENDS'],
+ QUERIES['ARCHIVE']: METRICS['ARCHIVE'],
+ QUERIES['LOCKS']: METRICS['LOCKS']
+}
+
+ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write', 'database_size',
'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer']
CHARTS = {
'db_stat_transactions': {
- 'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions', 'line'],
+ 'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions',
+ 'line'],
'lines': [
- ['db_stat_xact_commit', 'committed', 'incremental'],
- ['db_stat_xact_rollback', 'rolled back', 'incremental']
+ ['xact_commit', 'committed', 'incremental'],
+ ['xact_rollback', 'rolled back', 'incremental']
]},
'db_stat_connections': {
- 'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections', 'line'],
+ 'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections',
+ 'line'],
'lines': [
- ['db_stat_connections', 'connections', 'absolute']
+ ['connections', 'connections', 'absolute']
]},
'db_stat_tuple_read': {
'options': [None, 'Tuple reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_tuple_read', 'line'],
'lines': [
- ['db_stat_blks_read', 'disk', 'incremental'],
- ['db_stat_blks_hit', 'cache', 'incremental']
+ ['blks_read', 'disk', 'incremental'],
+ ['blks_hit', 'cache', 'incremental']
]},
'db_stat_tuple_returned': {
- 'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned', 'line'],
+ 'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned',
+ 'line'],
'lines': [
- ['db_stat_tup_returned', 'sequential', 'incremental'],
- ['db_stat_tup_fetched', 'bitmap', 'incremental']
+ ['tup_returned', 'sequential', 'incremental'],
+ ['tup_fetched', 'bitmap', 'incremental']
]},
'db_stat_tuple_write': {
'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'],
'lines': [
- ['db_stat_tup_inserted', 'inserted', 'incremental'],
- ['db_stat_tup_updated', 'updated', 'incremental'],
- ['db_stat_tup_deleted', 'deleted', 'incremental'],
- ['db_stat_conflicts', 'conflicts', 'incremental']
+ ['tup_inserted', 'inserted', 'incremental'],
+ ['tup_updated', 'updated', 'incremental'],
+ ['tup_deleted', 'deleted', 'incremental'],
+ ['conflicts', 'conflicts', 'incremental']
+ ]},
+ 'database_size': {
+ 'options': [None, 'Database size', 'MB', 'database size', 'postgres.db_size', 'stacked'],
+ 'lines': [
]},
'backend_process': {
- 'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process', 'line'],
+ 'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process',
+ 'line'],
'lines': [
- ['backend_process_active', 'active', 'absolute'],
- ['backend_process_idle', 'idle', 'absolute']
+ ['backends_active', 'active', 'absolute'],
+ ['backends_idle', 'idle', 'absolute']
]},
'index_count': {
'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'],
@@ -172,15 +222,15 @@ CHARTS = {
'wal': {
'options': [None, 'Write-Ahead Logging Statistics', 'files/s', 'write ahead log', 'postgres.wal', 'line'],
'lines': [
- ['wal_total', 'total', 'incremental'],
- ['wal_ready', 'ready', 'incremental'],
- ['wal_done', 'done', 'incremental']
+ ['file_count', 'total', 'incremental'],
+ ['ready_count', 'ready', 'incremental'],
+ ['done_count', 'done', 'incremental']
]},
'background_writer': {
'options': [None, 'Checkpoints', 'writes/s', 'background writer', 'postgres.background_writer', 'line'],
'lines': [
- ['background_writer_scheduled', 'scheduled', 'incremental'],
- ['background_writer_requested', 'requested', 'incremental']
+ ['writer_scheduled', 'scheduled', 'incremental'],
+ ['writer_requested', 'requested', 'incremental']
]}
}
@@ -188,199 +238,169 @@ CHARTS = {
class Service(SimpleService):
def __init__(self, configuration=None, name=None):
super(self.__class__, self).__init__(configuration=configuration, name=name)
- self.order = ORDER
- self.definitions = CHARTS
- self.table_stats = configuration.pop('table_stats', True)
- self.index_stats = configuration.pop('index_stats', True)
+ self.order = ORDER[:]
+ self.definitions = deepcopy(CHARTS)
+ self.table_stats = configuration.pop('table_stats', False)
+ self.index_stats = configuration.pop('index_stats', False)
self.configuration = configuration
- self.connection = None
+ self.connection = False
self.is_superuser = False
- self.data = {}
- self.databases = set()
+ self.data = dict()
+ self.locks_zeroed = dict()
+ self.databases = list()
def _connect(self):
params = dict(user='postgres',
database=None,
password=None,
- host='localhost',
+ host=None,
port=5432)
params.update(self.configuration)
if not self.connection:
- self.connection = psycopg2.connect(**params)
- self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
- self.connection.set_session(readonly=True)
+ try:
+ self.connection = psycopg2.connect(**params)
+ self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+ self.connection.set_session(readonly=True)
+ except OperationalError as error:
+ return False, str(error)
+ return True, True
def check(self):
+ if not PSYCOPG2:
+ self.error('\'python-psycopg2\' module is needed to use postgres.chart.py')
+ return False
+ result, error = self._connect()
+ if not result:
+ conf = dict([(k, (lambda k, v: v if k != 'password' else '*****')(k, v)) for k, v in self.configuration.items()])
+ self.error('Failed to connect to %s. Error: %s' % (str(conf), error))
+ return False
try:
- self._connect()
cursor = self.connection.cursor()
- self._discover_databases(cursor)
- self._check_if_superuser(cursor)
+ self.databases = discover_databases_(cursor, QUERIES['FIND_DATABASES'])
+ is_superuser = check_if_superuser_(cursor, QUERIES['IF_SUPERUSER'])
cursor.close()
- self._create_definitions()
+ self.locks_zeroed = populate_lock_types(self.databases)
+ self.add_additional_queries_(is_superuser)
+ self.create_dynamic_charts_()
return True
- except Exception as e:
- self.error(str(e))
+ except Exception as error:
+ self.error(str(error))
return False
- def _discover_databases(self, cursor):
- cursor.execute("""
- SELECT datname
- FROM pg_stat_database
- WHERE NOT datname ~* '^template\d+'
- """)
- self.databases = set(r[0] for r in cursor)
-
- def _check_if_superuser(self, cursor):
- cursor.execute("""
- SELECT current_setting('is_superuser') = 'on' AS is_superuser;
- """)
- self.is_superuser = cursor.fetchone()[0]
-
- def _create_definitions(self):
- for database_name in self.databases:
- for chart_template_name in list(CHARTS):
- if chart_template_name.startswith('db_stat'):
- self._add_database_stat_chart(chart_template_name, database_name)
- self._add_database_lock_chart(database_name)
-
- def _add_database_stat_chart(self, chart_template_name, database_name):
- chart_template = CHARTS[chart_template_name]
- chart_name = "{0}_{1}".format(database_name, chart_template_name)
- if chart_name not in self.order:
- self.order.insert(0, chart_name)
- name, title, units, family, context, chart_type = chart_template['options']
- self.definitions[chart_name] = {
- 'options': [
- name,
- title + ': ' + database_name,
- units,
- 'db ' + database_name,
- context,
- chart_type
- ]
- }
-
- self.definitions[chart_name]['lines'] = []
- for line in deepcopy(chart_template['lines']):
- line[0] = "{0}_{1}".format(database_name, line[0])
- self.definitions[chart_name]['lines'].append(line)
-
- def _add_database_lock_chart(self, database_name):
- chart_name = "{0}_locks".format(database_name)
- if chart_name not in self.order:
- self.order.insert(-1, chart_name)
- self.definitions[chart_name] = dict(
- options=
- [
- None,
- 'Locks on db: ' + database_name,
- 'locks',
- 'db ' + database_name,
- 'postgres.db_locks',
- 'line'
- ],
- lines=[]
- )
-
- for lock_type in LOCK_TYPES:
- lock_id = "{0}_{1}".format(database_name, lock_type)
- label = re.sub("([a-z])([A-Z])", "\g<1> \g<2>", lock_type)
- self.definitions[chart_name]['lines'].append([lock_id, label, 'absolute'])
-
- def _get_data(self):
- self._connect()
-
- cursor = self.connection.cursor(cursor_factory=DictCursor)
- self.add_stats(cursor)
-
- cursor.close()
- return self.data
-
- def add_stats(self, cursor):
- self.add_database_stats(cursor)
- self.add_backend_stats(cursor)
+ def add_additional_queries_(self, is_superuser):
if self.index_stats:
- self.add_index_stats(cursor)
+ QUERY_STATS[QUERIES['INDEX_STATS']] = METRICS['INDEX_STATS']
if self.table_stats:
- self.add_table_stats(cursor)
- self.add_lock_stats(cursor)
- self.add_bgwriter_stats(cursor)
+ QUERY_STATS[QUERIES['TABLE_STATS']] = METRICS['TABLE_STATS']
+ if is_superuser:
+ QUERY_STATS[QUERIES['BGWRITER']] = METRICS['BGWRITER']
- # self.add_replication_stats(cursor)
+ def create_dynamic_charts_(self):
- if self.is_superuser:
- self.add_wal_stats(cursor)
+ for database_name in self.databases[::-1]:
+ self.definitions['database_size']['lines'].append([database_name + '_size',
+ database_name, 'absolute', 1, 1024 * 1024])
+ for chart_name in [name for name in CHARTS if name.startswith('db_stat')]:
+ add_database_stat_chart_(order=self.order, definitions=self.definitions,
+ name=chart_name, database_name=database_name)
- def add_database_stats(self, cursor):
- cursor.execute(DATABASE)
- for row in cursor:
- database_name = row.get('database_name')
- self.data["{0}_{1}".format(database_name, 'db_stat_xact_commit')] = int(row.get('xact_commit', 0))
- self.data["{0}_{1}".format(database_name, 'db_stat_xact_rollback')] = int(row.get('xact_rollback', 0))
- self.data["{0}_{1}".format(database_name, 'db_stat_blks_read')] = int(row.get('blks_read', 0))
- self.data["{0}_{1}".format(database_name, 'db_stat_blks_hit')] = int(row.get('blks_hit', 0))
- self.data["{0}_{1}".format(database_name, 'db_stat_tup_returned')] = int(row.get('tup_returned', 0))
- self.data["{0}_{1}".format(database_name, 'db_stat_tup_fetched')] = int(row.get('tup_fetched', 0))
- self.data["{0}_{1}".format(database_name, 'db_stat_tup_inserted')] = int(row.get('tup_inserted', 0))
- self.data["{0}_{1}".format(database_name, 'db_stat_tup_updated')] = int(row.get('tup_updated', 0))
- self.data["{0}_{1}".format(database_name, 'db_stat_tup_deleted')] = int(row.get('tup_deleted', 0))
- self.data["{0}_{1}".format(database_name, 'db_stat_conflicts')] = int(row.get('conflicts', 0))
- self.data["{0}_{1}".format(database_name, 'db_stat_connections')] = int(row.get('connections', 0))
-
- def add_backend_stats(self, cursor):
- cursor.execute(BACKENDS)
- temp = cursor.fetchone()
-
- self.data['backend_process_active'] = int(temp.get('backends_active', 0))
- self.data['backend_process_idle'] = int(temp.get('backends_idle', 0))
-
- def add_index_stats(self, cursor):
- cursor.execute(INDEX_STATS)
- temp = cursor.fetchone()
- self.data['index_count'] = int(temp.get('indexes', 0))
- self.data['index_size'] = int(temp.get('size_indexes', 0))
-
- def add_table_stats(self, cursor):
- cursor.execute(TABLE_STATS)
- temp = cursor.fetchone()
- self.data['table_count'] = int(temp.get('relations', 0))
- self.data['table_size'] = int(temp.get('size_relations', 0))
-
- def add_lock_stats(self, cursor):
- cursor.execute(DATABASE_LOCKS)
-
- # zero out all current lock values
- for database_name in self.databases:
- for lock_type in LOCK_TYPES:
- self.data["{0}_{1}".format(database_name, lock_type)] = 0
-
- # populate those that have current locks
+ add_database_lock_chart_(order=self.order, definitions=self.definitions, database_name=database_name)
+
+ def _get_data(self):
+ result, error = self._connect()
+ if result:
+ cursor = self.connection.cursor(cursor_factory=DictCursor)
+ try:
+ self.data.update(self.locks_zeroed)
+ for query, metrics in QUERY_STATS.items():
+ self.query_stats_(cursor, query, metrics)
+
+ except OperationalError:
+ self.connection = False
+ cursor.close()
+ return None
+ else:
+ cursor.close()
+ return self.data
+ else:
+ return None
+
+ def query_stats_(self, cursor, query, metrics):
+ cursor.execute(query)
for row in cursor:
- database_name, lock_type, lock_count = row
- self.data["{0}_{1}".format(database_name, lock_type)] = lock_count
-
- def add_wal_stats(self, cursor):
- cursor.execute(ARCHIVE)
- temp = cursor.fetchone()
- self.data['wal_total'] = int(temp.get('file_count', 0))
- self.data['wal_ready'] = int(temp.get('ready_count', 0))
- self.data['wal_done'] = int(temp.get('done_count', 0))
-
- def add_bgwriter_stats(self, cursor):
- cursor.execute(BGWRITER)
- temp = cursor.fetchone()
- self.data['background_writer_scheduled'] = temp.get('checkpoints_timed', 0)
- self.data['background_writer_requested'] = temp.get('checkpoints_requests', 0)
-
-'''
- def add_replication_stats(self, cursor):
- cursor.execute(REPLICATION)
- temp = cursor.fetchall()
- for row in temp:
- self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
- 'byte_lag',
- int(row.get('byte_lag', 0)))
-'''
+ for metric in metrics:
+ dimension_id = '_'.join([row['database_name'], metric]) if 'database_name' in row else metric
+ if metric in row:
+ self.data[dimension_id] = int(row[metric])
+ elif 'locks_count' in row:
+ self.data[dimension_id] = row['locks_count'] if metric == row['mode'] else 0
+
+
+def discover_databases_(cursor, query):
+ cursor.execute(query)
+ result = list()
+ for db in [database[0] for database in cursor]:
+ if db not in result:
+ result.append(db)
+ return result
+
+
+def check_if_superuser_(cursor, query):
+ cursor.execute(query)
+ return cursor.fetchone()[0]
+
+
+def populate_lock_types(databases):
+ result = dict()
+ for database in databases:
+ for lock_type in METRICS['LOCKS']:
+ key = '_'.join([database, lock_type])
+ result[key] = 0
+
+ return result
+
+
+def add_database_lock_chart_(order, definitions, database_name):
+ def create_lines(database):
+ result = list()
+ for lock_type in METRICS['LOCKS']:
+ dimension_id = '_'.join([database, lock_type])
+ result.append([dimension_id, lock_type, 'absolute'])
+ return result
+
+ chart_name = database_name + '_locks'
+ order.insert(-1, chart_name)
+ definitions[chart_name] = {
+ 'options':
+ [None, 'Locks on db: ' + database_name, 'locks', 'db ' + database_name, 'postgres.db_locks', 'line'],
+ 'lines': create_lines(database_name)
+ }
+
+
+def add_database_stat_chart_(order, definitions, name, database_name):
+ def create_lines(database, lines):
+ result = list()
+ for line in lines:
+ new_line = ['_'.join([database, line[0]])] + line[1:]
+ result.append(new_line)
+ return result
+
+ chart_template = CHARTS[name]
+ chart_name = '_'.join([database_name, name])
+ order.insert(0, chart_name)
+ name, title, units, family, context, chart_type = chart_template['options']
+ definitions[chart_name] = {
+ 'options': [name, title + ': ' + database_name, units, 'db ' + database_name, context, chart_type],
+ 'lines': create_lines(database_name, chart_template['lines'])}
+
+
+#
+# def add_replication_stats(self, cursor):
+# cursor.execute(REPLICATION)
+# temp = cursor.fetchall()
+# for row in temp:
+# self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
+# 'byte_lag',
+# int(row.get('byte_lag', 0)))