summaryrefslogtreecommitdiffstats
path: root/modules/graphite/graphite.lua
blob: bbf735a20eaee6f0017eb16d3182d229705e75e7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
-- Load dependent modules
-- The 'stats' module is loaded only when no global `stats` exists yet;
-- M.publish() reads stats.list() through map().
if not stats then modules.load('stats') end

-- This is a leader-only module.
-- Any worker whose id is greater than zero gets an empty module table, so the
-- socket and timer code below only runs on the remaining worker.
-- NOTE(review): presumably that is worker 0, the leader — confirm in kresd.
if worker.id > 0 then return {} end
-- Module table; its state fields (ev, cli, info, interval, prefix) are set in M.init().
local M = {}
local socket = require('socket')

-- Create a connected UDP socket for host:port.
-- A host containing ':' is treated as an IPv6 address and gets a udp6 socket;
-- anything else gets a plain udp socket. Connecting (setpeername) fixes the
-- default peer so the socket can be used with send().
-- @param host IP address or hostname of the Graphite server
-- @param port port of the Graphite server
-- @return the connected socket, or nil and an error message on failure
local function make_udp(host, port)
	local s, err
	if host:find(':', 1, true) then
		s, err = socket.udp6()
	else
		s, err = socket.udp()
	end
	if not s then
		return nil, err
	end
	local ok
	ok, err = s:setpeername(host, port)
	if not ok then
		-- The socket is discarded here, so release its descriptor right away
		-- instead of leaving it open until the garbage collector runs.
		s:close()
		return nil, err
	end
	return s
end

-- Open a TCP socket and try to connect it to host:port.
-- A host containing ':' gets a tcp6 socket, anything else a plain tcp socket.
-- A failed connect is not fatal: the (unconnected) socket is still returned,
-- together with the error message, so the caller keeps a slot to retry later.
-- @param host IP address or hostname of the Graphite server
-- @param port port of the Graphite server
-- @return socket plus an optional connect error message,
--         or nil and an error message if the socket could not be created
local function make_tcp(host, port)
	local create
	if host:find(':') then
		create = socket.tcp6
	else
		create = socket.tcp
	end
	local conn, create_err = create()
	if not conn then
		return nil, create_err
	end
	local connected, connect_err = conn:connect(host, port)
	if connected then
		return conn
	end
	return conn, connect_err
end

-- Sum a list of key/value tables into a single new table.
-- Values under the same key in different tables are added together; a key
-- absent from some table simply contributes nothing from it. Walks the list
-- from index 1 and stops at the first nil entry.
-- @param results array of tables mapping metric name -> number
-- @return new table with the per-key totals
local function merge(results)
	local totals = {}
	local idx = 1
	local part = results[idx]
	while part ~= nil do
		for name, value in pairs(part) do
			local prev = totals[name]
			if prev == nil then
				prev = 0
			end
			totals[name] = prev + value
		end
		idx = idx + 1
		part = results[idx]
	end
	return totals
end

-- Send the metrics in a table to all configured Graphite consumers.
-- Every metric becomes one plaintext line "<key> <value> <now>\n", with
-- "<prefix>." prepended when a prefix is given.
-- Send failures are not raised. For TCP servers a best-effort reconnect is
-- attempted, at most once every two publishing intervals per server.
-- @param metrics table mapping metric name -> value
-- @param prefix optional string prepended (with a dot) to every metric name
-- @param now timestamp in seconds (from os.time()) used for every line
local function publish_table(metrics, prefix, now)
	-- Reconnect throttle in seconds; M.interval is treated as milliseconds here.
	local min_retry = 2 * M.interval / 1000
	for key, val in pairs(metrics) do
		local msg = key..' '..val..' '..now..'\n'
		if prefix then
			msg = prefix..'.'..msg
		end
		for i, cli in ipairs(M.cli) do
			local ok, err = cli:send(msg)
			if not ok then
				-- TCP sockets are told apart from UDP ones by their connect method.
				local tcp = cli['connect'] ~= nil
				local host = M.info[i]
				if tcp and host.seen + min_retry <= now then
					print(string.format('[graphite] reconnecting: %s@%d reason: %s',
						  host.addr, host.port, err))
					host.seen = now
					-- Free the broken socket's descriptor before opening a new one.
					cli:close()
					local fresh, cerr = make_tcp(host.addr, host.port)
					if fresh then
						M.cli[i] = fresh
					else
						-- Keep the (closed) old socket in its slot: storing nil would
						-- punch a hole in M.cli and ipairs() would stop there, silently
						-- skipping this and every following server from then on.
						-- The next failed send triggers another reconnect attempt.
						print(string.format('[graphite] reconnect failed: %s@%d reason: %s',
							  host.addr, host.port, tostring(cerr)))
					end
				end
			end
		end
	end
