diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 15:26:00 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 15:26:00 +0000 |
commit | 830407e88f9d40d954356c3754f2647f91d5c06a (patch) | |
tree | d6a0ece6feea91f3c656166dbaa884ef8a29740e /daemon/lua/sandbox.lua.in | |
parent | Initial commit. (diff) | |
download | knot-resolver-830407e88f9d40d954356c3754f2647f91d5c06a.tar.xz knot-resolver-830407e88f9d40d954356c3754f2647f91d5c06a.zip |
Adding upstream version 5.6.0.upstream/5.6.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | daemon/lua/sandbox.lua.in | 833 |
1 files changed, 833 insertions, 0 deletions
diff --git a/daemon/lua/sandbox.lua.in b/daemon/lua/sandbox.lua.in new file mode 100644 index 0000000..7c6a818 --- /dev/null +++ b/daemon/lua/sandbox.lua.in @@ -0,0 +1,833 @@ +-- SPDX-License-Identifier: GPL-3.0-or-later + +local debug = require('debug') +local ffi = require('ffi') +local kluautil = require('kluautil') +local krprint = require("krprint") + +-- Units +kB = 1024 +MB = 1024*kB +GB = 1024*MB +-- Time +sec = 1000 +second = sec +minute = 60 * sec +min = minute +hour = 60 * minute +day = 24 * hour + +-- Logging + +-- from syslog.h +LOG_CRIT = 2 +LOG_ERR = 3 +LOG_WARNING = 4 +LOG_NOTICE = 5 +LOG_INFO = 6 +LOG_DEBUG = 7 + +local function curr_file() return debug.getinfo(4,'S').source end +local function curr_line() return debug.getinfo(4,'l').currentline end + +local function log_fmt(grp, level, fmt, ...) + ffi.C.kr_log_fmt(grp, level, + 'CODE_FILE='..curr_file(), 'CODE_LINE='..curr_line(), 'CODE_FUNC=', + '[%-6s] %s\n', ffi.C.kr_log_grp2name(grp), string.format(fmt, ...)) +end + +function log_req(req, qry_uid, indent, grp, fmt, ...) + ffi.C.kr_log_req1(req, qry_uid, indent, grp, ffi.C.kr_log_grp2name(grp), + '%s\n', string.format(fmt, ...)) +end + +function log_qry(qry, grp, fmt, ...) + ffi.C.kr_log_q1(qry, grp, ffi.C.kr_log_grp2name(grp), + '%s\n', string.format(fmt, ...)) +end + +function panic(fmt, ...) + print(debug.traceback('error occurred here (config filename:lineno is ' + .. 'at the bottom, if config is involved):', 2)) + error(string.format('ERROR: '.. fmt, ...), 0) +end + +function log_error(grp, fmt, ...) + log_fmt(grp, LOG_ERR, fmt, ...) +end + +function log_warn(grp, fmt, ...) + log_fmt(grp, LOG_WARNING, fmt, ...) +end + +function log_notice(grp, fmt, ...) + log_fmt(grp, LOG_NOTICE, fmt, ...) +end + +function log_info(grp, fmt, ...) + log_fmt(grp, LOG_INFO, fmt, ...) +end + +function log_debug(grp, fmt, ...) + log_fmt(grp, LOG_DEBUG, fmt, ...) +end + +function log(fmt, ...) + log_notice(ffi.C.LOG_GRP_MODULE, fmt, ...) +end + +-- Resolver bindings +kres = require('kres') +if rawget(kres, 'str2dname') ~= nil then + todname = kres.str2dname +end + +worker.resolve_pkt = function (pkt, options, finish, init) + options = kres.mk_qflags(options) + local task = ffi.C.worker_resolve_start(pkt, options) + + -- Deal with finish and init callbacks + if finish ~= nil then + local finish_cb + finish_cb = ffi.cast('trace_callback_f', + function (req) + jit.off(true, true) -- JIT for (C -> lua)^2 nesting isn't allowed + finish(req.answer, req) + finish_cb:free() + end) + task.ctx.req.trace_finish = finish_cb + end + if init ~= nil then + init(task.ctx.req) + end + + return ffi.C.worker_resolve_exec(task, pkt) == 0 +end + +worker.resolve = function (qname, qtype, qclass, options, finish, init) + -- Alternatively use named arguments + if type(qname) == 'table' then + local t = qname + qname = t.name + qtype = t.type + qclass = t.class + options = t.options + finish = t.finish + init = t.init + end + qtype = qtype or kres.type.A + qclass = qclass or kres.class.IN + options = kres.mk_qflags(options) + -- LATER: nicer errors for rubbish in qname, qtype, qclass? + local pkt = ffi.C.worker_resolve_mk_pkt(qname, qtype, qclass, options) + if pkt == nil then + panic('failure in worker.resolve(); probably invalid qname "%s"', qname) + end + local ret = worker.resolve_pkt(pkt, options, finish, init) + ffi.C.knot_pkt_free(pkt); + return ret +end +resolve = worker.resolve + +-- Shorthand for aggregated per-worker information +worker.info = function () + local t = worker.stats() + t.pid = worker.pid + return t +end + +-- Resolver mode of operation +local current_mode = 'normal' +local mode_table = { normal=0, strict=1, permissive=2 } +function mode(m) + if not m then return current_mode end + if not mode_table[m] then error('unsupported mode: '..m) end + -- Update current operation mode + current_mode = m + option('STRICT', current_mode == 'strict') + option('PERMISSIVE', current_mode == 'permissive') + return true +end + +-- Trivial option alias +function reorder_RR(val) + return option('REORDER_RR', val) +end + +-- Get/set resolver options via name (string) +function option(name, val) + local flags = kres.context().options; + -- Note: no way to test existence of flags[name] but we want error anyway. + name = string.upper(name) -- convenience + if val ~= nil then + if (val ~= true) and (val ~= false) then + panic('invalid option value: ' .. tostring(val)) + end + flags[name] = val; + end + return flags[name]; +end + +-- Function aliases +-- `env.VAR returns os.getenv(VAR)` +env = {} +setmetatable(env, { + __index = function (_, k) return os.getenv(k) end +}) + +debugging = {} +setmetatable(debugging, { + __index = function(_, k) + if k == 'assertion_abort' then return ffi.C.kr_dbg_assertion_abort + elseif k == 'assertion_fork' then return ffi.C.kr_dbg_assertion_fork + else panic('invalid debugging option: ' .. tostring(k)) + end + end, + __newindex = function(_, k, v) + if k == 'assertion_abort' then ffi.C.kr_dbg_assertion_abort = v + elseif k == 'assertion_fork' then ffi.C.kr_dbg_assertion_fork = v + else panic('invalid debugging option: ' .. tostring(k)) + end + end +}) + +-- Quick access to interfaces +-- `net.<iface>` => `net.interfaces()[iface]` +-- `net = {addr1, ..}` => `net.listen(name, addr1)` +-- `net.ipv{4,6} = {true, false}` => enable/disable IPv{4,6} +setmetatable(net, { + __index = function (t, k) + local v = rawget(t, k) + if v then return v + elseif k == 'ipv6' then return not option('NO_IPV6') + elseif k == 'ipv4' then return not option('NO_IPV4') + else return net.interfaces()[k] + end + end, + __newindex = function (t,k,v) + if k == 'ipv6' then return option('NO_IPV6', not v) + elseif k == 'ipv4' then return option('NO_IPV4', not v) + else + local iname = rawget(net.interfaces(), v) + if iname then t.listen(iname) + else t.listen(v) + end + end + end +}) + +-- Syntactic sugar for module loading +-- `modules.<name> = <config>` +setmetatable(modules, { + __newindex = function (_, k, v) + if type(k) == 'number' then + k, v = v, nil + end + if not rawget(_G, k) then + modules.load(k) + k = string.match(k, '[%w_]+') + local mod = _G[k] + local config = mod and rawget(mod, 'config') + if mod ~= nil and config ~= nil then + if k ~= v then config(v) + else config() + end + end + end + end +}) + +-- Set up lua table for a C module. (Internal function.) +function modules_create_table_for_c(kr_module_ud) + local kr_module = ffi.cast('struct kr_module **', kr_module_ud)[0] + --- Set up the global table named according to the module. + if kr_module.config == nil and kr_module.props == nil then + return + end + local module = {} + local module_name = ffi.string(kr_module.name) + _G[module_name] = module + + --- Construct lua functions for properties. + if kr_module.props ~= nil then + local i = 0 + while true do + local prop = kr_module.props[i] + local cb = prop.cb + if cb == nil then break; end + module[ffi.string(prop.name)] = + function (arg) -- lua wrapper around kr_prop_cb function typedef + local arg_conv + if type(arg) == 'table' or type(arg) == 'boolean' then + arg_conv = tojson(arg) + elseif arg ~= nil then + arg_conv = tostring(arg) + end + local ret_cstr = cb(ffi.C.the_worker.engine, kr_module, arg_conv) + if ret_cstr == nil then + return nil + end + -- LATER(optim.): superfluous copying + local ret_str = ffi.string(ret_cstr) + -- This is a bit ugly, but the API is that invalid JSON + -- should be just returned as string :-( + local status, ret = pcall(fromjson, ret_str) + if not status then ret = ret_str end + ffi.C.free(ret_cstr) + return ret + end + i = i + 1 + end + end + + --- Construct lua function for config(). + if kr_module.config ~= nil then + module.config = + function (arg) + local arg_conv + if type(arg) == 'table' or type(arg) == 'boolean' then + arg_conv = tojson(arg) + elseif arg ~= nil then + arg_conv = tostring(arg) + end + return kr_module.config(kr_module, arg_conv) + end + end + + --- Add syntactic sugar for get() and set() properties. + --- That also "catches" any commands like `moduleName.foo = bar`. + local m_index, m_newindex + local get_f = rawget(module, 'get') + if get_f ~= nil then + m_index = function (_, key) + return get_f(key) + end + else + m_index = function () + error('module ' .. module_name .. ' does not support indexing syntax sugar') + end + end + local set_f = rawget(module, 'set') + if set_f ~= nil then + m_newindex = function (_, key, value) + -- This will produce a nasty error on some non-string parameters. + -- Still, we already use it with integer values, e.g. in predict module :-/ + return set_f(key .. ' ' .. value) + end + else + m_newindex = function () + error('module ' .. module_name .. ' does not support assignment syntax sugar') + end + end + setmetatable(module, { + -- note: the two functions only get called for *missing* indices + __index = m_index, + __newindex = m_newindex, + }) +end + +local layer_ctx = ffi.C.kr_layer_t_static +-- Utilities internal for lua layer glue; see ../ffimodule.c +modules_ffi_layer_wrap1 = function (layer_cb) + return layer_cb(layer_ctx.state, layer_ctx.req) +end +modules_ffi_layer_wrap2 = function (layer_cb) + return layer_cb(layer_ctx.state, layer_ctx.req, layer_ctx.pkt) +end +modules_ffi_layer_wrap_checkout = function (layer_cb) + return layer_cb(layer_ctx.state, layer_ctx.req, layer_ctx.pkt, + layer_ctx.dst, layer_ctx.is_stream) +end +modules_ffi_wrap_modcb = function (cb, kr_module_ud) -- this one isn't for layer + local kr_module = ffi.cast('struct kr_module **', kr_module_ud)[0] + return cb(kr_module) +end + +-- Return filesystem size where the cache resides. +cache.fssize = function () + local path = cache.current_storage or '.' + -- As it is now, `path` may or may not include the lmdb:// prefix. + if string.sub(path, 1, 7) == 'lmdb://' then + path = string.sub(path, 8) + end + if #path == 0 then + path = '.' + end + local size = tonumber(ffi.C.kr_fssize(path)) + if size < 0 then + panic('cache.fssize(): %s', ffi.string(ffi.C.knot_strerror(size))) + else + return size + end +end + +cache.clear = function (name, exact_name, rr_type, chunk_size, callback, prev_state) + if name == nil or (name == '.' and not exact_name) then + -- keep same output format as for 'standard' clear + local total_count = cache.count() + if not cache.clear_everything() then + error('unable to clear everything') + end + return {count = total_count} + end + -- Check parameters, in order, and set defaults if missing. + local dname = kres.str2dname(name) + if not dname then error('cache.clear(): incorrect name passed') end + if exact_name == nil then exact_name = false end + if type(exact_name) ~= 'boolean' + then error('cache.clear(): incorrect exact_name passed') end + + local cach = kres.context().cache; + local rettable = {} + -- Apex warning. If the caller passes a custom callback, + -- we assume they are advanced enough not to need the check. + -- The point is to avoid repeating the check in each callback iteration. + if callback == nil then + local apex_array = ffi.new('knot_dname_t *[1]') -- C: dname **apex_array + local ret = ffi.C.kr_cache_closest_apex(cach, dname, false, apex_array) + if ret < 0 then + error(ffi.string(ffi.C.knot_strerror(ret))) end + if not ffi.C.knot_dname_is_equal(apex_array[0], dname) then + local apex_str = kres.dname2str(apex_array[0]) + rettable.not_apex = 'to clear proofs of non-existence call ' + .. 'cache.clear(\'' .. tostring(apex_str) ..'\')' + rettable.subtree = apex_str + end + ffi.C.free(apex_array[0]) + end + + if rr_type ~= nil then + -- Special case, without any subtree searching. + if not exact_name + then error('cache.clear(): specifying rr_type only supported with exact_name') end + if chunk_size or callback + then error('cache.clear(): chunk_size and callback parameters not supported with rr_type') end + local ret = ffi.C.kr_cache_remove(cach, dname, rr_type) + if ret < 0 then error(ffi.string(ffi.C.knot_strerror(ret))) end + return {count = 1} + end + + if chunk_size == nil then chunk_size = 100 end + if type(chunk_size) ~= 'number' or chunk_size <= 0 + then error('cache.clear(): chunk_size has to be a positive integer') end + + -- Do the C call, and add chunk_size warning. + rettable.count = ffi.C.kr_cache_remove_subtree(cach, dname, exact_name, chunk_size) + if rettable.count == chunk_size then + local msg_extra = '' + if callback == nil then + msg_extra = '; the default callback will continue asynchronously' + end + rettable.chunk_limit = 'chunk size limit reached' .. msg_extra + end + + -- Default callback function: repeat after 1ms + if callback == nil then callback = + function (cbname, cbexact_name, cbrr_type, cbchunk_size, cbself, cbprev_state, cbrettable) + if cbrettable.count < 0 then error(ffi.string(ffi.C.knot_strerror(cbrettable.count))) end + if cbprev_state == nil then cbprev_state = { round = 0 } end + if type(cbprev_state) ~= 'table' + then error('cache.clear() callback: incorrect prev_state passed') end + cbrettable.round = cbprev_state.round + 1 + if (cbrettable.count == cbchunk_size) then + event.after(1, function () + cache.clear(cbname, cbexact_name, cbrr_type, cbchunk_size, cbself, cbrettable) + end) + elseif cbrettable.round > 1 then + log_info(ffi.C.LOG_GRP_CACHE, 'asynchronous cache.clear(\'' .. cbname .. '\', ' + .. tostring(cbexact_name) .. ') finished') + end + return cbrettable + end + end + return callback(name, exact_name, rr_type, chunk_size, callback, prev_state, rettable) +end +-- Syntactic sugar for cache +-- `cache[x] -> cache.get(x)` +-- `cache.{size|storage} = value` +setmetatable(cache, { + __index = function (t, k) + local res = rawget(t, k) + if not res and not rawget(t, 'current_size') then return res end + -- Beware: t.get returns empty table on failure to find. + -- That would be confusing here (breaking kresc), so return nil instead. + res = t.get(k) + if res and next(res) ~= nil then return res else return nil end + end, + __newindex = function (t,k,v) + -- Defaults + local storage = rawget(t, 'current_storage') + if not storage then storage = 'lmdb://' end + local size = rawget(t, 'current_size') + if not size then size = 10*MB end + -- Declarative interface for cache + if k == 'size' then t.open(v, storage) + elseif k == 'storage' then t.open(size, v) end + end +}) + +-- Make sandboxed environment +local function make_sandbox(defined) + local __protected = { + worker = true, env = true, debugging = true, modules = true, + cache = true, net = true, trust_anchors = true + } + + -- Compute and export the list of top-level names (hidden otherwise) + local nl = "" + for n in pairs(defined) do + nl = nl .. n .. "\n" + end + + return setmetatable({ __orig_name_list = nl }, { + __index = defined, + __newindex = function (_, k, v) + if __protected[k] then + for k2,v2 in pairs(v) do + defined[k][k2] = v2 + end + else + defined[k] = v + end + end + }) +end + +-- Compatibility sandbox +_G = make_sandbox(getfenv(0)) +setfenv(0, _G) + +-- Load default modules +trust_anchors = require('trust_anchors') +modules.load('ta_update') +modules.load('ta_signal_query') +modules.load('policy') +modules.load('priming') +modules.load('detect_time_skew') +modules.load('detect_time_jump') +modules.load('ta_sentinel') +modules.load('edns_keepalive') +modules.load('refuse_nord') +modules.load('watchdog') +modules.load('extended_error') + +-- Load keyfile_default +trust_anchors.add_file('@keyfile_default@', @unmanaged@) + +local function eval_cmd_compile(line, raw) + -- Compatibility sandbox code loading + local function load_code(code) + if getfenv then -- Lua 5.1 + return loadstring(code) + else -- Lua 5.2+ + return load(code, nil, 't', _ENV) + end + end + local err, chunk + chunk, err = load_code(raw and 'return '..line or 'return table_print('..line..')') + if err then + chunk, err = load_code(line) + end + return chunk, err +end + +-- Interactive command evaluation +function eval_cmd(line, raw) + local chunk, err = eval_cmd_compile(line, raw) + if not err then + return chunk() + else + error(err) + end +end + +-- Pretty printing +local pprint = require('krprint').pprint +function table_print(...) + local strs = {} + local nargs = select('#', ...) + if nargs == 0 then + return nil + end + for n=1,nargs do + local arg = select(n, ...) + local arg_str = pprint(arg) + if nargs > 1 then + table.insert(strs, string.format("%s\t-- result # %d", arg_str, n)) + else + table.insert(strs, arg_str) + end + end + return table.concat(strs, '\n') +end + +-- This extends the worker module to allow asynchronous execution of functions and nonblocking I/O. +-- The current implementation combines cqueues for Lua interface, and event.socket() in order to not +-- block resolver engine while waiting for I/O or timers. +-- +local has_cqueues, cqueues = pcall(require, 'cqueues') +if has_cqueues then + + -- Export the asynchronous sleep function + worker.sleep = cqueues.sleep + + -- Create metatable for workers to define the API + -- It can schedule multiple cqueues and yield execution when there's a wait for blocking I/O or timer + local asynchronous_worker_mt = { + work = function (self) + local ok, err, _, co = self.cq:step(0) + if not ok then + log_warn(ffi.C.LOG_GRP_SYSTEM, '%s error: %s %s', self.name or 'worker', err, debug.traceback(co)) + end + -- Reschedule timeout or create new one + local timeout = self.cq:timeout() + if timeout then + -- Throttle timeouts to avoid too frequent wakeups + if timeout == 0 then timeout = 0.00001 end + -- Convert from seconds to duration + timeout = timeout * sec + if not self.next_timeout then + self.next_timeout = event.after(timeout, self.on_step) + else + event.reschedule(self.next_timeout, timeout) + end + else -- Cancel running timeout when there is no next deadline + if self.next_timeout then + event.cancel(self.next_timeout) + self.next_timeout = nil + end + end + end, + wrap = function (self, f) + self.cq:wrap(f) + end, + loop = function (self) + self.on_step = function () self:work() end + self.event_fd = event.socket(self.cq:pollfd(), self.on_step) + end, + close = function (self) + if self.event_fd then + event.cancel(self.event_fd) + self.event_fd = nil + end + end, + } + + -- Implement the coroutine worker with cqueues + local function worker_new (name) + return setmetatable({name = name, cq = cqueues.new()}, { __index = asynchronous_worker_mt }) + end + + -- Create a default background worker + worker.bg_worker = worker_new('worker.background') + worker.bg_worker:loop() + + -- Wrap a function for asynchronous execution + function worker.coroutine (f) + worker.bg_worker:wrap(f) + end +else + -- Disable asynchronous execution + local function disabled () + error('Lua library cqueues is required for asynchronous execution (luaJIT requires library for Lua 5.1)') + end + worker.sleep = disabled + worker.map = disabled + worker.coroutine = disabled + worker.bg_worker = setmetatable({}, { __index = disabled }) +end + +-- Global commands for map() + +-- must be public because it is called from eval_cmd() +-- when map() commands are read from control socket +function _map_luaobj_call_wrapper(cmd) + local func = eval_cmd_compile(cmd, true) + local ret = kluautil.kr_table_pack(xpcall(func, debug.traceback)) + local ok, serial = pcall(krprint.serialize_lua, ret, 'error') + if not ok then + log_error(ffi.C.LOG_GRP_SYSTEM, 'failed to serialize map() response %s (%s)', + table_print(ret), serial) + return krprint.serialize_lua( + kluautil.kr_table_pack(false, "returned values cannot be serialized: " + .. serial)) + else + return serial + end +end + +local function _sock_errmsg(path, desc) + return string.format( + 'map() error while communicating with %s: %s', + path, desc) +end + +local function _sock_check(sock, call, params, path, desc) + local errprefix = _sock_errmsg(path, desc) .. ': ' + local retvals = kluautil.kr_table_pack(pcall(call, unpack(params))) + local ok = retvals[1] + if not ok then + error(errprefix .. tostring(retvals[2])) + end + local rerr, werr = sock:error() + if rerr or werr then + error(string.format('%sread error %s; write error %s', errprefix, rerr, werr)) + end + if retvals[2] == nil then + error(errprefix .. 'unexpected nil result') + end + return unpack(retvals, 2, retvals.n) +end + +local function _sock_assert(condition, path, desc) + if not condition then + error(_sock_errmsg(path, desc)) + end +end + +local function map_send_recv(cmd, path) + local bit = require('bit') + local socket = require('cqueues.socket') + local s = socket.connect({ path = path }) + s:setmaxerrs(0) + s:setmode('bn', 'bn') + local status, err = pcall(s.connect, s) + if not status then + log_error(ffi.C.LOG_GRP_NETWORK, 'map() error while connecting to control socket %s: ' + .. '%s (ignoring this socket)', path, err) + return nil + end + local ret = _sock_check(s, s.write, {s, '__binary\n'}, path, + 'write __binary') + _sock_assert(ret, path, + 'write __binary result') + local recv = _sock_check(s, s.read, {s, 2}, path, + 'read reply to __binary') + _sock_assert(recv and recv == '> ', path, + 'unexpected reply to __binary') + _sock_check(s, s.write, {s, cmd..'\n'}, path, + 'command write') + recv = _sock_check(s, s.read, {s, 4}, path, + 'response length read') + _sock_assert(recv and #recv == 4, path, + 'length of response length preamble does not match') + local len = tonumber(recv:byte(1)) + for i=2,4 do + len = bit.bor(bit.lshift(len, 8), tonumber(recv:byte(i))) + end + ret = _sock_check(s, s.read, {s, len}, path, + 'read response') + _sock_assert(ret and #ret == len, path, + 'actual response length does not match length in preamble') + s:close() + return ret +end + +-- internal use only +-- Call cmd on each instance via control sockets. +-- @param format - "luaobj" if individual results should be Lua objects +-- - "strings" for eval_cmd output for each instance +-- @returns table with results, one item per instance + key n=number of instances +-- (order of return values is undefined) +-- @throws Lua error if: +-- - communication failed in the middle of transaction +-- - a result is not serializable +-- - individual call throws an error +-- - number of return values != 1 per instance per call +-- - cmd execution state is undefined after an error +-- Connection errors at the beginning are ignored to paper over leftover dead sockets. +function map(cmd, format) + local local_sockets = {} + local results = {} + + if (type(cmd) ~= 'string') then + panic('map() command must be a string') end + if string.find(cmd, '\n', 1, true) then + panic('map() command cannot contain literal \\n, escape it with \\010') end + if (#cmd <= 0) then + panic('map() command must be non-empty') end + -- syntax check on input command to detect typos early + local chunk, err = eval_cmd_compile(cmd, false) + if not chunk then + panic('failure when compiling map() command: %s', err) + end + + format = format or 'luaobj' + if (format ~= 'luaobj' and format ~= 'strings') then + panic('map() output format must be luaobj or strings') end + if format == 'luaobj' then + cmd = '_map_luaobj_call_wrapper([=====[' .. cmd .. ']=====])' + end + + -- find out control socket paths + for _,v in pairs(net.list()) do + if (v['kind'] == 'control') and (v['transport']['family'] == 'unix') then + table.insert(local_sockets, string.match(v['transport']['path'], '^.*/([^/]+)$')) + end + end + local filetab = kluautil.list_dir(worker.control_path) + if next(filetab) == nil then + panic('no control sockets found in directory %s', + worker.control_path) + end + + local result_count = 0 + -- finally execute it on all instances + for _, file in ipairs(filetab) do + local local_exec = false + for _, lsoc in ipairs(local_sockets) do + if file == lsoc then + local_exec = true + end + end + local path = worker.control_path..file + local path_name = (local_exec and 'this instance') or path + log_info(ffi.C.LOG_GRP_SYSTEM, 'executing map() on %s: command %s', path_name, cmd) + local ret + if local_exec then + ret = eval_cmd(cmd) + else + ret = map_send_recv(cmd, path) + -- skip dead sockets (leftovers from dead instances) + if ret == nil then + goto continue + end + end + result_count = result_count + 1 + -- return value is output from eval_cmd + -- i.e. string including "quotes" and Lua escaping in between + assert(type(ret) == 'string', 'map() protocol error, ' + .. 'string not retured by follower') + assert(#ret >= 2 and + string.sub(ret, 1, 1) == "'" + and string.sub(ret, -1, -1) == "'", + 'map() protocol error, value returned by follower does ' + .. 'not look like a string') + -- deserialize string: remove "quotes" and de-escape bytes + ret = krprint.deserialize_lua(ret) + if format == 'luaobj' then + -- ret should be table with xpcall results serialized into string + ret = krprint.deserialize_lua(ret) + assert(type(ret) == 'table', 'map() protocol error, ' + .. 'table with results not retured by follower') + if (ret.n ~= 2) then + log_error(ffi.C.LOG_GRP_SYSTEM, 'got unsupported map() response: %s', table_print(ret)) + panic('unexpected number of return values in map() response: ' + .. 'only single return value is allowed, ' + .. 'use kluautil.kr_table_pack() helper') + end + local ok, retval = ret[1], ret[2] + if ok == false then + panic('error when executing map() command on control socket %s: ' + .. '%s. command execution state is now undefined!', + path, retval) + end + -- drop wrapper table and return only the actual return value + ret = retval + end + results[result_count] = ret + ::continue:: + end + results.n = result_count + return results +end |