summaryrefslogtreecommitdiffstats
path: root/modules/graphite/graphite.lua
blob: be2d7b2cc6d4182e5a40288667327d8eeedea185 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
-- SPDX-License-Identifier: GPL-3.0-or-later
-- Load dependent modules
if not stats then modules.load('stats') end

-- This is a leader-only module
local M = {}
local ffi = require("ffi")
local socket = require("cqueues.socket")
-- Protocol names used in log messages, keyed by cqueues socket type.
local proto_txt = {}
proto_txt[socket.SOCK_DGRAM] = 'udp'
proto_txt[socket.SOCK_STREAM] = 'tcp'

-- Open a socket of type `stype` (socket.SOCK_STREAM or socket.SOCK_DGRAM)
-- connected to host:port.
-- The connect timeout is (M.interval - 10) / sec, i.e. it is meant to expire
-- roughly before the next publishing interval begins.
-- Returns the connected socket on success. On failure it logs the reason and
-- returns false plus an error value; the socket is closed on every failure
-- path so a failed attempt does not leave an open descriptor behind.
local function make_socket(host, port, stype)
	-- timeout before next interval begins (roughly)
	local timeout_sec = (M.interval - 10) / sec

	local s = socket.connect({ host = host, port = port, type = stype })
	s:setmode('bn', 'bn')
	s:settimeout(timeout_sec)
	-- s:connect may raise, so it is run under pcall
	local status, err = pcall(s.connect, s, timeout_sec)
	if status == true and err == nil then
		-- connect returned nil without raising: report it as a timeout
		err = 'connect timeout'
		status = false
	end

	if not status then
		-- release the socket on both the timeout and the raised-error path
		-- (previously only the timeout path closed it)
		pcall(s.close, s)
		log_info(ffi.C.LOG_GRP_GRAPHITE, 'connecting: %s@%d %s reason: %s',
			host, port, proto_txt[stype], err)
		return false, err
	end
	return s
end

-- Open a connected UDP (datagram) socket to host:port.
-- Returns the socket, or false plus an error value when connecting fails.
local function make_udp(host, port)
	local stype = socket.SOCK_DGRAM
	return make_socket(host, port, stype)
end

-- Open a connected TCP (stream) socket to host:port.
-- Returns the socket, or false plus an error value when connecting fails.
local function make_tcp(host, port)
	local stype = socket.SOCK_STREAM
	return make_socket(host, port, stype)
end

-- Open a new connection to one configured Graphite server using the protocol
-- it was registered with (host.tcp selects TCP, otherwise UDP).
-- Returns the socket, or a falsy value when connecting failed (make_socket
-- has already logged the reason).
local function connect_host(host)
	if host.tcp then
		return make_tcp(host.addr, host.port)
	end
	return make_udp(host.addr, host.port)
end

