833 lines
25 KiB
Lua
833 lines
25 KiB
Lua
-- SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
local debug = require('debug')
|
|
local ffi = require('ffi')
|
|
local kluautil = require('kluautil')
|
|
local krprint = require("krprint")
|
|
|
|
-- Size units, exported as globals so that configuration can write e.g. `10*MB`.
kB = 1024
MB = kB * 1024
GB = MB * 1024

-- Time units, also exported as globals.  One `sec` is 1000 ticks, i.e. the
-- values are milliseconds (the unit passed to event.after() in this file).
sec = 1000
second = sec
minute = sec * 60
min = minute
hour = minute * 60
day = hour * 24
|
|
|
|
-- Logging

-- Severity levels, numbered as in syslog.h (lower number = more severe).
LOG_CRIT, LOG_ERR, LOG_WARNING = 2, 3, 4
LOG_NOTICE, LOG_INFO, LOG_DEBUG = 5, 6, 7
|
|
|
|
-- Source name of the function four levels up the call stack, i.e. the caller
-- of log_error()/log_warn()/... when reached through log_fmt().
-- debug.getinfo() returns nil when the stack is shallower than that (e.g. a
-- logging function invoked directly from C); fall back to '?' instead of
-- raising "attempt to index a nil value" from inside the logger.
local function curr_file()
	local info = debug.getinfo(4, 'S')
	return info and info.source or '?'
end

-- Current line of the same stack frame as curr_file(); -1 when that frame
-- does not exist.
local function curr_line()
	local info = debug.getinfo(4, 'l')
	return info and info.currentline or -1
end
|
|
|
|
-- Format a message in Lua and pass it to the C logger together with
-- CODE_FILE/CODE_LINE metadata describing the Lua call site.
-- @param grp    log group (passed to kr_log_fmt and kr_log_grp2name)
-- @param level  one of the LOG_* severities
-- @param fmt    string.format() template, followed by its arguments
-- Note: curr_file()/curr_line() inspect a fixed stack depth, so they have to
-- be called directly from this function.
local function log_fmt(grp, level, fmt, ...)
	local code_file = 'CODE_FILE=' .. curr_file()
	local code_line = 'CODE_LINE=' .. curr_line()
	local grp_name = ffi.C.kr_log_grp2name(grp)
	local message = string.format(fmt, ...)
	ffi.C.kr_log_fmt(grp, level, code_file, code_line, 'CODE_FUNC=',
		'[%-6s] %s\n', grp_name, message)
end
|
|
|
|
-- Log a message in the context of a request.
-- The text is formatted in Lua (string.format) and passed to kr_log_req1()
-- as a single '%s' argument.
function log_req(req, qry_uid, indent, grp, fmt, ...)
	local grp_name = ffi.C.kr_log_grp2name(grp)
	local message = string.format(fmt, ...)
	ffi.C.kr_log_req1(req, qry_uid, indent, grp, grp_name, '%s\n', message)
end
|
|
|
|
-- Log a message in the context of a query.
-- The text is formatted in Lua (string.format) and passed to kr_log_q1()
-- as a single '%s' argument.
function log_qry(qry, grp, fmt, ...)
	local grp_name = ffi.C.kr_log_grp2name(grp)
	local message = string.format(fmt, ...)
	ffi.C.kr_log_q1(qry, grp, grp_name, '%s\n', message)
end
|
|
|
|
-- Print a traceback of the caller and raise a formatted error.
-- @param fmt  string.format() template; 'ERROR: ' is prepended to it
-- @param ...  arguments for the template
-- The error is raised with level 0, so no position information is added
-- to the message.
function panic(fmt, ...)
	local header = 'error occurred here (config filename:lineno is '
		.. 'at the bottom, if config is involved):'
	-- Level 2: start the traceback at the function that called panic().
	print(debug.traceback(header, 2))
	local message = string.format('ERROR: '.. fmt, ...)
	error(message, 0)
end
|
|
|
|
-- Per-severity logging helpers: log_<level>(grp, fmt, ...).
-- Each one forwards to log_fmt() with the matching LOG_* constant.
-- They deliberately do NOT use `return log_fmt(...)`: a tail call would
-- remove this frame and shift the fixed stack depth that curr_file() and
-- curr_line() rely on.
log_error = function (grp, fmt, ...)
	log_fmt(grp, LOG_ERR, fmt, ...)
end

log_warn = function (grp, fmt, ...)
	log_fmt(grp, LOG_WARNING, fmt, ...)
end

log_notice = function (grp, fmt, ...)
	log_fmt(grp, LOG_NOTICE, fmt, ...)
end

log_info = function (grp, fmt, ...)
	log_fmt(grp, LOG_INFO, fmt, ...)
end

log_debug = function (grp, fmt, ...)
	log_fmt(grp, LOG_DEBUG, fmt, ...)
end

-- Shorthand: notice-level message in the LOG_GRP_MODULE group.
-- It adds one more frame than the helpers above, so the frame inspected by
-- curr_file()/curr_line() is log() itself rather than its caller.
log = function (fmt, ...)
	log_notice(ffi.C.LOG_GRP_MODULE, fmt, ...)