end

-- Reset the module state to its defaults.
-- No server is registered and no publishing event is scheduled until
-- M.config() runs. The default interval is 5 * sec, and the default metric
-- prefix is 'kresd.' followed by hostname().
-- @return 0
function M.init()
	M.ev, M.cli, M.info = nil, {}, {}
	M.interval = 5 * sec
	local host_name = hostname()
	M.prefix = 'kresd.' .. host_name
	return 0
end

-- Stop publishing and release the server sockets.
-- Cancels the recurrent publishing event (if one is scheduled) and closes
-- every socket registered by M.add_server(), instead of leaving them open
-- for the garbage collector.
-- @return 0
function M.deinit()
	if M.ev then
		event.cancel(M.ev)
		M.ev = nil
	end
	for _, s in ipairs(M.cli or {}) do
		s:close()
	end
	M.cli, M.info = {}, {}
	return 0
end

-- @function Publish results to the Graphite server(s)
-- Sends the merged results of map('cache.stats()') under "<prefix>.cache",
-- map('worker.stats()') under "<prefix>.worker" and map('stats.list()')
-- under "<prefix>", all stamped with the same os.time() timestamp.
-- NOTE(review): map() is assumed to return one result table per worker,
-- which merge() then sums — confirm against kresd.
-- @return 0
-- @raise error if no Graphite server has been configured
function M.publish()
	-- M.init() always sets M.cli to a table, so a bare nil check would never
	-- fire; an empty server list is the real "not configured" case.
	if not M.cli or #M.cli == 0 then error("no graphite server configured") end
	local now = os.time()
	-- Publish built-in statistics
	publish_table(merge(map 'cache.stats()'), M.prefix..'.cache', now)
	publish_table(merge(map 'worker.stats()'), M.prefix..'.worker', now)
	-- Publish extended statistics if available
	publish_table(merge(map 'stats.list()'), M.prefix, now)
	return 0
end

-- @function Make connection to Graphite server.
-- Meant to be called with method syntax (M:add_server(...)); the first
-- argument is ignored. The new socket and its address record are appended
-- to M.cli / M.info at the same index.
-- @param host IP address or hostname of the server
-- @param port port of the server
-- @param tcp use TCP when truthy, UDP otherwise
-- @return 0
-- @raise error if the socket cannot be created, or (UDP only) connected;
--        a failed TCP connect is not raised and the socket is still registered
function M.add_server(_, host, port, tcp)
	local make = tcp and make_tcp or make_udp
	local s, err = make(host, port)
	if not s then error(err) end
	M.cli[#M.cli + 1] = s
	M.info[#M.info + 1] = {addr = host, port = port, seen = 0}
	return 0
end

-- Configure Graphite publishing and (re)start the recurrent publish event.
-- @param conf table with the fields:
--   host     server address, or a table of addresses (required)
--   port     server port (default 2003)
--   tcp      use TCP instead of UDP when truthy
--   interval publishing interval, stored in M.interval
--   prefix   metric name prefix, stored in M.prefix
-- The caller's table is not modified. A nil conf is a no-op.
-- @return 0
-- @raise error if conf.host is missing or a server cannot be added
function M.config(conf)
	-- config defaults
	if not conf then return 0 end
	if conf.host == nil then error('[graphite] no host configured') end
	local port = conf.port or 2003
	if conf.interval then M.interval = conf.interval end
	if conf.prefix then M.prefix = conf.prefix end
	-- connect to host(s)
	if type(conf.host) == 'table' then
		for _, val in pairs(conf.host) do
			M:add_server(val, port, conf.tcp)
		end
	else
		M:add_server(conf.host, port, conf.tcp)
	end
	-- (re)start publishing stats; a previous schedule is cancelled first
	if M.ev then event.cancel(M.ev) end
	M.ev = event.recurrent(M.interval, M.publish)
	return 0
end

-- Export the module table (worker.id > 0 returned an empty table earlier).
return M