-- Render a metrics table as Graphite plaintext lines of the form
-- "[prefix.]key value timestamp\n". The list is built once per publish and
-- shared by all servers instead of being rebuilt for each of them.
local function format_lines(metrics, prefix, now)
	local lines = {}
	for key, val in pairs(metrics) do
		local msg = key..' '..val..' '..now..'\n'
		if prefix then
			msg = prefix..'.'..msg
		end
		lines[#lines + 1] = msg
	end
	return lines
end

-- Send the metrics in a table to multiple Graphite consumers.
-- M.cli[i] holds either a socket or -1 (not connected); M.info[i] holds the
-- matching server description. Unconnected servers are connected first.
-- When a write fails, the broken socket is closed and the server is
-- reconnected with its own protocol, at most once per two publishing
-- intervals (tracked in host.seen, compared against `now`).
local function publish_table(metrics, prefix, now)
	local lines = format_lines(metrics, prefix, now)
	-- `now` is os.time(); convert the interval with `sec` like make_socket does
	local retry_after = 2 * M.interval / sec

	for i in ipairs(M.cli) do
		local host = M.info[i]

		if M.cli[i] == -1 then
			local s = connect_host(host)
			if s then
				M.cli[i] = s
			end
		end

		if M.cli[i] ~= -1 then
			for _, msg in ipairs(lines) do
				-- re-read each time: a reconnect below replaces the socket
				local cli = M.cli[i]
				local ok, err = pcall(cli.write, cli, msg)
				if not ok and host.seen + retry_after <= now then
					local sock_type = (host.tcp and socket.SOCK_STREAM)
								or socket.SOCK_DGRAM
					log_info(ffi.C.LOG_GRP_GRAPHITE, 'reconnecting: %s@%d %s reason: %s',
						  host.addr, host.port, proto_txt[sock_type], err)
					-- drop the broken socket before replacing it
					pcall(cli.close, cli)
					local s = connect_host(host)
					if s then
						M.cli[i] = s
						host.seen = now
					else
						M.cli[i] = -1
						break
					end
				end
			end -- loop lines
		end
	end -- loop M.cli
end

-- Reset module state: no recurrent publishing event, an empty server list,
-- a publishing interval of 5 * sec and the default metric prefix
-- "kresd.<hostname>.<worker id>".
function M.init()
	M.ev, M.cli, M.info = nil, {}, {}
	M.interval = 5 * sec
	local host_name = hostname()
	M.prefix = ('kresd.%s.%s'):format(host_name, worker.id)
	return 0
end

-- Stop periodic publishing and close every open server connection.
-- Registered servers are kept but marked -1 (not connected).
function M.deinit()
	if M.ev then
		event.cancel(M.ev)
		M.ev = nil
	end
	if M.cli then
		for i, cli in ipairs(M.cli) do
			if cli ~= -1 then
				-- best effort: a failing close must not abort module teardown
				pcall(cli.close, cli)
			end
			M.cli[i] = -1
		end
	end
	return 0
end

-- @function Publish results to the Graphite server(s)
-- Sends cache statistics, worker statistics and the `stats` module list,
-- all stamped with the same os.time() value, to every configured server.
-- Raises an error when no server has been configured.
function M.publish()
	-- M.init sets M.cli to {}, so an empty list must be rejected too;
	-- checking only for nil never triggered
	if not M.cli or #M.cli == 0 then error("no graphite server configured") end
	local now = os.time()
	-- Publish built-in statistics
	publish_table(cache.stats(), M.prefix..'.cache', now)
	publish_table(worker.stats(), M.prefix..'.worker', now)
	-- Publish extended statistics if available
	publish_table(stats.list(), M.prefix, now)
	return 0
end

-- @function Register a Graphite server to publish to.
-- No connection is made here; publish_table connects lazily on the next
-- publish. Called as M:add_server(host, port, tcp), so the first argument
-- (the module table) is ignored. `port` defaults to 2003 (the same default
-- M.config uses) and a truthy `tcp` selects TCP instead of UDP.
function M.add_server(_, host, port, tcp)
	-- fail at configuration time instead of on every publish attempt
	if host == nil then error('graphite: server host is required', 2) end
	M.cli[#M.cli + 1] = -1 -- -1: not connected yet
	M.info[#M.info + 1] = {addr = host, port = port or 2003, tcp = tcp, seen = 0}
	return 0
end

-- Configure the module and (re)start periodic publishing.
-- Recognized conf fields:
--   host     -- a single server, or a table of servers
--   port     -- server port, default 2003
--   tcp      -- truthy to use TCP instead of UDP
--   interval -- publishing period, same units as M.interval (5 * sec by default)
--   prefix   -- metric name prefix
-- Servers are appended to those already registered. A nil conf is a no-op.
function M.config(conf)
	if not conf then return 0 end
	-- config defaults; kept local so the caller's table is not modified
	local port = conf.port or 2003
	if conf.interval then M.interval = conf.interval end
	if conf.prefix then M.prefix = conf.prefix end
	if type(conf.host) == 'table' then
		for _, val in pairs(conf.host) do
			M:add_server(val, port, conf.tcp)
		end
	else
		M:add_server(conf.host, port, conf.tcp)
	end
	-- start publishing stats; replace any previously scheduled event
	if M.ev then event.cancel(M.ev) end
	M.ev = event.recurrent(M.interval, function() worker.coroutine(M.publish) end)
	return 0
end

return M