end
|
|
|
|
-- Resolver bindings (global `kres`)
kres = require('kres')
do
	-- Export `todname` only when the binding really provides str2dname;
	-- rawget() bypasses any metatable on kres.
	local str2dname = rawget(kres, 'str2dname')
	if str2dname ~= nil then
		todname = str2dname
	end
end
|
|
|
|
-- Start resolution of an already constructed query packet.
-- @param pkt      query packet handed to worker_resolve_start/_exec
-- @param options  query flags in any form accepted by kres.mk_qflags()
-- @param finish   optional function(answer, req) called via the request's
--                 trace_finish hook
-- @param init     optional function(req) called before execution starts
-- @return true when worker_resolve_exec() returned 0
-- NOTE(review): the task returned by worker_resolve_start() is used without
-- a NULL check -- confirm that it cannot fail here.
worker.resolve_pkt = function (pkt, options, finish, init)
	local qflags = kres.mk_qflags(options)
	local task = ffi.C.worker_resolve_start(pkt, qflags)

	-- Deal with finish and init callbacks
	if finish ~= nil then
		-- Declared first so the closure can release its own FFI callback.
		local on_finish
		on_finish = ffi.cast('trace_callback_f', function (req)
			jit.off(true, true) -- JIT for (C -> lua)^2 nesting isn't allowed
			finish(req.answer, req)
			on_finish:free()
		end)
		task.ctx.req.trace_finish = on_finish
	end
	if init ~= nil then
		init(task.ctx.req)
	end

	local exec_ret = ffi.C.worker_resolve_exec(task, pkt)
	return exec_ret == 0
end
|
|
|
|
-- Resolve a name asynchronously.
-- Accepts either positional arguments or one table with the keys
-- name, type, class, options, finish and init.
-- Defaults: qtype = kres.type.A, qclass = kres.class.IN.
-- @return result of worker.resolve_pkt()
-- Raises (via panic) when the query packet cannot be created.
worker.resolve = function (qname, qtype, qclass, options, finish, init)
	-- Alternatively use named arguments
	if type(qname) == 'table' then
		local args = qname
		qname, qtype, qclass = args.name, args.type, args.class
		options, finish, init = args.options, args.finish, args.init
	end
	if not qtype then qtype = kres.type.A end
	if not qclass then qclass = kres.class.IN end
	options = kres.mk_qflags(options)
	-- LATER: nicer errors for rubbish in qname, qtype, qclass?
	local pkt = ffi.C.worker_resolve_mk_pkt(qname, qtype, qclass, options)
	if pkt == nil then
		panic('failure in worker.resolve(); probably invalid qname "%s"', qname)
	end
	local started = worker.resolve_pkt(pkt, options, finish, init)
	-- The packet is freed here as soon as resolve_pkt() returns.
	ffi.C.knot_pkt_free(pkt);
	return started
end
-- Global shorthand
resolve = worker.resolve
|
|
|
|
-- Shorthand for aggregated per-worker information:
-- the table returned by worker.stats() with `pid` added to it.
worker.info = function ()
	local info = worker.stats()
	info.pid = worker.pid
	return info
end
|
|
|
|
-- Resolver mode of operation
local current_mode = 'normal'
-- Set of recognised mode names (only key presence is checked below).
local mode_table = { normal=0, strict=1, permissive=2 }

-- Get or set the resolver mode.
-- mode()      returns the current mode name
-- mode(name)  switches to 'normal', 'strict' or 'permissive', updates the
--             STRICT and PERMISSIVE options accordingly and returns true;
--             raises an error for any other name
function mode(m)
	if not m then
		return current_mode
	end
	if mode_table[m] == nil then
		error('unsupported mode: '..m)
	end
	-- Update current operation mode
	current_mode = m
	option('STRICT', m == 'strict')
	option('PERMISSIVE', m == 'permissive')
	return true
end
|
|
|
|
-- Trivial option alias: reorder_RR(val) is option('REORDER_RR', val),
-- i.e. it sets the flag when val is given and returns its current value.
reorder_RR = function (val)
	return option('REORDER_RR', val)
end
|
|
|
|
-- Get/set resolver options via name (string)
-- @param name  option name; upper-cased for convenience
-- @param val   optional boolean; when given, the option is set to it
-- @return the (possibly just updated) value of the option
-- Raises (via panic) when val is neither nil nor a boolean.
function option(name, val)
	local flags = kres.context().options;
	-- Note: no way to test existence of flags[name] but we want error anyway.
	name = string.upper(name) -- convenience
	if val ~= nil then
		if (val ~= true) and (val ~= false) then
			-- Pass the value as an argument: panic() treats its first
			-- parameter as a string.format() template, so concatenating a
			-- value containing '%' into it would break the error message.
			panic('invalid option value: %s', tostring(val))
		end
		flags[name] = val;
	end
	return flags[name];
