146 lines
3.6 KiB
Lua
146 lines
3.6 KiB
Lua
-- SPDX-License-Identifier: GPL-3.0-or-later
|
|
-- Load dependent modules
|
|
if not stats then modules.load('stats') end
|
|
|
|
-- This is a leader-only module
|
|
local M = {}
|
|
local ffi = require("ffi")
|
|
local socket = require("cqueues.socket")
|
|
-- Protocol names used in log messages, keyed by cqueues socket type constant.
local proto_txt = {}
proto_txt[socket.SOCK_DGRAM] = 'udp'
proto_txt[socket.SOCK_STREAM] = 'tcp'
|
|
|
|
-- Create a socket of the requested type and wait until it is connected.
-- @param host   remote address, passed to socket.connect unchanged
-- @param port   remote port, passed to socket.connect unchanged
-- @param stype  socket type constant (socket.SOCK_DGRAM or socket.SOCK_STREAM)
-- @return the connected socket on success; on failure `false` plus the
--         failure reason (the failure is also logged)
local function make_socket(host, port, stype)
	-- timeout before next interval begins (roughly)
	-- NOTE(review): assumes M.interval and `sec` share the same unit -- confirm
	local timeout_sec = (M.interval - 10) / sec

	-- Log the failure and produce the (false, reason) pair callers test for.
	local function fail(reason)
		log_info(ffi.C.LOG_GRP_GRAPHITE, 'connecting: %s@%d %s reason: %s',
			host, port, proto_txt[stype], tostring(reason))
		return false, reason
	end

	local s, err = socket.connect({ host = host, port = port, type = stype })
	if not s then
		-- no socket object was created, so there is nothing to configure or close
		return fail(err)
	end
	s:setmode('bn', 'bn')
	s:settimeout(timeout_sec)

	-- pcall: a raised error must not escape into the publishing coroutine
	local ok, connected = pcall(s.connect, s, timeout_sec)
	if ok and connected then
		return s
	end

	-- Every failure path releases the descriptor; previously a raised error
	-- left the half-open socket behind.
	pcall(s.close, s)
	if ok then
		-- connect returned without raising but did not report success
		-- (nil or false); the original code reported this as a timeout
		return fail('connect timeout')
	end
	-- connect raised: `connected` holds the error value
	return fail(connected)
end
|
|
|
|
-- Create a connected UDP (datagram) socket to host:port.
-- Returns exactly what make_socket returns for that socket type.
local function make_udp(host, port)
	local stype = socket.SOCK_DGRAM
	return make_socket(host, port, stype)
end
|
|
|
|
-- Create a connected TCP (stream) socket to host:port.
-- Returns exactly what make_socket returns for that socket type.
local function make_tcp(host, port)
	local stype = socket.SOCK_STREAM
	return make_socket(host, port, stype)
end
|
|
|
|
-- Open a socket to a registered server using the transport it was added with.
-- Returns whatever make_tcp / make_udp return (a socket, or a falsy value).
local function connect_host(host)
	if host.tcp then
		return make_tcp(host.addr, host.port)
	end
	return make_udp(host.addr, host.port)
end

-- Send the metrics in a table to multiple Graphite consumers.
-- @param metrics  table of metric name -> value; each pair becomes one line
--                 "<name> <value> <now>\n"
-- @param prefix   optional string prepended to every metric name with a '.'
-- @param now      timestamp written on every line and used to throttle
--                 reconnect attempts
local function publish_table(metrics, prefix, now)
	for i in ipairs(M.cli) do
		local host = M.info[i]

		-- -1 marks a server that currently has no socket: try to open one
		if M.cli[i] == -1 then
			local s = connect_host(host)
			if s then
				M.cli[i] = s
			end
		end

		if M.cli[i] ~= -1 then
			for key, val in pairs(metrics) do
				local msg = key..' '..val..' '..now..'\n'
				if prefix then
					msg = prefix..'.'..msg
				end

				-- pcall: a raised write error must not abort the other servers
				local ok, err = pcall(M.cli[i].write, M.cli[i], msg)
				if not ok then
					local reconnectable = M.cli[i]['connect'] ~= nil
					-- Reconnect at most once per two intervals.
					-- NOTE(review): the /1000 presumably converts a millisecond
					-- interval to the seconds used by `now` -- confirm
					if reconnectable and host.seen + 2 * M.interval / 1000 <= now then
						local sock_type = (host.tcp and socket.SOCK_STREAM)
							or socket.SOCK_DGRAM
						log_info(ffi.C.LOG_GRP_GRAPHITE, 'reconnecting: %s@%d %s reason: %s',
							host.addr, host.port, proto_txt[sock_type], err)
						-- release the failed socket before replacing it
						local stale = M.cli[i]
						pcall(stale.close, stale)
						-- reconnect with the host's own transport; this used to
						-- be make_tcp unconditionally, even for UDP hosts
						local s = connect_host(host)
						if s then
							M.cli[i] = s
							host.seen = now
						else
							-- give up for this round; the next publish retries
							M.cli[i] = -1
							break
						end
					end
				end
			end -- loop metrics
		end
	end -- loop M.cli
