1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
-- Load dependent modules
if not stats then modules.load('stats') end
-- This is leader-only module
if worker.id > 0 then return {} end
local M = {}
local socket = require('socket')
-- Create connected UDP socket
local function make_udp(host, port)
local s, err, status
if host:find(':') then
s, err = socket.udp6()
else
s, err = socket.udp()
end
if not s then
return nil, err
end
status, err = s:setpeername(host, port)
if not status then
return nil, err
end
return s
end
-- Create connected TCP socket
local function make_tcp(host, port)
local s, err, status
if host:find(':') then
s, err = socket.tcp6()
else
s, err = socket.tcp()
end
if not s then
return nil, err
end
status, err = s:connect(host, port)
if not status then
return s, err
end
return s
end
local function merge(results)
local t = {}
for _, result in ipairs(results) do
for k, v in pairs(result) do
t[k] = (t[k] or 0) + v
end
end
return t
end
-- Send the metrics in a table to multiple Graphite consumers
local function publish_table(metrics, prefix, now)
for key,val in pairs(metrics) do
local msg = key..' '..val..' '..now..'\n'
if prefix then
msg = prefix..'.'..msg
end
for i in ipairs(M.cli) do
local ok, err = M.cli[i]:send(msg)
if not ok then
-- Best-effort reconnect once per two tries
local tcp = M.cli[i]['connect'] ~= nil
local host = M.info[i]
if tcp and host.seen + 2 * M.interval / 1000 <= now then
print(string.format('[graphite] reconnecting: %s@%d reason: %s',
host.addr, host.port, err))
M.cli[i] = make_tcp(host.addr, host.port)
host.seen = now
end
end
end
end
end
function M.init()
M.ev = nil
M.cli = {}
M.info = {}
M.interval = 5 * sec
M.prefix = 'kresd.' .. hostname()
return 0
end
function M.deinit()
if M.ev then event.cancel(M.ev) end
return 0
end
-- @function Publish results to the Graphite server(s)
function M.publish()
local now = os.time()
-- Publish built-in statistics
if not M.cli then error("no graphite server configured") end
publish_table(merge(map 'cache.stats()'), M.prefix..'.cache', now)
publish_table(merge(map 'worker.stats()'), M.prefix..'.worker', now)
-- Publish extended statistics if available
publish_table(merge(map 'stats.list()'), M.prefix, now)
return 0
end
-- @function Make connection to Graphite server.
function M.add_server(_, host, port, tcp)
local s, err
if tcp then
s, err = make_tcp(host, port)
else
s, err = make_udp(host, port)
end
if not s then
error(err)
end
table.insert(M.cli, s)
table.insert(M.info, {addr = host, port = port, seen = 0})
return 0
end
function M.config(conf)
-- config defaults
if not conf then return 0 end
if not conf.port then conf.port = 2003 end
if conf.interval then M.interval = conf.interval end
if conf.prefix then M.prefix = conf.prefix end
-- connect to host(s)
if type(conf.host) == 'table' then
for _, val in pairs(conf.host) do
M:add_server(val, conf.port, conf.tcp)
end
else
M:add_server(conf.host, conf.port, conf.tcp)
end
-- start publishing stats
if M.ev then event.cancel(M.ev) end
M.ev = event.recurrent(M.interval, M.publish)
return 0
end
return M
|