# -*- coding: utf-8 -*-
# Description: ceph netdata python.d module
# Author: Luis Eduardo (lets00)
# SPDX-License-Identifier: GPL-3.0-or-later

try:
    import rados

    CEPH = True
except ImportError:
    CEPH = False

import json
import os

from bases.FrameworkServices.SimpleService import SimpleService

# default module values (can be overridden per job in `config`)
update_every = 10
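
# Example job configuration (illustrative values only; adjust paths to your cluster):
#
#   local:
#     config_file: '/etc/ceph/ceph.conf'
#     keyring_file: '/etc/ceph/ceph.client.admin.keyring'
#     rados_id: 'admin'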

ORDER = [
    'general_usage',
    'general_objects',
    'general_bytes',
    'general_operations',
    'general_latency',
    'pool_usage',
    'pool_objects',
    'pool_read_bytes',
    'pool_write_bytes',
    'pool_read_operations',
    'pool_write_operations',
    'osd_usage',
    'osd_size',
    'osd_apply_latency',
    'osd_commit_latency'
]
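
# Each dimension line in CHARTS below follows the python.d format:
#   [id, name, algorithm, multiplier, divisor]
# Write dimensions use a negative multiplier so reads and writes mirror
# each other around zero on the area charts.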

CHARTS = {
    'general_usage': {
        'options': [None, 'Ceph General Space', 'KiB', 'general', 'ceph.general_usage', 'stacked'],
        'lines': [
            ['general_available', 'avail', 'absolute'],
            ['general_usage', 'used', 'absolute']
        ]
    },
    'general_objects': {
        'options': [None, 'Ceph General Objects', 'objects', 'general', 'ceph.general_objects', 'area'],
        'lines': [
            ['general_objects', 'cluster', 'absolute']
        ]
    },
    'general_bytes': {
        'options': [None, 'Ceph General Read/Write Data/s', 'KiB/s', 'general', 'ceph.general_bytes',
                    'area'],
        'lines': [
            ['general_read_bytes', 'read', 'absolute', 1, 1024],
            ['general_write_bytes', 'write', 'absolute', -1, 1024]
        ]
    },
    'general_operations': {
        'options': [None, 'Ceph General Read/Write Operations/s', 'operations', 'general', 'ceph.general_operations',
                    'area'],
        'lines': [
            ['general_read_operations', 'read', 'absolute', 1],
            ['general_write_operations', 'write', 'absolute', -1]
        ]
    },
    'general_latency': {
        'options': [None, 'Ceph General Apply/Commit latency', 'milliseconds', 'general', 'ceph.general_latency',
                    'area'],
        'lines': [
            ['general_apply_latency', 'apply', 'absolute'],
            ['general_commit_latency', 'commit', 'absolute']
        ]
    },
    'pool_usage': {
        'options': [None, 'Ceph Pools', 'KiB', 'pool', 'ceph.pool_usage', 'line'],
        'lines': []
    },
    'pool_objects': {
        'options': [None, 'Ceph Pools', 'objects', 'pool', 'ceph.pool_objects', 'line'],
        'lines': []
    },
    'pool_read_bytes': {
        'options': [None, 'Ceph Read Pool Data/s', 'KiB/s', 'pool', 'ceph.pool_read_bytes', 'area'],
        'lines': []
    },
    'pool_write_bytes': {
        'options': [None, 'Ceph Write Pool Data/s', 'KiB/s', 'pool', 'ceph.pool_write_bytes', 'area'],
        'lines': []
    },
    'pool_read_operations': {
        'options': [None, 'Ceph Read Pool Operations/s', 'operations', 'pool', 'ceph.pool_read_operations', 'area'],
        'lines': []
    },
    'pool_write_operations': {
        'options': [None, 'Ceph Write Pool Operations/s', 'operations', 'pool', 'ceph.pool_write_operations', 'area'],
        'lines': []
    },
    'osd_usage': {
        'options': [None, 'Ceph OSDs', 'KiB', 'osd', 'ceph.osd_usage', 'line'],
        'lines': []
    },
    'osd_size': {
        'options': [None, 'Ceph OSDs size', 'KiB', 'osd', 'ceph.osd_size', 'line'],
        'lines': []
    },
    'osd_apply_latency': {
        'options': [None, 'Ceph OSDs apply latency', 'milliseconds', 'osd', 'ceph.apply_latency', 'line'],
        'lines': []
    },
    'osd_commit_latency': {
        'options': [None, 'Ceph OSDs commit latency', 'milliseconds', 'osd', 'ceph.commit_latency', 'line'],
        'lines': []
    }
}


class Service(SimpleService):
    def __init__(self, configuration=None, name=None):
        SimpleService.__init__(self, configuration=configuration, name=name)
        self.order = ORDER
        self.definitions = CHARTS
        self.config_file = self.configuration.get('config_file')
        self.keyring_file = self.configuration.get('keyring_file')
        self.rados_id = self.configuration.get('rados_id', 'admin')
        self.cluster = None

    def check(self):
        """
        Checks module
        :return:
        """
        if not CEPH:
            self.error('rados module is needed to use ceph.chart.py')
            return False
        if not (self.config_file and self.keyring_file):
            self.error('config_file and/or keyring_file is not defined')
            return False

        # Verify files and permissions
        if not os.access(self.config_file, os.F_OK):
            self.error('{0} does not exist'.format(self.config_file))
            return False
        if not os.access(self.keyring_file, os.F_OK):
            self.error('{0} does not exist'.format(self.keyring_file))
            return False
        if not os.access(self.config_file, os.R_OK):
            self.error('Ceph plugin cannot read {0}, grant read permission.'.format(self.config_file))
            return False
        if not os.access(self.keyring_file, os.R_OK):
            self.error('Ceph plugin cannot read {0}, grant read permission.'.format(self.keyring_file))
            return False
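        # The keyring path is passed as a librados config override ('keyring');
        # rados_id selects the client name (client.<rados_id>) used to authenticate.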
        try:
            self.cluster = rados.Rados(conffile=self.config_file,
                                       conf=dict(keyring=self.keyring_file),
                                       rados_id=self.rados_id)
            self.cluster.connect()
        except rados.Error as error:
            self.error(error)
            return False
        self.create_definitions()
        return True

    def create_definitions(self):
        """
        Create dynamically charts options
        :return: None
        """
        # Pool lines
        for pool in sorted(self._get_df()['pools'], key=lambda x: sorted(x.keys())):
            self.definitions['pool_usage']['lines'].append([pool['name'],
                                                            pool['name'],
                                                            'absolute'])
            self.definitions['pool_objects']['lines'].append(["obj_{0}".format(pool['name']),
                                                              pool['name'],
                                                              'absolute'])
            self.definitions['pool_read_bytes']['lines'].append(['read_{0}'.format(pool['name']),
                                                                 pool['name'],
                                                                 'absolute', 1, 1024])
            self.definitions['pool_write_bytes']['lines'].append(['write_{0}'.format(pool['name']),
                                                                  pool['name'],
                                                                  'absolute', 1, 1024])
            self.definitions['pool_read_operations']['lines'].append(['read_operations_{0}'.format(pool['name']),
                                                                      pool['name'],
                                                                      'absolute'])
            self.definitions['pool_write_operations']['lines'].append(['write_operations_{0}'.format(pool['name']),
                                                                       pool['name'],
                                                                       'absolute'])

        # OSD lines
        for osd in sorted(self._get_osd_df()['nodes'], key=lambda x: sorted(x.keys())):
            self.definitions['osd_usage']['lines'].append([osd['name'],
                                                           osd['name'],
                                                           'absolute'])
            self.definitions['osd_size']['lines'].append(['size_{0}'.format(osd['name']),
                                                           osd['name'],
                                                           'absolute'])
            self.definitions['osd_apply_latency']['lines'].append(['apply_latency_{0}'.format(osd['name']),
                                                                   osd['name'],
                                                                   'absolute'])
            self.definitions['osd_commit_latency']['lines'].append(['commit_latency_{0}'.format(osd['name']),
                                                                    osd['name'],
                                                                    'absolute'])

    def get_data(self):
        """
        Catch all ceph data
        :return: dict
        """
        try:
            data = {}
            df = self._get_df()
            osd_df = self._get_osd_df()
            osd_perf = self._get_osd_perf()
            osd_perf_infos = get_osd_perf_infos(osd_perf)
            pool_stats = self._get_osd_pool_stats()

            data.update(self._get_general(osd_perf_infos, pool_stats))
            for pool in df['pools']:
                data.update(self._get_pool_usage(pool))
                data.update(self._get_pool_objects(pool))
            for pool_io in pool_stats:
                data.update(self._get_pool_rw(pool_io))
            for osd in osd_df['nodes']:
                data.update(self._get_osd_usage(osd))
                data.update(self._get_osd_size(osd))
            for osd_apply_commit in osd_perf_infos:
                data.update(self._get_osd_latency(osd_apply_commit))
            return data
        except (ValueError, AttributeError) as error:
            self.error(error)
            return None

    def _get_general(self, osd_perf_infos, pool_stats):
        """
        Get ceph's general usage
        :return: dict
        """
        status = self.cluster.get_cluster_stats()
        read_bytes_sec = 0
        write_bytes_sec = 0
        read_op_per_sec = 0
        write_op_per_sec = 0
        apply_latency = 0
        commit_latency = 0
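        # Aggregate per-pool client IO rates and per-OSD perf stats into cluster-wide totals.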

        for pool_rw_io_b in pool_stats:
            read_bytes_sec += pool_rw_io_b['client_io_rate'].get('read_bytes_sec', 0)
            write_bytes_sec += pool_rw_io_b['client_io_rate'].get('write_bytes_sec', 0)
            read_op_per_sec += pool_rw_io_b['client_io_rate'].get('read_op_per_sec', 0)
            write_op_per_sec += pool_rw_io_b['client_io_rate'].get('write_op_per_sec', 0)
        for perf in osd_perf_infos:
            apply_latency += perf['perf_stats']['apply_latency_ms']
            commit_latency += perf['perf_stats']['commit_latency_ms']

        return {
            'general_usage': int(status['kb_used']),
            'general_available': int(status['kb_avail']),
            'general_objects': int(status['num_objects']),
            'general_read_bytes': read_bytes_sec,
            'general_write_bytes': write_bytes_sec,
            'general_read_operations': read_op_per_sec,
            'general_write_operations': write_op_per_sec,
            'general_apply_latency': apply_latency,
            'general_commit_latency': commit_latency
        }

    @staticmethod
    def _get_pool_usage(pool):
        """
        Process raw data into pool usage dict information
        :return: A pool dict with pool name's key and usage bytes' value
        """
        return {pool['name']: pool['stats']['kb_used']}

    @staticmethod
    def _get_pool_objects(pool):
        """
        Process raw data into pool usage dict information
        :return: A pool dict with pool name's key and object numbers
        """
        return {'obj_{0}'.format(pool['name']): pool['stats']['objects']}

    @staticmethod
    def _get_pool_rw(pool):
        """
        Get read/write kb and operations in a pool
        :return: A pool dict with both read/write bytes and operations.
        """
        return {
            'read_{0}'.format(pool['pool_name']): int(pool['client_io_rate'].get('read_bytes_sec', 0)),
            'write_{0}'.format(pool['pool_name']): int(pool['client_io_rate'].get('write_bytes_sec', 0)),
            'read_operations_{0}'.format(pool['pool_name']): int(pool['client_io_rate'].get('read_op_per_sec', 0)),
            'write_operations_{0}'.format(pool['pool_name']): int(pool['client_io_rate'].get('write_op_per_sec', 0))
        }

    @staticmethod
    def _get_osd_usage(osd):
        """
        Process raw data into osd dict information to get osd usage
        :return: A osd dict with osd name's key and usage bytes' value
        """
        return {osd['name']: float(osd['kb_used'])}

    @staticmethod
    def _get_osd_size(osd):
        """
        Process raw data into osd dict information to get osd size (kb)
        :return: A osd dict with osd name's key and size bytes' value
        """
        return {'size_{0}'.format(osd['name']): float(osd['kb'])}

    @staticmethod
    def _get_osd_latency(osd):
        """
        Get ceph osd apply and commit latency
        :return: A osd dict with osd name's key with both apply and commit latency values
        """
        return {
            'apply_latency_osd.{0}'.format(osd['id']): osd['perf_stats']['apply_latency_ms'],
            'commit_latency_osd.{0}'.format(osd['id']): osd['perf_stats']['commit_latency_ms']
        }

    def _get_df(self):
        """
        Get ceph df output
        :return: ceph df --format json
        """
        return json.loads(self.cluster.mon_command(json.dumps({
            'prefix': 'df',
            'format': 'json'
        }), b'')[1].decode('utf-8'))

    def _get_osd_df(self):
        """
        Get ceph osd df output
        :return: ceph osd df --format json
        """
        return json.loads(self.cluster.mon_command(json.dumps({
            'prefix': 'osd df',
            'format': 'json'
        }), b'')[1].decode('utf-8').replace('-nan', '"-nan"'))

    def _get_osd_perf(self):
        """
        Get ceph osd performance
        :return: ceph osd perf --format json
        """
        return json.loads(self.cluster.mon_command(json.dumps({
            'prefix': 'osd perf',
            'format': 'json'
        }), b'')[1].decode('utf-8'))

    def _get_osd_pool_stats(self):
        """
        Get ceph osd pool status.
        This command is used to get information about both
        read/write operation and bytes per second on each pool
        :return: ceph osd pool stats --format json
        """
        return json.loads(self.cluster.mon_command(json.dumps({
            'prefix': 'osd pool stats',
            'format': 'json'
        }), b'')[1].decode('utf-8'))


def get_osd_perf_infos(osd_perf):
    # https://github.com/netdata/netdata/issues/8247
    # The module uses 'osd_perf_infos' data, which has been moved under 'osdstats' since Ceph v14.2.
    if 'osd_perf_infos' in osd_perf:
        return osd_perf['osd_perf_infos']
    return osd_perf['osdstats']['osd_perf_infos']
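

# To run this collector in debug mode (plugin path may vary by installation):
#   sudo -u netdata /usr/libexec/netdata/plugins.d/python.d.plugin ceph debug trace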