end
|
|
|
|
-- Reset the module to its defaults: no timer, no registered servers,
-- a publishing interval of 5 * sec and a prefix of the form
-- "kresd.<hostname>.<worker id>". Always returns 0.
function M.init()
	M.ev = nil
	M.cli, M.info = {}, {}
	M.interval = sec * 5
	M.prefix = ('kresd.%s.%s'):format(hostname(), worker.id)
	return 0
end
|
|
|
|
-- Stop periodic publishing and release the sockets held by the module.
-- Always returns 0.
function M.deinit()
	if M.ev then
		event.cancel(M.ev)
		-- forget the handle so a later deinit/config does not cancel it twice
		M.ev = nil
	end
	-- Close every open socket; -1 entries are servers without a socket.
	-- Entries are left in place so code still holding an index sees a closed
	-- socket (whose write fails inside its pcall) rather than a missing one.
	for _, sock in ipairs(M.cli or {}) do
		if sock ~= -1 then
			pcall(function() sock:close() end)
		end
	end
	return 0
end
|
|
|
|
-- @function Publish results to the Graphite server(s)
-- Sends cache, worker and stats-module counters, all stamped with the same
-- os.time() value. Raises an error when M.cli is unset. Returns 0.
function M.publish()
	if not M.cli then error("no graphite server configured") end
	local now = os.time()

	-- Each source is read right before it is published, in this order.
	local sources = {
		-- built-in statistics
		{ read = function() return cache.stats() end, suffix = '.cache' },
		{ read = function() return worker.stats() end, suffix = '.worker' },
		-- extended statistics from the stats module, published under the bare prefix
		{ read = function() return stats.list() end },
	}
	for _, src in ipairs(sources) do
		local data = src.read()
		local prefix = M.prefix
		if src.suffix then
			prefix = prefix..src.suffix
		end
		publish_table(data, prefix, now)
	end
	return 0
end
|
|
|
|
-- @function Register a Graphite server.
-- No connection is opened here: the server gets a -1 placeholder in M.cli
-- and its address details in M.info at the same index.
-- @param _     ignored (receiver slot when called as M:add_server(...))
-- @param host  server address
-- @param port  server port
-- @param tcp   truthy to use TCP, otherwise UDP
-- @return 0
function M.add_server(_, host, port, tcp)
	local entry = { addr = host, port = port, tcp = tcp, seen = 0 }
	M.cli[#M.cli + 1] = -1
	M.info[#M.info + 1] = entry
	return 0
end
|
|
|
|
-- Apply configuration and (re)start periodic publishing.
-- @param conf  optional table with fields:
--                host      server address, or a table of addresses
--                port      server port (default 2003)
--                tcp       truthy to use TCP instead of UDP
--                interval  publishing interval, stored in M.interval
--                prefix    metric name prefix, stored in M.prefix
--              When conf is nil/false nothing is changed.
-- @return 0
function M.config(conf)
	if not conf then return 0 end
	-- config defaults; kept in a local so the caller's table is not modified
	local port = conf.port or 2003
	if conf.interval then M.interval = conf.interval end
	if conf.prefix then M.prefix = conf.prefix end

	local hosts = conf.host
	if type(hosts) == 'table' then
		for _, val in pairs(hosts) do
			M:add_server(val, port, conf.tcp)
		end
	elseif hosts then
		M:add_server(hosts, port, conf.tcp)
	end
	-- without a host no server is registered: an address-less entry could
	-- never connect and would only fail on every publish

	-- start publishing stats
	if M.ev then event.cancel(M.ev) end
	M.ev = event.recurrent(M.interval, function() worker.coroutine(M.publish) end)
	return 0
end
|
|
|
|
return M
|