-- SPDX-License-Identifier: GPL-3.0-or-later

local debug = require('debug')
local ffi = require('ffi')
local kluautil = require('kluautil')
local krprint = require("krprint")

-- Units
kB = 1024
MB = 1024*kB
GB = 1024*MB
-- Time (values are multiples of `sec`, which is 1000)
sec = 1000
second = sec
minute = 60 * sec
min = minute
hour = 60 * minute
day = 24 * hour

-- Logging

-- from syslog.h
LOG_CRIT = 2
LOG_ERR = 3
LOG_WARNING = 4
LOG_NOTICE = 5
LOG_INFO = 6
LOG_DEBUG = 7

-- Source location helpers for log_fmt().
-- The fixed stack level 4 depends on the exact call chain
-- (curr_* <- log_fmt <- log_<level> <- caller), so these must stay separate
-- functions and the log_* wrappers below must not become tail calls.
local function curr_file() return debug.getinfo(4,'S').source end
local function curr_line() return debug.getinfo(4,'l').currentline end

-- Format a message in Lua and hand it to the C logger together with
-- CODE_FILE/CODE_LINE metadata and the log group's name.
local function log_fmt(grp, level, fmt, ...)
	ffi.C.kr_log_fmt(grp, level,
		'CODE_FILE='..curr_file(), 'CODE_LINE='..curr_line(), 'CODE_FUNC=',
		'[%-6s] %s\n', ffi.C.kr_log_grp2name(grp), string.format(fmt, ...))
end

-- Log a message related to a request; formatting is done by string.format.
function log_req(req, qry_uid, indent, grp, fmt, ...)
	ffi.C.kr_log_req1(req, qry_uid, indent, grp, ffi.C.kr_log_grp2name(grp),
		'%s\n', string.format(fmt, ...))
end

-- Log a message related to a query; formatting is done by string.format.
function log_qry(qry, grp, fmt, ...)
	ffi.C.kr_log_q1(qry, grp, ffi.C.kr_log_grp2name(grp),
		'%s\n', string.format(fmt, ...))
end

-- Print a traceback and raise 'ERROR: ' .. fmt formatted with the arguments.
-- Note: `fmt` is a string.format() format string, so pass variable data
-- through the varargs rather than concatenating it into `fmt`.
function panic(fmt, ...)
	print(debug.traceback('error occurred here (config filename:lineno is '
		.. 'at the bottom, if config is involved):', 2))
	error(string.format('ERROR: '.. fmt, ...), 0)
end

function log_error(grp, fmt, ...)
	log_fmt(grp, LOG_ERR, fmt, ...)
end

function log_warn(grp, fmt, ...)
	log_fmt(grp, LOG_WARNING, fmt, ...)
end

function log_notice(grp, fmt, ...)
	log_fmt(grp, LOG_NOTICE, fmt, ...)
end

function log_info(grp, fmt, ...)
	log_fmt(grp, LOG_INFO, fmt, ...)
end

function log_debug(grp, fmt, ...)
	log_fmt(grp, LOG_DEBUG, fmt, ...)
end

-- Shorthand: notice-level message in the LOG_GRP_MODULE group.
function log(fmt, ...)
	log_notice(ffi.C.LOG_GRP_MODULE, fmt, ...)
end

-- Resolver bindings
kres = require('kres')
if rawget(kres, 'str2dname') ~= nil then
	todname = kres.str2dname
end

-- Start resolution of an already constructed packet.
-- @param pkt      packet to resolve
-- @param options  query flags (anything accepted by kres.mk_qflags)
-- @param finish   optional callback called as finish(answer, req) on completion
-- @param init     optional callback called as init(req) before execution
-- @return true if the task was started successfully, false otherwise
worker.resolve_pkt = function (pkt, options, finish, init)
	options = kres.mk_qflags(options)
	local task = ffi.C.worker_resolve_start(pkt, options)
	-- A NULL task must not be dereferenced below; report failure instead.
	if task == nil then
		return false
	end

	-- Deal with finish and init callbacks
	if finish ~= nil then
		local finish_cb
		finish_cb = ffi.cast('trace_callback_f',
			function (req)
				jit.off(true, true) -- JIT for (C -> lua)^2 nesting isn't allowed
				finish(req.answer, req)
				-- one-shot callback: release the FFI callback slot
				finish_cb:free()
			end)
		task.ctx.req.trace_finish = finish_cb
	end
	if init ~= nil then
		init(task.ctx.req)
	end

	return ffi.C.worker_resolve_exec(task, pkt) == 0
