summaryrefslogtreecommitdiffstats
path: root/collectors/python.d.plugin/gearman/gearman.chart.py
blob: 5e280a4d80554ad4b91c3269302c9c11f79c75eb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# Description: dovecot netdata python.d module
# Author: Kyle Agronick (agronick)
# SPDX-License-Identifier: GPL-3.0+

# Gearman Netdata Plugin

from copy import deepcopy

from bases.FrameworkServices.SocketService import SocketService

CHARTS = {
    'total_workers': {
        'options': [None, 'Total Jobs', 'Jobs', 'Total Jobs', 'gearman.total_jobs', 'line'],
        'lines': [
            ['total_pending', 'Pending', 'absolute'],
            ['total_running', 'Running', 'absolute'],
        ]
    },
}


def job_chart_template(job_name):
    return {
        'options': [None, job_name, 'Jobs', 'Activity by Job', 'gearman.single_job', 'stacked'],
        'lines': [
            ['{0}_pending'.format(job_name), 'Pending', 'absolute'],
            ['{0}_idle'.format(job_name), 'Idle', 'absolute'],
            ['{0}_running'.format(job_name), 'Running', 'absolute'],
        ]
    }


def build_result_dict(job):
    """
    Get the status for each job
    :return: dict
    """

    total, running, available = job['metrics']

    idle = available - running
    pending = total - running

    return {
        '{0}_pending'.format(job['job_name']): pending,
        '{0}_idle'.format(job['job_name']): idle,
        '{0}_running'.format(job['job_name']): running,
    }


def parse_worker_data(job):
    job_name = job[0]
    job_metrics = job[1:]

    return {
        'job_name': job_name,
        'metrics': job_metrics,
    }


class GearmanReadException(BaseException):
    pass


class Service(SocketService):
    def __init__(self, configuration=None, name=None):
        super(Service, self).__init__(configuration=configuration, name=name)
        self.request = "status\n"
        self._keep_alive = True

        self.host = self.configuration.get('host', 'localhost')
        self.port = self.configuration.get('port', 4730)

        self.tls = self.configuration.get('tls', False)
        self.cert = self.configuration.get('cert', None)
        self.key = self.configuration.get('key', None)

        self.active_jobs = set()
        self.definitions = deepcopy(CHARTS)
        self.order = ['total_workers']

    def _get_data(self):
        """
        Format data received from socket
        :return: dict
        """

        try:
            active_jobs = self.get_active_jobs()
        except GearmanReadException:
            return None

        found_jobs, job_data = self.process_jobs(active_jobs)
        self.remove_stale_jobs(found_jobs)
        return job_data

    def get_active_jobs(self):
        active_jobs = []

        for job in self.get_worker_data():
            parsed_job = parse_worker_data(job)

            # Gearman does not clean up old jobs
            # We only care about jobs that have
            # some relevant data
            if not any(parsed_job['metrics']):
                continue

            active_jobs.append(parsed_job)

        return active_jobs

    def get_worker_data(self):
        """
        Split the data returned from Gearman
        into a list of lists

        This returns the same output that you
        would get from a gearadmin --status
        command.

        Example output returned from
        _get_raw_data():
        prefix generic_worker4 78      78      500
        generic_worker2 78      78      500
        generic_worker3 0       0       760
        generic_worker1 0       0       500

        :return: list
        """

        try:
            raw = self._get_raw_data()
        except (ValueError, AttributeError):
            raise GearmanReadException()

        if raw is None:
            self.debug("Gearman returned no data")
            raise GearmanReadException()

        workers = list()

        for line in raw.splitlines()[:-1]:
            parts = line.split()
            if not parts:
                continue

            name = '_'.join(parts[:-3])
            try:
                values = [int(w) for w in parts[-3:]]
            except ValueError:
                continue

            w = [name]
            w.extend(values)
            workers.append(w)

        return workers

    def process_jobs(self, active_jobs):

        output = {
            'total_pending': 0,
            'total_idle': 0,
            'total_running': 0,
        }
        found_jobs = set()

        for parsed_job in active_jobs:

            job_name = self.add_job(parsed_job)
            found_jobs.add(job_name)
            job_data = build_result_dict(parsed_job)

            for sum_value in ('pending', 'running', 'idle'):
                output['total_{0}'.format(sum_value)] += job_data['{0}_{1}'.format(job_name, sum_value)]

            output.update(job_data)

        return found_jobs, output

    def remove_stale_jobs(self, active_job_list):
        """
        Removes jobs that have no workers, pending jobs,
        or running jobs
        :param active_job_list: The latest list of active jobs
        :type active_job_list: iterable
        :return: None
        """

        for to_remove in self.active_jobs - active_job_list:
            self.remove_job(to_remove)

    def add_job(self, parsed_job):
        """
        Adds a job to the list of active jobs
        :param parsed_job: A parsed job dict
        :type parsed_job: dict
        :return: None
        """

        def add_chart(job_name):
            """
            Adds a new job chart
            :param job_name: The name of the job to add
            :type job_name: string
            :return: None
            """

            job_key = 'job_{0}'.format(job_name)
            template = job_chart_template(job_name)
            new_chart = self.charts.add_chart([job_key] + template['options'])
            for dimension in template['lines']:
                new_chart.add_dimension(dimension)

        if parsed_job['job_name'] not in self.active_jobs:
            add_chart(parsed_job['job_name'])
            self.active_jobs.add(parsed_job['job_name'])

        return parsed_job['job_name']

    def remove_job(self, job_name):
        """
        Removes a job to the list of active jobs
        :param job_name: The name of the job to remove
        :type job_name: string
        :return: None
        """

        def remove_chart(job_name):
            """
            Removes a job chart
            :param job_name: The name of the job to remove
            :type job_name: string
            :return: None
            """

            job_key = 'job_{0}'.format(job_name)
            self.charts[job_key].obsolete()
            del self.charts[job_key]

        remove_chart(job_name)
        self.active_jobs.remove(job_name)