end
|
|
|
|
-- Function aliases
-- `env.VAR` returns `os.getenv(VAR)`: the table itself stays empty and every
-- lookup is answered from the process environment.
env = setmetatable({}, {
	__index = function (_, name)
		return os.getenv(name)
	end,
})
|
|
|
|
-- `debugging.assertion_abort` and `debugging.assertion_fork` read and write
-- the C variables kr_dbg_assertion_abort / kr_dbg_assertion_fork.
-- Any other key is rejected with panic().
debugging = {}
setmetatable(debugging, {
	__index = function(_, k)
		if k == 'assertion_abort' then return ffi.C.kr_dbg_assertion_abort
		elseif k == 'assertion_fork' then return ffi.C.kr_dbg_assertion_fork
		-- The key goes in as an argument, not into the template itself:
		-- panic() runs string.format() on its first parameter, so a key
		-- containing '%' would otherwise garble the error.
		else panic('invalid debugging option: %s', tostring(k))
		end
	end,
	__newindex = function(_, k, v)
		if k == 'assertion_abort' then ffi.C.kr_dbg_assertion_abort = v
		elseif k == 'assertion_fork' then ffi.C.kr_dbg_assertion_fork = v
		else panic('invalid debugging option: %s', tostring(k))
		end
	end
})
|
|
|
|
-- Quick access to interfaces
-- `net.<iface>` => `net.interfaces()[iface]`
-- `net = {addr1, ..}` => `net.listen(name, addr1)`
-- `net.ipv{4,6} = {true, false}` => enable/disable IPv{4,6}
setmetatable(net, {
	-- Lookup of keys missing from `net`.
	__index = function (t, k)
		local raw = rawget(t, k)
		if raw then
			return raw
		end
		-- The options are stored negated (NO_IPV6 / NO_IPV4).
		if k == 'ipv6' then
			return not option('NO_IPV6')
		end
		if k == 'ipv4' then
			return not option('NO_IPV4')
		end
		return net.interfaces()[k]
	end,
	-- Assignment to keys missing from `net`.
	__newindex = function (t,k,v)
		if k == 'ipv6' then
			return option('NO_IPV6', not v)
		end
		if k == 'ipv4' then
			return option('NO_IPV4', not v)
		end
		-- Listen on the interface entry when v names one, otherwise on v.
		local iname = rawget(net.interfaces(), v)
		t.listen(iname or v)
	end
})
|
|
|
|
-- Syntactic sugar for module loading
-- `modules.<name> = <config>`
-- A numeric key (list-style `modules = { 'name' }`) means the value is the
-- module name and there is no configuration.
setmetatable(modules, {
	__newindex = function (_, k, v)
		if type(k) == 'number' then
			k, v = v, nil
		end
		-- A global of that name already exists: nothing to load.
		if rawget(_G, k) then
			return
		end
		modules.load(k)
		-- The module's global is looked up under the first run of
		-- alphanumeric/underscore characters of the given name.
		k = string.match(k, '[%w_]+')
		local mod = _G[k]
		local config = mod and rawget(mod, 'config')
		if mod ~= nil and config ~= nil then
			if k ~= v then
				config(v)
			else
				config()
			end
		end
	end
})
|
|
|
|
-- Set up lua table for a C module. (Internal function.)
-- @param kr_module_ud  userdata holding a `struct kr_module *`
-- Creates the global `_G[<module name>]` with one Lua function per module
-- property, a config() function if the module has one, and a metatable
-- mapping `module.key` / `module.key = value` to the get/set properties.
-- Nothing is created when the module has neither config nor props.
function modules_create_table_for_c(kr_module_ud)
	local kr_module = ffi.cast('struct kr_module **', kr_module_ud)[0]
	--- Set up the global table named according to the module.
	if kr_module.config == nil and kr_module.props == nil then
		return
	end
	local module = {}
	local module_name = ffi.string(kr_module.name)
	_G[module_name] = module

	-- Convert a Lua argument to the string handed over to C:
	-- tables and booleans as JSON, anything else non-nil via tostring(),
	-- nil stays nil.
	local function convert_arg(arg)
		local converted
		local arg_type = type(arg)
		if arg_type == 'table' or arg_type == 'boolean' then
			converted = tojson(arg)
		elseif arg ~= nil then
			converted = tostring(arg)
		end
		return converted
	end

	--- Construct lua functions for properties.
	if kr_module.props ~= nil then
		local props = kr_module.props
		local i = 0
		-- The property array ends at the first entry with a NULL callback.
		while props[i].cb ~= nil do
			local cb = props[i].cb
			local prop_name = ffi.string(props[i].name)
			-- lua wrapper around kr_prop_cb function typedef
			module[prop_name] = function (arg)
				local ret_cstr = cb(ffi.C.the_worker.engine, kr_module, convert_arg(arg))
				if ret_cstr == nil then
					return nil
				end
				-- LATER(optim.): superfluous copying
				local ret_str = ffi.string(ret_cstr)
				-- This is a bit ugly, but the API is that invalid JSON
				-- should be just returned as string :-(
				local parsed_ok, ret = pcall(fromjson, ret_str)
				if not parsed_ok then
					ret = ret_str
				end
				ffi.C.free(ret_cstr)
				return ret
			end
			i = i + 1
		end
	end

	--- Construct lua function for config().
	if kr_module.config ~= nil then
		module.config = function (arg)
			return kr_module.config(kr_module, convert_arg(arg))
		end
	end

	--- Add syntactic sugar for get() and set() properties.
	--- That also "catches" any commands like `moduleName.foo = bar`.
	local get_f = rawget(module, 'get')
	local set_f = rawget(module, 'set')

	local m_index
	if get_f == nil then
		m_index = function ()
			error('module ' .. module_name .. ' does not support indexing syntax sugar')
		end
	else
		m_index = function (_, key)
			return get_f(key)
		end
	end

	local m_newindex
	if set_f == nil then
		m_newindex = function ()
			error('module ' .. module_name .. ' does not support assignment syntax sugar')
		end
	else
		m_newindex = function (_, key, value)
			-- This will produce a nasty error on some non-string parameters.
			-- Still, we already use it with integer values, e.g. in predict module :-/
			return set_f(key .. ' ' .. value)
		end
	end

	setmetatable(module, {
		-- note: the two functions only get called for *missing* indices
		__index = m_index,
		__newindex = m_newindex,
	})
end
|
|
|
|
-- Static C structure the layer wrappers below read their arguments from.
local layer_ctx = ffi.C.kr_layer_t_static

-- Utilities internal for lua layer glue; see ../ffimodule.c
-- Each wrapper calls the given Lua layer callback with fields of layer_ctx
-- and returns whatever the callback returns.

-- Callbacks taking (state, req)
function modules_ffi_layer_wrap1(layer_cb)
	return layer_cb(layer_ctx.state, layer_ctx.req)
end

-- Callbacks taking (state, req, pkt)
function modules_ffi_layer_wrap2(layer_cb)
	return layer_cb(layer_ctx.state, layer_ctx.req, layer_ctx.pkt)
end

-- Checkout callback: (state, req, pkt, dst, is_stream)
function modules_ffi_layer_wrap_checkout(layer_cb)
	return layer_cb(layer_ctx.state, layer_ctx.req, layer_ctx.pkt,
		layer_ctx.dst, layer_ctx.is_stream)
end

-- This one isn't for layers: unwraps the module pointer from userdata and
-- passes it to a module-level callback.
function modules_ffi_wrap_modcb(cb, kr_module_ud)
	local kr_module = ffi.cast('struct kr_module **', kr_module_ud)[0]
	return cb(kr_module)
end
|
|
|
|
-- Return filesystem size where the cache resides.
-- Uses cache.current_storage (or '.' when unset), with an optional
-- 'lmdb://' prefix stripped.  Raises (via panic) when kr_fssize() returns a
-- negative error code.
cache.fssize = function ()
	local prefix = 'lmdb://'
	local path = cache.current_storage or '.'
	-- As it is now, `path` may or may not include the lmdb:// prefix.
	if string.sub(path, 1, #prefix) == prefix then
		path = string.sub(path, #prefix + 1)
	end
	-- A bare 'lmdb://' leaves nothing behind: use the current directory.
	if #path == 0 then
		path = '.'
	end
	local size = tonumber(ffi.C.kr_fssize(path))
	if size < 0 then
		panic('cache.fssize(): %s', ffi.string(ffi.C.knot_strerror(size)))
	end
	return size
end
|
|
|
|
-- Remove records from the cache.
-- @param name        name to clear; nil (or '.' without exact_name) clears
--                    the whole cache
-- @param exact_name  boolean, default false; passed to the C removal call
-- @param rr_type     optional; remove just this type (requires exact_name,
--                    incompatible with chunk_size and callback)
-- @param chunk_size  limit passed to a single removal call, default 100
-- @param callback    optional function(name, exact_name, rr_type,
--                    chunk_size, callback, prev_state, rettable) whose
--                    result is returned; the default one re-schedules
--                    cache.clear() after 1ms while the chunk limit is hit
-- @param prev_state  state handed to the callback (previous rettable when
--                    called from the default callback)
-- @return table with `count` and possibly `not_apex`, `subtree`,
--         `chunk_limit`, `round`
cache.clear = function (name, exact_name, rr_type, chunk_size, callback, prev_state)
	if name == nil or (name == '.' and not exact_name) then
		-- keep same output format as for 'standard' clear
		local total_count = cache.count()
		if not cache.clear_everything() then
			error('unable to clear everything')
		end
		return {count = total_count}
	end
	-- Check parameters, in order, and set defaults if missing.
	local dname = kres.str2dname(name)
	if not dname then error('cache.clear(): incorrect name passed') end
	if exact_name == nil then exact_name = false end
	if type(exact_name) ~= 'boolean'
		then error('cache.clear(): incorrect exact_name passed') end

	local cach = kres.context().cache;
	local rettable = {}
	-- Apex warning.  If the caller passes a custom callback,
	-- we assume they are advanced enough not to need the check.
	-- The point is to avoid repeating the check in each callback iteration.
	if callback == nil then
		local apex_array = ffi.new('knot_dname_t *[1]') -- C: dname **apex_array
		local ret = ffi.C.kr_cache_closest_apex(cach, dname, false, apex_array)
		if ret < 0 then
			error(ffi.string(ffi.C.knot_strerror(ret))) end
		if not ffi.C.knot_dname_is_equal(apex_array[0], dname) then
			local apex_str = kres.dname2str(apex_array[0])
			rettable.not_apex = 'to clear proofs of non-existence call '
				.. 'cache.clear(\'' .. tostring(apex_str) ..'\')'
			rettable.subtree = apex_str
		end
		ffi.C.free(apex_array[0])
	end

	if rr_type ~= nil then
		-- Special case, without any subtree searching.
		if not exact_name
			then error('cache.clear(): specifying rr_type only supported with exact_name') end
		if chunk_size or callback
			then error('cache.clear(): chunk_size and callback parameters not supported with rr_type') end
		local ret = ffi.C.kr_cache_remove(cach, dname, rr_type)
		if ret < 0 then error(ffi.string(ffi.C.knot_strerror(ret))) end
		return {count = 1}
	end

	if chunk_size == nil then chunk_size = 100 end
	if type(chunk_size) ~= 'number' or chunk_size <= 0
		then error('cache.clear(): chunk_size has to be a positive integer') end

	-- Do the C call, and add chunk_size warning.
	rettable.count = ffi.C.kr_cache_remove_subtree(cach, dname, exact_name, chunk_size)
	if rettable.count == chunk_size then
		local msg_extra = ''
		if callback == nil then
			msg_extra = '; the default callback will continue asynchronously'
		end
		rettable.chunk_limit = 'chunk size limit reached' .. msg_extra
	end

	-- Default callback function: repeat after 1ms
	if callback == nil then callback =
		function (cbname, cbexact_name, cbrr_type, cbchunk_size, cbself, cbprev_state, cbrettable)
			-- A negative count is an error code from the C call.
			if cbrettable.count < 0 then error(ffi.string(ffi.C.knot_strerror(cbrettable.count))) end
			if cbprev_state == nil then cbprev_state = { round = 0 } end
			if type(cbprev_state) ~= 'table'
				then error('cache.clear() callback: incorrect prev_state passed') end
			cbrettable.round = cbprev_state.round + 1
			if (cbrettable.count == cbchunk_size) then
				-- Chunk limit hit: more may remain, continue in the next round.
				event.after(1, function ()
						cache.clear(cbname, cbexact_name, cbrr_type, cbchunk_size, cbself, cbrettable)
					end)
			elseif cbrettable.round > 1 then
				-- The name is passed as an argument rather than spliced into
				-- the template: log_info() runs string.format() on it, and a
				-- '%' inside the name would make that call raise.
				log_info(ffi.C.LOG_GRP_CACHE, 'asynchronous cache.clear(\'%s\', %s) finished',
					cbname, tostring(cbexact_name))
			end
			return cbrettable
		end
	end
	return callback(name, exact_name, rr_type, chunk_size, callback, prev_state, rettable)
end
|
|
-- Syntactic sugar for cache
-- `cache[x] -> cache.get(x)`
-- `cache.{size|storage} = value`
setmetatable(cache, {
	-- Keys missing from `cache` are looked up as cache entries, but only
	-- when `current_size` is set on the table.
	__index = function (t, k)
		local res = rawget(t, k)
		if not res and not rawget(t, 'current_size') then
			return res
		end
		-- Beware: t.get returns empty table on failure to find.
		-- That would be confusing here (breaking kresc), so return nil instead.
		res = t.get(k)
		if res and next(res) ~= nil then
			return res
		end
		return nil
	end,
	-- Declarative interface for cache: assigning `size` or `storage`
	-- (re)opens the cache, taking the other parameter from the current
	-- state or from the defaults below.  Other keys are ignored.
	__newindex = function (t,k,v)
		local storage = rawget(t, 'current_storage') or 'lmdb://'
		local size = rawget(t, 'current_size') or 10*MB
		if k == 'size' then
			t.open(v, storage)
		elseif k == 'storage' then
			t.open(size, v)
		end
	end
})
|
|
|
|
-- Make sandboxed environment
-- @param defined  table holding the real definitions
-- @return a proxy table that reads from `defined` and stores new values in
--         it, except that assigning a table to one of the protected names
--         merges its fields into the existing object instead of replacing it
local function make_sandbox(defined)
	local __protected = {
		worker = true, env = true, debugging = true, modules = true,
		cache = true, net = true, trust_anchors = true
	}

	-- Compute and export the list of top-level names (hidden otherwise):
	-- one name per line, in pairs() order.
	local names = {}
	for n in pairs(defined) do
		names[#names + 1] = n .. "\n"
	end
	local sandbox = { __orig_name_list = table.concat(names) }

	local function assign(_, k, v)
		if not __protected[k] then
			defined[k] = v
			return
		end
		-- Protected name: copy the fields over one by one.
		for k2,v2 in pairs(v) do
			defined[k][k2] = v2
		end
	end

	return setmetatable(sandbox, {
		__index = defined,
		__newindex = assign,
	})
end
|
|
|
|
-- Compatibility sandbox: wrap the current global environment and install
-- the wrapper both as `_G` and as the thread environment (level 0).
do
	local sandbox = make_sandbox(getfenv(0))
	_G = sandbox
	setfenv(0, sandbox)
end
|
|
|
|
-- Load default modules (in this order)
trust_anchors = require('trust_anchors')
for _, default_module in ipairs({
	'ta_update',
	'ta_signal_query',
	'policy',
	'priming',
	'detect_time_skew',
	'detect_time_jump',
	'ta_sentinel',
	'edns_keepalive',
	'refuse_nord',
	'watchdog',
	'extended_error',
}) do
	modules.load(default_module)
end

-- Load keyfile_default
-- (the @...@ tokens are template placeholders, presumably substituted when
-- this file is generated)
trust_anchors.add_file('@keyfile_default@', @unmanaged@)
|
|
|
|
-- Compile an interactive command without running it.
-- @param line  command text
-- @param raw   when truthy compile `return <line>`, otherwise
--              `return table_print(<line>)`
-- If that expression form does not compile, `line` is compiled as-is
-- (i.e. as statements).
-- @return chunk, err -- as returned by the last load attempt
local function eval_cmd_compile(line, raw)
	-- Compatibility sandbox code loading
	local function load_code(code)
		if getfenv then -- Lua 5.1
			return loadstring(code)
		end
		-- Lua 5.2+
		return load(code, nil, 't', _ENV)
	end

	local expression
	if raw then
		expression = 'return '..line
	else
		expression = 'return table_print('..line..')'
	end

	local chunk, err = load_code(expression)
	if err then
		chunk, err = load_code(line)
	end
	return chunk, err
end
|
|
|
|
-- Interactive command evaluation
-- Compiles `line` with eval_cmd_compile() and runs it, returning whatever
-- the chunk returns; a compilation error is re-raised.
function eval_cmd(line, raw)
	local chunk, err = eval_cmd_compile(line, raw)
	if err then
		error(err)
	end
	return chunk()
end
|
|
|
|
-- Pretty printing
local pprint = require('krprint').pprint

-- Pretty-print every argument with pprint(), one per line.
-- With several arguments each line gets a "-- result # N" suffix.
-- @return the resulting string, or nil when called without arguments
function table_print(...)
	local nargs = select('#', ...)
	if nargs == 0 then
		return nil
	end
	-- Indexed explicitly up to nargs, so nil arguments are handled too.
	local args = { ... }
	local annotate = nargs > 1
	local strs = {}
	for n = 1, nargs do
		local arg_str = pprint(args[n])
		if annotate then
			arg_str = string.format("%s\t-- result # %d", arg_str, n)
		end
		table.insert(strs, arg_str)
	end
	return table.concat(strs, '\n')
end
|
|
|
|
-- This extends the worker module to allow asynchronous execution of functions and nonblocking I/O.
-- The current implementation combines cqueues for Lua interface, and event.socket() in order to not
-- block resolver engine while waiting for I/O or timers.
--
local has_cqueues, cqueues = pcall(require, 'cqueues')
if has_cqueues then

	-- Export the asynchronous sleep function
	worker.sleep = cqueues.sleep

	-- Methods of an asynchronous worker.
	-- It can schedule multiple cqueues and yield execution when there's a wait for blocking I/O or timer
	local asynchronous_worker_mt = {}

	-- Run one non-blocking step of the queue and (re)arm the wake-up timer.
	function asynchronous_worker_mt.work(self)
		local ok, err, _, co = self.cq:step(0)
		if not ok then
			log_warn(ffi.C.LOG_GRP_SYSTEM, '%s error: %s %s', self.name or 'worker', err, debug.traceback(co))
		end

		local timeout = self.cq:timeout()
		if not timeout then
			-- Cancel running timeout when there is no next deadline
			if self.next_timeout then
				event.cancel(self.next_timeout)
				self.next_timeout = nil
			end
			return
		end

		-- Throttle timeouts to avoid too frequent wakeups
		if timeout == 0 then timeout = 0.00001 end
		-- Convert from seconds to duration
		timeout = timeout * sec
		-- Reschedule timeout or create new one
		if self.next_timeout then
			event.reschedule(self.next_timeout, timeout)
		else
			self.next_timeout = event.after(timeout, self.on_step)
		end
	end

	-- Add function `f` to the worker's queue.
	function asynchronous_worker_mt.wrap(self, f)
		self.cq:wrap(f)
	end

	-- Hook the queue's poll descriptor into the event loop.
	function asynchronous_worker_mt.loop(self)
		self.on_step = function () self:work() end
		self.event_fd = event.socket(self.cq:pollfd(), self.on_step)
	end

	-- Stop watching the queue's poll descriptor.
	function asynchronous_worker_mt.close(self)
		if self.event_fd then
			event.cancel(self.event_fd)
			self.event_fd = nil
		end
	end

	-- Implement the coroutine worker with cqueues
	local function worker_new (name)
		local instance = { name = name, cq = cqueues.new() }
		return setmetatable(instance, { __index = asynchronous_worker_mt })
	end

	-- Create a default background worker
	worker.bg_worker = worker_new('worker.background')
	worker.bg_worker:loop()

	-- Wrap a function for asynchronous execution
	function worker.coroutine (f)
		worker.bg_worker:wrap(f)
	end
else
	-- Disable asynchronous execution: every entry point raises an error.
	local function disabled ()
		error('Lua library cqueues is required for asynchronous execution (luaJIT requires library for Lua 5.1)')
	end
	worker.sleep = disabled
	worker.map = disabled
	worker.coroutine = disabled
	-- Any field access on bg_worker raises as well.
	worker.bg_worker = setmetatable({}, { __index = disabled })
end
|
|
|
|
-- Global commands for map()

-- must be public because it is called from eval_cmd()
-- when map() commands are read from control socket
--
-- Runs `cmd` under xpcall and returns the packed results
-- (success flag first, then return values or the traceback) serialized by
-- krprint.serialize_lua().  When the results cannot be serialized, the
-- failure is logged and a serialized (false, message) pair is returned.
function _map_luaobj_call_wrapper(cmd)
	local func = eval_cmd_compile(cmd, true)
	local ret = kluautil.kr_table_pack(xpcall(func, debug.traceback))
	local ok, serial = pcall(krprint.serialize_lua, ret, 'error')
	if ok then
		return serial
	end
	-- On failure `serial` holds the serializer's error message.
	log_error(ffi.C.LOG_GRP_SYSTEM, 'failed to serialize map() response %s (%s)',
		table_print(ret), serial)
	local failure = kluautil.kr_table_pack(false,
		"returned values cannot be serialized: " .. serial)
	return krprint.serialize_lua(failure)
end
|
|
|
|
-- Build the common error message for map() socket failures.
-- @param path  control socket path
-- @param desc  description of the step that failed
local function _sock_errmsg(path, desc)
	local template = 'map() error while communicating with %s: %s'
	return template:format(path, desc)
end
|
|
|
|
-- Call a socket operation and turn every kind of failure into a Lua error.
-- @param sock    socket object; its error() method is queried after the call
-- @param call    function to call
-- @param params  array of arguments for `call`
-- @param path    control socket path (for error messages)
-- @param desc    description of the step (for error messages)
-- @return all values returned by `call`
-- Raises when `call` throws, when the socket reports a read or write error,
-- or when the first result of `call` is nil.
local function _sock_check(sock, call, params, path, desc)
	local errprefix = _sock_errmsg(path, desc) .. ': '
	-- results[1] is the pcall status, results[2..n] are the values of `call`
	local results = kluautil.kr_table_pack(pcall(call, unpack(params)))
	if not results[1] then
		error(errprefix .. tostring(results[2]))
	end
	local rerr, werr = sock:error()
	if rerr or werr then
		error(string.format('%sread error %s; write error %s', errprefix, rerr, werr))
	end
	if results[2] == nil then
		error(errprefix .. 'unexpected nil result')
	end
	return unpack(results, 2, results.n)
end
|
|
|
|
-- Raise the common map() socket error unless `condition` is truthy.
local function _sock_assert(condition, path, desc)
	if condition then
		return
	end
	error(_sock_errmsg(path, desc))
end
|
|
|
|
-- Send one command to another instance over its control socket and return
-- the raw response.
-- @param cmd   command text (a newline is appended when sending)
-- @param path  path of the unix control socket
-- @return response string, or nil when connecting failed (that failure is
--         only logged, so leftover dead sockets can be skipped)
-- Raises a Lua error when communication fails after connecting.
-- The socket is closed on every path, including the error ones.
local function map_send_recv(cmd, path)
	local socket = require('cqueues.socket')
	local s = socket.connect({ path = path })
	s:setmaxerrs(0)
	s:setmode('bn', 'bn')
	local status, err = pcall(s.connect, s)
	if not status then
		log_error(ffi.C.LOG_GRP_NETWORK, 'map() error while connecting to control socket %s: '
			.. '%s (ignoring this socket)', path, err)
		s:close()
		return nil
	end

	-- Run the whole exchange protected so the socket can be closed before
	-- any error is propagated to the caller.
	local ok, result = pcall(function ()
		-- Switch the peer to binary mode and wait for its prompt.
		local written = _sock_check(s, s.write, {s, '__binary\n'}, path,
			'write __binary')
		_sock_assert(written, path,
			'write __binary result')
		local recv = _sock_check(s, s.read, {s, 2}, path,
			'read reply to __binary')
		_sock_assert(recv and recv == '> ', path,
			'unexpected reply to __binary')
		_sock_check(s, s.write, {s, cmd..'\n'}, path,
			'command write')
		-- The response starts with a 4-byte length preamble.
		recv = _sock_check(s, s.read, {s, 4}, path,
			'response length read')
		_sock_assert(recv and #recv == 4, path,
			'length of response length preamble does not match')
		-- Most significant byte first.  Plain arithmetic is used instead of
		-- bit.lshift/bit.bor, whose results are signed 32-bit and would turn
		-- negative for a first byte >= 0x80.
		local len = 0
		for i = 1, 4 do
			len = len * 256 + recv:byte(i)
		end
		local response = _sock_check(s, s.read, {s, len}, path,
			'read response')
		_sock_assert(response and #response == len, path,
			'actual response length does not match length in preamble')
		return response
	end)
	s:close()
	if not ok then
		-- Level 0: re-raise the original message unchanged.
		error(result, 0)
	end
	return result
end
|
|
|
|
-- internal use only
-- Call cmd on each instance via control sockets.
-- @param cmd - command text; must be a non-empty string without newlines
-- @param format - "luaobj" (default) if individual results should be Lua objects
--               - "strings" for eval_cmd output for each instance
-- @returns table with results, one item per instance + key n=number of instances
--          (order of return values is undefined)
-- @throws Lua error if:
--         - communication failed in the middle of transaction
--         - a result is not serializable
--         - individual call throws an error
--         - number of return values != 1 per instance per call
--         - cmd execution state is undefined after an error
-- Connection errors at the beginning are ignored to paper over leftover dead sockets.
function map(cmd, format)
	if (type(cmd) ~= 'string') then
		panic('map() command must be a string') end
	if string.find(cmd, '\n', 1, true) then
		panic('map() command cannot contain literal \\n, escape it with \\010') end
	if (#cmd <= 0) then
		panic('map() command must be non-empty') end
	-- syntax check on input command to detect typos early
	local chunk, err = eval_cmd_compile(cmd, false)
	if not chunk then
		panic('failure when compiling map() command: %s', err)
	end

	format = format or 'luaobj'
	if (format ~= 'luaobj' and format ~= 'strings') then
		panic('map() output format must be luaobj or strings') end
	if format == 'luaobj' then
		cmd = '_map_luaobj_call_wrapper([=====[' .. cmd .. ']=====])'
	end

	-- find out control socket paths: file names of this instance's own unix
	-- control sockets, kept as a set
	local own_sockets = {}
	for _, endpoint in pairs(net.list()) do
		if (endpoint.kind == 'control') and (endpoint.transport.family == 'unix') then
			local basename = string.match(endpoint.transport.path, '^.*/([^/]+)$')
			if basename ~= nil then
				own_sockets[basename] = true
			end
		end
	end
	local filetab = kluautil.list_dir(worker.control_path)
	if next(filetab) == nil then
		panic('no control sockets found in directory %s',
			worker.control_path)
	end

	local results = {}
	local result_count = 0
	-- finally execute it on all instances
	for _, file in ipairs(filetab) do
		-- Own sockets are served by evaluating the command directly.
		local local_exec = own_sockets[file] == true
		local path = worker.control_path..file
		local path_name = (local_exec and 'this instance') or path
		log_info(ffi.C.LOG_GRP_SYSTEM, 'executing map() on %s: command %s', path_name, cmd)
		local ret
		if local_exec then
			ret = eval_cmd(cmd)
		else
			ret = map_send_recv(cmd, path)
			-- skip dead sockets (leftovers from dead instances)
			if ret == nil then
				goto continue
			end
		end
		result_count = result_count + 1
		-- return value is output from eval_cmd
		-- i.e. string including "quotes" and Lua escaping in between
		assert(type(ret) == 'string', 'map() protocol error, '
			.. 'string not retured by follower')
		assert(#ret >= 2 and
			string.sub(ret, 1, 1) == "'"
			and string.sub(ret, -1, -1) == "'",
			'map() protocol error, value returned by follower does '
			.. 'not look like a string')
		-- deserialize string: remove "quotes" and de-escape bytes
		ret = krprint.deserialize_lua(ret)
		if format == 'luaobj' then
			-- ret should be table with xpcall results serialized into string
			ret = krprint.deserialize_lua(ret)
			assert(type(ret) == 'table', 'map() protocol error, '
				.. 'table with results not retured by follower')
			-- Exactly two items are expected: the status and one value.
			if (ret.n ~= 2) then
				log_error(ffi.C.LOG_GRP_SYSTEM, 'got unsupported map() response: %s', table_print(ret))
				panic('unexpected number of return values in map() response: '
					.. 'only single return value is allowed, '
					.. 'use kluautil.kr_table_pack() helper')
			end
			local ok, retval = ret[1], ret[2]
			if ok == false then
				panic('error when executing map() command on control socket %s: '
					.. '%s. command execution state is now undefined!',
					path, retval)
			end
			-- drop wrapper table and return only the actual return value
			ret = retval
		end
		results[result_count] = ret
		::continue::
	end
	results.n = result_count
	return results
end
|