end

-- Resolve a name.  Arguments may be positional, or a single table with
-- fields name, type, class, options, finish, init.
-- Defaults: type A, class IN.
worker.resolve = function (qname, qtype, qclass, options, finish, init)
	-- Alternatively use named arguments
	if type(qname) == 'table' then
		local t = qname
		qname = t.name
		qtype = t.type
		qclass = t.class
		options = t.options
		finish = t.finish
		init = t.init
	end
	qtype = qtype or kres.type.A
	qclass = qclass or kres.class.IN
	options = kres.mk_qflags(options)
	-- LATER: nicer errors for rubbish in qname, qtype, qclass?
	local pkt = ffi.C.worker_resolve_mk_pkt(qname, qtype, qclass, options)
	if pkt == nil then
		-- tostring() keeps the message usable even for nil/non-string qname
		panic('failure in worker.resolve(); probably invalid qname "%s"',
			tostring(qname))
	end
	local ret = worker.resolve_pkt(pkt, options, finish, init)
	ffi.C.knot_pkt_free(pkt);
	return ret
end
resolve = worker.resolve

-- Shorthand for aggregated per-worker information
worker.info = function ()
	local t = worker.stats()
	t.pid = worker.pid
	return t
end

-- Resolver mode of operation
local current_mode = 'normal'
local mode_table = { normal=0, strict=1, permissive=2 }
-- Without argument return the current mode; otherwise switch to mode `m`
-- ('normal', 'strict' or 'permissive') and return true.
function mode(m)
	if not m then return current_mode end
	if not mode_table[m] then error('unsupported mode: '..tostring(m)) end
	-- Update current operation mode
	current_mode = m
	option('STRICT', current_mode == 'strict')
	option('PERMISSIVE', current_mode == 'permissive')
	return true
end

-- Trivial option alias
function reorder_RR(val)
	return option('REORDER_RR', val)
end

-- Get/set resolver options via name (string)
-- @param name  option name, case-insensitive
-- @param val   optional boolean; when given, the option is set to it
-- @return current value of the option
function option(name, val)
	local flags = kres.context().options;
	-- Note: no way to test existence of flags[name] but we want error anyway.
	name = string.upper(name) -- convenience
	if val ~= nil then
		if (val ~= true) and (val ~= false) then
			-- value goes through %s so that '%' in it cannot break formatting
			panic('invalid option value: %s', tostring(val))
		end
		flags[name] = val;
	end
	return flags[name];
end

-- Function aliases
-- `env.VAR returns os.getenv(VAR)`
env = {}
setmetatable(env, {
	__index = function (_, k) return os.getenv(k) end
})

-- `debugging.assertion_abort` and `debugging.assertion_fork` read/write
-- the corresponding kr_dbg_* C variables; any other key is an error.
debugging = {}
setmetatable(debugging, {
	__index = function(_, k)
		if k == 'assertion_abort' then return ffi.C.kr_dbg_assertion_abort
		elseif k == 'assertion_fork' then return ffi.C.kr_dbg_assertion_fork
		else panic('invalid debugging option: %s', tostring(k))
		end
	end,
	__newindex = function(_, k, v)
		if k == 'assertion_abort' then ffi.C.kr_dbg_assertion_abort = v
		elseif k == 'assertion_fork' then ffi.C.kr_dbg_assertion_fork = v
		else panic('invalid debugging option: %s', tostring(k))
		end
	end
})

