1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
|
# Description: dovecot netdata python.d module
# Author: Kyle Agronick (agronick)
# SPDX-License-Identifier: GPL-3.0+
# Gearman Netdata Plugin
from copy import deepcopy
from bases.FrameworkServices.SocketService import SocketService
CHARTS = {
'total_workers': {
'options': [None, 'Total Jobs', 'Jobs', 'Total Jobs', 'gearman.total_jobs', 'line'],
'lines': [
['total_pending', 'Pending', 'absolute'],
['total_running', 'Running', 'absolute'],
]
},
}
def job_chart_template(job_name):
return {
'options': [None, job_name, 'Jobs', 'Activity by Job', 'gearman.single_job', 'stacked'],
'lines': [
['{0}_pending'.format(job_name), 'Pending', 'absolute'],
['{0}_idle'.format(job_name), 'Idle', 'absolute'],
['{0}_running'.format(job_name), 'Running', 'absolute'],
]
}
def build_result_dict(job):
"""
Get the status for each job
:return: dict
"""
total, running, available = job['metrics']
idle = available - running
pending = total - running
return {
'{0}_pending'.format(job['job_name']): pending,
'{0}_idle'.format(job['job_name']): idle,
'{0}_running'.format(job['job_name']): running,
}
def parse_worker_data(job):
job_name = job[0]
job_metrics = job[1:]
return {
'job_name': job_name,
'metrics': job_metrics,
}
class GearmanReadException(BaseException):
pass
class Service(SocketService):
def __init__(self, configuration=None, name=None):
super(Service, self).__init__(configuration=configuration, name=name)
self.request = "status\n"
self._keep_alive = True
self.host = self.configuration.get('host', 'localhost')
self.port = self.configuration.get('port', 4730)
self.tls = self.configuration.get('tls', False)
self.cert = self.configuration.get('cert', None)
self.key = self.configuration.get('key', None)
self.active_jobs = set()
self.definitions = deepcopy(CHARTS)
self.order = ['total_workers']
def _get_data(self):
"""
Format data received from socket
:return: dict
"""
try:
active_jobs = self.get_active_jobs()
except GearmanReadException:
return None
found_jobs, job_data = self.process_jobs(active_jobs)
self.remove_stale_jobs(found_jobs)
return job_data
def get_active_jobs(self):
active_jobs = []
for job in self.get_worker_data():
parsed_job = parse_worker_data(job)
# Gearman does not clean up old jobs
# We only care about jobs that have
# some relevant data
if not any(parsed_job['metrics']):
continue
active_jobs.append(parsed_job)
return active_jobs
def get_worker_data(self):
"""
Split the data returned from Gearman
into a list of lists
This returns the same output that you
would get from a gearadmin --status
command.
Example output returned from
_get_raw_data():
prefix generic_worker4 78 78 500
generic_worker2 78 78 500
generic_worker3 0 0 760
generic_worker1 0 0 500
:return: list
"""
try:
raw = self._get_raw_data()
except (ValueError, AttributeError):
raise GearmanReadException()
if raw is None:
self.debug("Gearman returned no data")
raise GearmanReadException()
workers = list()
for line in raw.splitlines()[:-1]:
parts = line.split()
if not parts:
continue
name = '_'.join(parts[:-3])
try:
values = [int(w) for w in parts[-3:]]
except ValueError:
continue
w = [name]
w.extend(values)
workers.append(w)
return workers
def process_jobs(self, active_jobs):
output = {
'total_pending': 0,
'total_idle': 0,
'total_running': 0,
}
found_jobs = set()
for parsed_job in active_jobs:
job_name = self.add_job(parsed_job)
found_jobs.add(job_name)
job_data = build_result_dict(parsed_job)
for sum_value in ('pending', 'running', 'idle'):
output['total_{0}'.format(sum_value)] += job_data['{0}_{1}'.format(job_name, sum_value)]
output.update(job_data)
return found_jobs, output
def remove_stale_jobs(self, active_job_list):
"""
Removes jobs that have no workers, pending jobs,
or running jobs
:param active_job_list: The latest list of active jobs
:type active_job_list: iterable
:return: None
"""
for to_remove in self.active_jobs - active_job_list:
self.remove_job(to_remove)
def add_job(self, parsed_job):
"""
Adds a job to the list of active jobs
:param parsed_job: A parsed job dict
:type parsed_job: dict
:return: None
"""
def add_chart(job_name):
"""
Adds a new job chart
:param job_name: The name of the job to add
:type job_name: string
:return: None
"""
job_key = 'job_{0}'.format(job_name)
template = job_chart_template(job_name)
new_chart = self.charts.add_chart([job_key] + template['options'])
for dimension in template['lines']:
new_chart.add_dimension(dimension)
if parsed_job['job_name'] not in self.active_jobs:
add_chart(parsed_job['job_name'])
self.active_jobs.add(parsed_job['job_name'])
return parsed_job['job_name']
def remove_job(self, job_name):
"""
Removes a job to the list of active jobs
:param job_name: The name of the job to remove
:type job_name: string
:return: None
"""
def remove_chart(job_name):
"""
Removes a job chart
:param job_name: The name of the job to remove
:type job_name: string
:return: None
"""
job_key = 'job_{0}'.format(job_name)
self.charts[job_key].obsolete()
del self.charts[job_key]
remove_chart(job_name)
self.active_jobs.remove(job_name)
|