1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
-- SPDX-License-Identifier: GPL-3.0-or-later
-- Load dependent modules
if not stats then modules.load('stats') end
-- This is leader-only module
local M = {}
local ffi = require("ffi")
local socket = require("cqueues.socket")
local proto_txt = {
[socket.SOCK_DGRAM] = 'udp',
[socket.SOCK_STREAM] = 'tcp'
}
local function make_socket(host, port, stype)
local s, err, status
-- timeout before next interval begins (roughly)
local timeout_sec = (M.interval - 10) / sec
s = socket.connect({ host = host, port = port, type = stype })
s:setmode('bn', 'bn')
s:settimeout(timeout_sec)
status, err = pcall(s.connect, s, timeout_sec)
if status == true and err == nil then
err = 'connect timeout'
s:close()
status = false
end
if not status then
log_info(ffi.C.LOG_GRP_GRAPHITE, 'connecting: %s@%d %s reason: %s',
host, port, proto_txt[stype], err)
return status, err
end
return s
end
-- Create connected UDP socket
local function make_udp(host, port)
return make_socket(host, port, socket.SOCK_DGRAM)
end
-- Create connected TCP socket
local function make_tcp(host, port)
return make_socket(host, port, socket.SOCK_STREAM)
end
-- Send the metrics in a table to multiple Graphite consumers
local function publish_table(metrics, prefix, now)
local s
for i in ipairs(M.cli) do
local host = M.info[i]
if M.cli[i] == -1 then
if host.tcp then
s = make_tcp(host.addr, host.port)
else
s = make_udp(host.addr, host.port)
end
if s then
M.cli[i] = s
end
end
if M.cli[i] ~= -1 then
for key,val in pairs(metrics) do
local msg = key..' '..val..' '..now..'\n'
if prefix then
msg = prefix..'.'..msg
end
local ok, err = pcall(M.cli[i].write, M.cli[i], msg)
if not ok then
local tcp = M.cli[i]['connect'] ~= nil
if tcp and host.seen + 2 * M.interval / 1000 <= now then
local sock_type = (host.tcp and socket.SOCK_STREAM)
or socket.SOCK_DGRAM
log_info(ffi.C.LOG_GRP_GRAPHITE, 'reconnecting: %s@%d %s reason: %s',
host.addr, host.port, proto_txt[sock_type], err)
s = make_tcp(host.addr, host.port)
if s then
M.cli[i] = s
host.seen = now
else
M.cli[i] = -1
break
end
end
end
end -- loop metrics
end
end -- loop M.cli
end
function M.init()
M.ev = nil
M.cli = {}
M.info = {}
M.interval = 5 * sec
M.prefix = string.format('kresd.%s.%s', hostname(), worker.id)
return 0
end
function M.deinit()
if M.ev then event.cancel(M.ev) end
return 0
end
-- @function Publish results to the Graphite server(s)
function M.publish()
local now = os.time()
-- Publish built-in statistics
if not M.cli then error("no graphite server configured") end
publish_table(cache.stats(), M.prefix..'.cache', now)
publish_table(worker.stats(), M.prefix..'.worker', now)
-- Publish extended statistics if available
publish_table(stats.list(), M.prefix, now)
return 0
end
-- @function Make connection to Graphite server.
function M.add_server(_, host, port, tcp)
table.insert(M.cli, -1)
table.insert(M.info, {addr = host, port = port, tcp = tcp, seen = 0})
return 0
end
function M.config(conf)
-- config defaults
if not conf then return 0 end
if not conf.port then conf.port = 2003 end
if conf.interval then M.interval = conf.interval end
if conf.prefix then M.prefix = conf.prefix end
if type(conf.host) == 'table' then
for _, val in pairs(conf.host) do
M:add_server(val, conf.port, conf.tcp)
end
else
M:add_server(conf.host, conf.port, conf.tcp)
end
-- start publishing stats
if M.ev then event.cancel(M.ev) end
M.ev = event.recurrent(M.interval, function() worker.coroutine(M.publish) end)
return 0
end
return M
|