-- Quick access to interfaces
-- `net.<iface>` => `net.interfaces()[iface]`
-- `net = {addr1, ..}` => `net.listen(name, addr1)`
-- `net.ipv{4,6} = {true, false}` => enable/disable IPv{4,6}
setmetatable(net, {
	__index = function (t, k)
		local v = rawget(t, k)
		if v then return v
		elseif k == 'ipv6' then return not option('NO_IPV6')
		elseif k == 'ipv4' then return not option('NO_IPV4')
		else return net.interfaces()[k]
		end
	end,
	__newindex = function (t,k,v)
		if     k == 'ipv6' then return option('NO_IPV6', not v)
		elseif k == 'ipv4' then return option('NO_IPV4', not v)
		else
			-- the assigned value may be an interface name or an address
			local iname = rawget(net.interfaces(), v)
			if iname then t.listen(iname)
			else t.listen(v)
			end
		end
	end
})

-- Syntactic sugar for module loading
-- `modules.<name> = <config>`
setmetatable(modules, {
	__newindex = function (_, k, v)
		-- array-style entry (`modules = { 'name' }`): the value is the name
		if type(k) == 'number' then
			k, v = v, nil
		end
		if not rawget(_G, k) then
			modules.load(k)
			-- keep only the leading [%w_]+ part of the name for table lookup
			k = string.match(k, '[%w_]+')
			local mod = _G[k]
			local config = mod and rawget(mod, 'config')
			if mod ~= nil and config ~= nil then
				if k ~= v then config(v)
				else           config()
				end
			end
		end
	end
})

-- Convert a Lua value into the string argument passed to C module callbacks:
-- tables and booleans via tojson(), other non-nil values via tostring(),
-- nil stays nil.
local function module_arg_to_string(arg)
	local arg_conv
	if type(arg) == 'table' or type(arg) == 'boolean' then
		arg_conv = tojson(arg)
	elseif arg ~= nil then
		arg_conv = tostring(arg)
	end
	return arg_conv
end

-- Set up lua table for a C module.  (Internal function.)
function modules_create_table_for_c(kr_module_ud)
	local kr_module = ffi.cast('struct kr_module **', kr_module_ud)[0]
	--- Set up the global table named according to the module.
	if kr_module.config == nil and kr_module.props == nil then
		return
	end
	local module = {}
	local module_name = ffi.string(kr_module.name)
	_G[module_name] = module

	--- Construct lua functions for properties.
	if kr_module.props ~= nil then
		local i = 0
		while true do
			local prop = kr_module.props[i]
			local cb = prop.cb
			-- the props array is terminated by an entry with NULL callback
			if cb == nil then break; end
			module[ffi.string(prop.name)] =
				function (arg) -- lua wrapper around kr_prop_cb function typedef
					local arg_conv = module_arg_to_string(arg)
					local ret_cstr = cb(ffi.C.the_worker.engine, kr_module, arg_conv)
					if ret_cstr == nil then
						return nil
					end
					-- LATER(optim.): superfluous copying
					local ret_str = ffi.string(ret_cstr)
					-- This is a bit ugly, but the API is that invalid JSON
					-- should be just returned as string :-(
					local status, ret = pcall(fromjson, ret_str)
					if not status then ret = ret_str end
					ffi.C.free(ret_cstr)
					return ret
				end
			i = i + 1
		end
	end

	--- Construct lua function for config().
	if kr_module.config ~= nil then
		module.config =
			function (arg)
				local arg_conv = module_arg_to_string(arg)
				return kr_module.config(kr_module, arg_conv)
			end
	end

	--- Add syntactic sugar for get() and set() properties.
	--- That also "catches" any commands like `moduleName.foo = bar`.
	local m_index, m_newindex
	local get_f = rawget(module, 'get')
	if get_f ~= nil then
		m_index = function (_, key)
			return get_f(key)
		end
	else
		m_index = function ()
			error('module ' .. module_name .. ' does not support indexing syntax sugar')
		end
	end
	local set_f = rawget(module, 'set')
	if set_f ~= nil then
		m_newindex = function (_, key, value)
			-- This will produce a nasty error on some non-string parameters.
			-- Still, we already use it with integer values, e.g. in predict module :-/
			return set_f(key .. ' ' .. value)
		end
	else
		m_newindex = function ()
			error('module ' .. module_name .. ' does not support assignment syntax sugar')
		end
	end
	setmetatable(module, {
		-- note: the two functions only get called for *missing* indices
		__index = m_index,
		__newindex = m_newindex,
	})
end

local layer_ctx = ffi.C.kr_layer_t_static
-- Utilities internal for lua layer glue; see ../ffimodule.c
modules_ffi_layer_wrap1 = function (layer_cb)
	return layer_cb(layer_ctx.state, layer_ctx.req)
end
modules_ffi_layer_wrap2 = function (layer_cb)
	return layer_cb(layer_ctx.state, layer_ctx.req, layer_ctx.pkt)
end
modules_ffi_layer_wrap_checkout = function (layer_cb)
	return layer_cb(layer_ctx.state, layer_ctx.req, layer_ctx.pkt,
					layer_ctx.dst, layer_ctx.is_stream)
end
modules_ffi_wrap_modcb = function (cb, kr_module_ud) -- this one isn't for layer
	local kr_module = ffi.cast('struct kr_module **', kr_module_ud)[0]
	return cb(kr_module)
end

-- Return filesystem size where the cache resides.
cache.fssize = function ()
	local path = cache.current_storage or '.'
	-- As it is now, `path` may or may not include the lmdb:// prefix.
	if string.sub(path, 1, 7) == 'lmdb://' then
		path = string.sub(path, 8)
	end
	if #path == 0 then
		path = '.'
	end
	local size = tonumber(ffi.C.kr_fssize(path))
	if size < 0 then
		panic('cache.fssize(): %s', ffi.string(ffi.C.knot_strerror(size)))
	else
		return size
	end
end

-- Clear (a part of) the cache.
-- @param name        name to clear; nil (or '.' without exact_name) clears everything
-- @param exact_name  boolean, defaults to false
-- @param rr_type     optional; only supported together with exact_name
-- @param chunk_size  positive number, defaults to 100
-- @param callback    optional function called with
--                    (name, exact_name, rr_type, chunk_size, callback, prev_state, rettable);
--                    the default one re-schedules cache.clear() via event.after(1, ...)
--                    while the chunk limit keeps being reached
-- @param prev_state  passed through to the callback
-- @return table with at least the `count` field (or whatever the callback returns)
cache.clear = function (name, exact_name, rr_type, chunk_size, callback, prev_state)
	if name == nil or (name == '.' and not exact_name) then
		-- keep same output format as for 'standard' clear
		local total_count = cache.count()
		if not cache.clear_everything() then
			error('unable to clear everything')
		end
		return {count = total_count}
	end
	-- Check parameters, in order, and set defaults if missing.
	local dname = kres.str2dname(name)
	if not dname then error('cache.clear(): incorrect name passed') end
	if exact_name == nil then exact_name = false end
	if type(exact_name) ~= 'boolean'
		then error('cache.clear(): incorrect exact_name passed') end

	local cach = kres.context().cache;
	local rettable = {}
	-- Apex warning.  If the caller passes a custom callback,
	-- we assume they are advanced enough not to need the check.
	-- The point is to avoid repeating the check in each callback iteration.
	if callback == nil then
		local apex_array = ffi.new('knot_dname_t *[1]')  -- C: dname **apex_array
		local ret = ffi.C.kr_cache_closest_apex(cach, dname, false, apex_array)
		if ret < 0 then
			error(ffi.string(ffi.C.knot_strerror(ret)))
		end
		if not ffi.C.knot_dname_is_equal(apex_array[0], dname) then
			local apex_str = kres.dname2str(apex_array[0])
			rettable.not_apex = 'to clear proofs of non-existence call '
				.. 'cache.clear(\'' .. tostring(apex_str) ..'\')'
			rettable.subtree = apex_str
		end
		ffi.C.free(apex_array[0])
	end

	if rr_type ~= nil then
		-- Special case, without any subtree searching.
		if not exact_name
			then error('cache.clear(): specifying rr_type only supported with exact_name') end
		if chunk_size or callback
			then error('cache.clear(): chunk_size and callback parameters not supported with rr_type') end
		local ret = ffi.C.kr_cache_remove(cach, dname, rr_type)
		if ret < 0 then error(ffi.string(ffi.C.knot_strerror(ret))) end
		return {count = 1}
	end

	if chunk_size == nil then chunk_size = 100 end
	if type(chunk_size) ~= 'number' or chunk_size <= 0
		then error('cache.clear(): chunk_size has to be a positive integer') end

	-- Do the C call, and add chunk_size warning.
	rettable.count = ffi.C.kr_cache_remove_subtree(cach, dname, exact_name, chunk_size)
	if rettable.count == chunk_size then
		local msg_extra = ''
		if callback == nil then
			msg_extra = '; the default callback will continue asynchronously'
		end
		rettable.chunk_limit = 'chunk size limit reached' .. msg_extra
	end

	-- Default callback function: repeat after 1ms
	if callback == nil then callback =
		function (cbname, cbexact_name, cbrr_type, cbchunk_size, cbself, cbprev_state, cbrettable)
			if cbrettable.count < 0 then error(ffi.string(ffi.C.knot_strerror(cbrettable.count))) end
			if cbprev_state == nil then cbprev_state = { round = 0 } end
			if type(cbprev_state) ~= 'table'
				then error('cache.clear() callback: incorrect prev_state passed') end
			cbrettable.round = cbprev_state.round + 1
			if (cbrettable.count == cbchunk_size) then
				event.after(1, function ()
						cache.clear(cbname, cbexact_name, cbrr_type, cbchunk_size, cbself, cbrettable)
					end)
			elseif cbrettable.round > 1 then
				-- name is passed as an argument so that '%' in it is harmless
				log_info(ffi.C.LOG_GRP_CACHE, 'asynchronous cache.clear(\'%s\', %s) finished',
					cbname, tostring(cbexact_name))
			end
			return cbrettable
		end
	end
	return callback(name, exact_name, rr_type, chunk_size, callback, prev_state, rettable)
end

-- Syntactic sugar for cache
-- `cache[x] -> cache.get(x)`
-- `cache.{size|storage} = value`
setmetatable(cache, {
	__index = function (t, k)
		local res = rawget(t, k)
		if not res and not rawget(t, 'current_size') then return res end
		-- Beware: t.get returns empty table on failure to find.
		-- That would be confusing here (breaking kresc), so return nil instead.
		res = t.get(k)
		if res and next(res) ~= nil then return res else return nil end
	end,
	__newindex = function (t,k,v)
		-- Defaults
		local storage = rawget(t, 'current_storage')
		if not storage then storage = 'lmdb://' end
		local size = rawget(t, 'current_size')
		if not size then size = 10*MB end
		-- Declarative interface for cache
		if     k == 'size'    then t.open(v, storage)
		elseif k == 'storage' then t.open(size, v) end
	end
})

-- Make sandboxed environment
-- Returns a proxy table: reads fall through to `defined`; writes go to
-- `defined` too, except that assigning a table to a protected name merges
-- its fields into the existing table instead of replacing it.
local function make_sandbox(defined)
	local __protected = {
		worker = true, env = true, debugging = true, modules = true,
		cache = true, net = true, trust_anchors = true
	}

	-- Compute and export the list of top-level names (hidden otherwise)
	local names = {}
	for n in pairs(defined) do
		names[#names + 1] = n .. "\n"
	end
	local nl = table.concat(names)

	return setmetatable({ __orig_name_list = nl }, {
		__index = defined,
		__newindex = function (_, k, v)
			if __protected[k] then
				for k2,v2 in pairs(v) do
					defined[k][k2] = v2
				end
			else
				defined[k] = v
			end
		end
	})
end

-- Compatibility sandbox
_G = make_sandbox(getfenv(0))
setfenv(0, _G)

-- Load default modules
trust_anchors = require('trust_anchors')
modules.load('ta_update')
modules.load('ta_signal_query')
modules.load('policy')
modules.load('priming')
modules.load('detect_time_skew')
modules.load('detect_time_jump')
modules.load('ta_sentinel')
modules.load('edns_keepalive')
modules.load('refuse_nord')
modules.load('watchdog')
modules.load('extended_error')

-- Load keyfile_default
-- NOTE(review): the @...@ tokens look like build-time template placeholders.
trust_anchors.add_file('@keyfile_default@', @unmanaged@)

-- Compile a command line into a chunk.
-- First the line is tried as an expression (wrapped in table_print() unless
-- `raw`); if that fails to compile, it is compiled as plain statements.
-- @return chunk, err  (as returned by the loader)
local function eval_cmd_compile(line, raw)
	-- Compatibility sandbox code loading
	local function load_code(code)
		if getfenv then -- Lua 5.1
			return loadstring(code)
		else            -- Lua 5.2+
			return load(code, nil, 't', _ENV)
		end
	end
	local err, chunk
	chunk, err = load_code(raw and 'return '..line or 'return table_print('..line..')')
	if err then
		chunk, err = load_code(line)
	end
	return chunk, err
end

-- Interactive command evaluation
function eval_cmd(line, raw)
	local chunk, err = eval_cmd_compile(line, raw)
	if not err then
		return chunk()
	else
		error(err)
	end
end

-- Pretty printing
local pprint = require('krprint').pprint
-- Pretty-print all arguments, one per line; with several arguments each line
-- is annotated with its result number.  Returns nil when called without
-- arguments.
function table_print(...)
	local strs = {}
	local nargs = select('#', ...)
	if nargs == 0 then
		return nil
	end
	for n=1,nargs do
		local arg = select(n, ...)
		local arg_str = pprint(arg)
		if nargs > 1 then
			table.insert(strs, string.format("%s\t-- result # %d", arg_str, n))
		else
			table.insert(strs, arg_str)
		end
	end
	return table.concat(strs, '\n')
end

-- This extends the worker module to allow asynchronous execution of functions and nonblocking I/O.
-- The current implementation combines cqueues for Lua interface, and event.socket() in order to not
-- block resolver engine while waiting for I/O or timers.
--
local has_cqueues, cqueues = pcall(require, 'cqueues')
if has_cqueues then

	-- Export the asynchronous sleep function
	worker.sleep = cqueues.sleep

	-- Create metatable for workers to define the API
	-- It can schedule multiple cqueues and yield execution when there's a wait for blocking I/O or timer
	local asynchronous_worker_mt = {
		work = function (self)
			local ok, err, _, co = self.cq:step(0)
			if not ok then
				log_warn(ffi.C.LOG_GRP_SYSTEM, '%s error: %s %s',
					self.name or 'worker', err, debug.traceback(co))
			end
			-- Reschedule timeout or create new one
			local timeout = self.cq:timeout()
			if timeout then
				-- Throttle timeouts to avoid too frequent wakeups
				if timeout == 0 then timeout = 0.00001 end
				-- Convert from seconds to duration
				timeout = timeout * sec
				if not self.next_timeout then
					self.next_timeout = event.after(timeout, self.on_step)
				else
					event.reschedule(self.next_timeout, timeout)
				end
			else -- Cancel running timeout when there is no next deadline
				if self.next_timeout then
					event.cancel(self.next_timeout)
					self.next_timeout = nil
				end
			end
		end,
		wrap = function (self, f)
			self.cq:wrap(f)
		end,
		loop = function (self)
			self.on_step = function () self:work() end
			self.event_fd = event.socket(self.cq:pollfd(), self.on_step)
		end,
		close = function (self)
			if self.event_fd then
				event.cancel(self.event_fd)
				self.event_fd = nil
			end
		end,
	}

	-- Implement the coroutine worker with cqueues
	local function worker_new (name)
		return setmetatable({name = name, cq = cqueues.new()}, { __index = asynchronous_worker_mt })
	end

	-- Create a default background worker
	worker.bg_worker = worker_new('worker.background')
	worker.bg_worker:loop()

	-- Wrap a function for asynchronous execution
	function worker.coroutine (f)
		worker.bg_worker:wrap(f)
	end
else
	-- Disable asynchronous execution
	local function disabled ()
		error('Lua library cqueues is required for asynchronous execution (luaJIT requires library for Lua 5.1)')
	end
	worker.sleep = disabled
	worker.map = disabled
	worker.coroutine = disabled
	worker.bg_worker = setmetatable({}, { __index = disabled })
end

-- Global commands for map()

-- must be public because it is called from eval_cmd()
-- when map() commands are read from control socket
-- Runs `cmd` under xpcall and returns the packed results serialized to a
-- string; if the results cannot be serialized, a serialized
-- (false, error message) pair is returned instead.
function _map_luaobj_call_wrapper(cmd)
	local func = eval_cmd_compile(cmd, true)
	local ret = kluautil.kr_table_pack(xpcall(func, debug.traceback))
	local ok, serial = pcall(krprint.serialize_lua, ret, 'error')
	if not ok then
		log_error(ffi.C.LOG_GRP_SYSTEM, 'failed to serialize map() response %s (%s)',
			table_print(ret), serial)
		return krprint.serialize_lua(
			kluautil.kr_table_pack(false, "returned values cannot be serialized: "
						.. serial))
	else
		return serial
	end
end

-- Build the common prefix of map() socket error messages.
local function _sock_errmsg(path, desc)
	return string.format(
		'map() error while communicating with %s: %s',
		path, desc)
end

-- Call call(unpack(params)) on behalf of socket `sock` and raise an error if
-- the call throws, the socket reports a read/write error, or the first
-- result is nil.  Otherwise return all results of the call.
local function _sock_check(sock, call, params, path, desc)
	local errprefix = _sock_errmsg(path, desc) .. ': '
	local retvals = kluautil.kr_table_pack(pcall(call, unpack(params)))
	local ok = retvals[1]
	if not ok then
		error(errprefix .. tostring(retvals[2]))
	end
	local rerr, werr = sock:error()
	if rerr or werr then
		-- one of the two may be nil, hence the explicit tostring()
		error(string.format('%sread error %s; write error %s', errprefix,
			tostring(rerr), tostring(werr)))
	end
	if retvals[2] == nil then
		error(errprefix .. 'unexpected nil result')
	end
	return unpack(retvals, 2, retvals.n)
end

-- Raise a map() communication error unless `condition` holds.
local function _sock_assert(condition, path, desc)
	if not condition then
		error(_sock_errmsg(path, desc))
	end
end

-- Perform one command/response transaction over connected socket `s`:
-- switch to __binary mode, send `cmd`, read the 4-byte big-endian length
-- and then the response of that length.  Raises on any protocol problem.
local function map_exchange(s, cmd, path)
	local bit = require('bit')
	local ret = _sock_check(s, s.write, {s, '__binary\n'}, path,
		'write __binary')
	_sock_assert(ret, path,
		'write __binary result')
	local recv = _sock_check(s, s.read, {s, 2}, path,
		'read reply to __binary')
	_sock_assert(recv and recv == '> ', path,
		'unexpected reply to __binary')
	_sock_check(s, s.write, {s, cmd..'\n'}, path,
		'command write')
	recv = _sock_check(s, s.read, {s, 4}, path,
		'response length read')
	_sock_assert(recv and #recv == 4, path,
		'length of response length preamble does not match')
	local len = tonumber(recv:byte(1))
	for i=2,4 do
		len = bit.bor(bit.lshift(len, 8), tonumber(recv:byte(i)))
	end
	ret = _sock_check(s, s.read, {s, len}, path,
		'read response')
	_sock_assert(ret and #ret == len, path,
		'actual response length does not match length in preamble')
	return ret
end

-- Send `cmd` to the control socket at `path` and return the raw response.
-- Returns nil (after logging) when the socket cannot be connected to;
-- raises a Lua error when communication fails later on.
-- The socket is closed on every path, including failures.
local function map_send_recv(cmd, path)
	local socket = require('cqueues.socket')
	local s = socket.connect({ path = path })
	s:setmaxerrs(0)
	s:setmode('bn', 'bn')
	local status, err = pcall(s.connect, s)
	if not status then
		log_error(ffi.C.LOG_GRP_NETWORK,
			'map() error while connecting to control socket %s: '
			.. '%s (ignoring this socket)', path, err)
		-- best effort: a failure to close must not turn "ignore" into an error
		pcall(s.close, s)
		return nil
	end
	local ok, ret = pcall(map_exchange, s, cmd, path)
	s:close()
	if not ok then
		-- re-raise unchanged (the message already carries position info)
		error(ret, 0)
	end
	return ret
end

-- internal use only
-- Call cmd on each instance via control sockets.
-- @param format - "luaobj" if individual results should be Lua objects
--               - "strings" for eval_cmd output for each instance
-- @returns table with results, one item per instance + key n=number of instances
--          (order of return values is undefined)
-- @throws Lua error if:
--         - communication failed in the middle of transaction
--         - a result is not serializable
--         - individual call throws an error
--         - number of return values != 1 per instance per call
--         - cmd execution state is undefined after an error
-- Connection errors at the beginning are ignored to paper over leftover dead sockets.
function map(cmd, format)
	-- set of file names of control sockets owned by this instance
	local local_sockets = {}
	local results = {}

	if (type(cmd) ~= 'string') then
		panic('map() command must be a string') end
	if string.find(cmd, '\n', 1, true) then
		panic('map() command cannot contain literal \\n, escape it with \\010') end
	if (#cmd <= 0) then
		panic('map() command must be non-empty') end
	-- syntax check on input command to detect typos early
	local chunk, err = eval_cmd_compile(cmd, false)
	if not chunk then
		panic('failure when compiling map() command: %s', err)
	end

	format = format or 'luaobj'
	if (format ~= 'luaobj' and format ~= 'strings') then
		panic('map() output format must be luaobj or strings') end
	if format == 'luaobj' then
		cmd = '_map_luaobj_call_wrapper([=====[' .. cmd .. ']=====])'
	end

	-- find out control socket paths
	for _,v in pairs(net.list()) do
		if (v['kind'] == 'control') and (v['transport']['family'] == 'unix') then
			-- keep only the last path component (the socket's file name)
			local sock_name = string.match(v['transport']['path'], '^.*/([^/]+)$')
			if sock_name ~= nil then
				local_sockets[sock_name] = true
			end
		end
	end
	local filetab = kluautil.list_dir(worker.control_path)
	if next(filetab) == nil then
		panic('no control sockets found in directory %s',
			worker.control_path)
	end

	local result_count = 0
	-- finally execute it on all instances
	for _, file in ipairs(filetab) do
		-- our own sockets are served by direct evaluation, not via the socket
		local local_exec = local_sockets[file] == true
		local path = worker.control_path..file
		local path_name = (local_exec and 'this instance') or path
		log_info(ffi.C.LOG_GRP_SYSTEM, 'executing map() on %s: command %s', path_name, cmd)
		local ret
		if local_exec then
			ret = eval_cmd(cmd)
		else
			ret = map_send_recv(cmd, path)
			-- skip dead sockets (leftovers from dead instances)
			if ret == nil then
				goto continue
			end
		end
		result_count = result_count + 1
		-- return value is output from eval_cmd
		-- i.e. string including "quotes" and Lua escaping in between
		assert(type(ret) == 'string', 'map() protocol error, '
			.. 'string not returned by follower')
		assert(#ret >= 2 and string.sub(ret, 1, 1) == "'"
			and string.sub(ret, -1, -1) == "'",
			'map() protocol error, value returned by follower does '
			.. 'not look like a string')
		-- deserialize string: remove "quotes" and de-escape bytes
		ret = krprint.deserialize_lua(ret)
		if format == 'luaobj' then
			-- ret should be table with xpcall results serialized into string
			ret = krprint.deserialize_lua(ret)
			assert(type(ret) == 'table', 'map() protocol error, '
				.. 'table with results not returned by follower')
			-- expected shape: { ok, single_return_value, n = 2 }
			if (ret.n ~= 2) then
				log_error(ffi.C.LOG_GRP_SYSTEM, 'got unsupported map() response: %s', table_print(ret))
				panic('unexpected number of return values in map() response: '
					.. 'only single return value is allowed, '
					.. 'use kluautil.kr_table_pack() helper')
			end
			local ok, retval = ret[1], ret[2]
			if ok == false then
				panic('error when executing map() command on control socket %s: '
					.. '%s. command execution state is now undefined!',
					path, tostring(retval))
			end
			-- drop wrapper table and return only the actual return value
			ret = retval
		end
		results[result_count] = ret
		::continue::
	end
	results.n = result_count
	return results
end