summaryrefslogtreecommitdiffstats
path: root/daemon/lua/sandbox.lua.in
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--daemon/lua/sandbox.lua.in833
1 files changed, 833 insertions, 0 deletions
diff --git a/daemon/lua/sandbox.lua.in b/daemon/lua/sandbox.lua.in
new file mode 100644
index 0000000..7c6a818
--- /dev/null
+++ b/daemon/lua/sandbox.lua.in
@@ -0,0 +1,833 @@
+-- SPDX-License-Identifier: GPL-3.0-or-later
+
+local debug = require('debug')
+local ffi = require('ffi')
+local kluautil = require('kluautil')
+local krprint = require("krprint")
+
+-- Units
+kB = 1024
+MB = 1024*kB
+GB = 1024*MB
+-- Time
+sec = 1000
+second = sec
+minute = 60 * sec
+min = minute
+hour = 60 * minute
+day = 24 * hour
+
+-- Logging
+
+-- from syslog.h
+LOG_CRIT = 2
+LOG_ERR = 3
+LOG_WARNING = 4
+LOG_NOTICE = 5
+LOG_INFO = 6
+LOG_DEBUG = 7
+
+local function curr_file() return debug.getinfo(4,'S').source end
+local function curr_line() return debug.getinfo(4,'l').currentline end
+
+local function log_fmt(grp, level, fmt, ...)
+ ffi.C.kr_log_fmt(grp, level,
+ 'CODE_FILE='..curr_file(), 'CODE_LINE='..curr_line(), 'CODE_FUNC=',
+ '[%-6s] %s\n', ffi.C.kr_log_grp2name(grp), string.format(fmt, ...))
+end
+
+function log_req(req, qry_uid, indent, grp, fmt, ...)
+ ffi.C.kr_log_req1(req, qry_uid, indent, grp, ffi.C.kr_log_grp2name(grp),
+ '%s\n', string.format(fmt, ...))
+end
+
+function log_qry(qry, grp, fmt, ...)
+ ffi.C.kr_log_q1(qry, grp, ffi.C.kr_log_grp2name(grp),
+ '%s\n', string.format(fmt, ...))
+end
+
+function panic(fmt, ...)
+ print(debug.traceback('error occurred here (config filename:lineno is '
+ .. 'at the bottom, if config is involved):', 2))
+ error(string.format('ERROR: '.. fmt, ...), 0)
+end
+
+function log_error(grp, fmt, ...)
+ log_fmt(grp, LOG_ERR, fmt, ...)
+end
+
+function log_warn(grp, fmt, ...)
+ log_fmt(grp, LOG_WARNING, fmt, ...)
+end
+
+function log_notice(grp, fmt, ...)
+ log_fmt(grp, LOG_NOTICE, fmt, ...)
+end
+
+function log_info(grp, fmt, ...)
+ log_fmt(grp, LOG_INFO, fmt, ...)
+end
+
+function log_debug(grp, fmt, ...)
+ log_fmt(grp, LOG_DEBUG, fmt, ...)
+end
+
+function log(fmt, ...)
+ log_notice(ffi.C.LOG_GRP_MODULE, fmt, ...)
+end
+
+-- Resolver bindings
+kres = require('kres')
+if rawget(kres, 'str2dname') ~= nil then
+ todname = kres.str2dname
+end
+
+worker.resolve_pkt = function (pkt, options, finish, init)
+ options = kres.mk_qflags(options)
+ local task = ffi.C.worker_resolve_start(pkt, options)
+
+ -- Deal with finish and init callbacks
+ if finish ~= nil then
+ local finish_cb
+ finish_cb = ffi.cast('trace_callback_f',
+ function (req)
+ jit.off(true, true) -- JIT for (C -> lua)^2 nesting isn't allowed
+ finish(req.answer, req)
+ finish_cb:free()
+ end)
+ task.ctx.req.trace_finish = finish_cb
+ end
+ if init ~= nil then
+ init(task.ctx.req)
+ end
+
+ return ffi.C.worker_resolve_exec(task, pkt) == 0
+end
+
+worker.resolve = function (qname, qtype, qclass, options, finish, init)
+ -- Alternatively use named arguments
+ if type(qname) == 'table' then
+ local t = qname
+ qname = t.name
+ qtype = t.type
+ qclass = t.class
+ options = t.options
+ finish = t.finish
+ init = t.init
+ end
+ qtype = qtype or kres.type.A
+ qclass = qclass or kres.class.IN
+ options = kres.mk_qflags(options)
+ -- LATER: nicer errors for rubbish in qname, qtype, qclass?
+ local pkt = ffi.C.worker_resolve_mk_pkt(qname, qtype, qclass, options)
+ if pkt == nil then
+ panic('failure in worker.resolve(); probably invalid qname "%s"', qname)
+ end
+ local ret = worker.resolve_pkt(pkt, options, finish, init)
+ ffi.C.knot_pkt_free(pkt);
+ return ret
+end
+resolve = worker.resolve
+
+-- Shorthand for aggregated per-worker information
+worker.info = function ()
+ local t = worker.stats()
+ t.pid = worker.pid
+ return t
+end
+
+-- Resolver mode of operation
+local current_mode = 'normal'
+local mode_table = { normal=0, strict=1, permissive=2 }
+function mode(m)
+ if not m then return current_mode end
+ if not mode_table[m] then error('unsupported mode: '..m) end
+ -- Update current operation mode
+ current_mode = m
+ option('STRICT', current_mode == 'strict')
+ option('PERMISSIVE', current_mode == 'permissive')
+ return true
+end
+
+-- Trivial option alias
+function reorder_RR(val)
+ return option('REORDER_RR', val)
+end
+
+-- Get/set resolver options via name (string)
+function option(name, val)
+ local flags = kres.context().options;
+ -- Note: no way to test existence of flags[name] but we want error anyway.
+ name = string.upper(name) -- convenience
+ if val ~= nil then
+ if (val ~= true) and (val ~= false) then
+ panic('invalid option value: ' .. tostring(val))
+ end
+ flags[name] = val;
+ end
+ return flags[name];
+end
+
+-- Function aliases
+-- `env.VAR returns os.getenv(VAR)`
+env = {}
+setmetatable(env, {
+ __index = function (_, k) return os.getenv(k) end
+})
+
+debugging = {}
+setmetatable(debugging, {
+ __index = function(_, k)
+ if k == 'assertion_abort' then return ffi.C.kr_dbg_assertion_abort
+ elseif k == 'assertion_fork' then return ffi.C.kr_dbg_assertion_fork
+ else panic('invalid debugging option: ' .. tostring(k))
+ end
+ end,
+ __newindex = function(_, k, v)
+ if k == 'assertion_abort' then ffi.C.kr_dbg_assertion_abort = v
+ elseif k == 'assertion_fork' then ffi.C.kr_dbg_assertion_fork = v
+ else panic('invalid debugging option: ' .. tostring(k))
+ end
+ end
+})
+
+-- Quick access to interfaces
+-- `net.<iface>` => `net.interfaces()[iface]`
+-- `net = {addr1, ..}` => `net.listen(name, addr1)`
+-- `net.ipv{4,6} = {true, false}` => enable/disable IPv{4,6}
+setmetatable(net, {
+ __index = function (t, k)
+ local v = rawget(t, k)
+ if v then return v
+ elseif k == 'ipv6' then return not option('NO_IPV6')
+ elseif k == 'ipv4' then return not option('NO_IPV4')
+ else return net.interfaces()[k]
+ end
+ end,
+ __newindex = function (t,k,v)
+ if k == 'ipv6' then return option('NO_IPV6', not v)
+ elseif k == 'ipv4' then return option('NO_IPV4', not v)
+ else
+ local iname = rawget(net.interfaces(), v)
+ if iname then t.listen(iname)
+ else t.listen(v)
+ end
+ end
+ end
+})
+
+-- Syntactic sugar for module loading
+-- `modules.<name> = <config>`
+setmetatable(modules, {
+ __newindex = function (_, k, v)
+ if type(k) == 'number' then
+ k, v = v, nil
+ end
+ if not rawget(_G, k) then
+ modules.load(k)
+ k = string.match(k, '[%w_]+')
+ local mod = _G[k]
+ local config = mod and rawget(mod, 'config')
+ if mod ~= nil and config ~= nil then
+ if k ~= v then config(v)
+ else config()
+ end
+ end
+ end
+ end
+})
+
+-- Set up lua table for a C module. (Internal function.)
+function modules_create_table_for_c(kr_module_ud)
+ local kr_module = ffi.cast('struct kr_module **', kr_module_ud)[0]
+ --- Set up the global table named according to the module.
+ if kr_module.config == nil and kr_module.props == nil then
+ return
+ end
+ local module = {}
+ local module_name = ffi.string(kr_module.name)
+ _G[module_name] = module
+
+ --- Construct lua functions for properties.
+ if kr_module.props ~= nil then
+ local i = 0
+ while true do
+ local prop = kr_module.props[i]
+ local cb = prop.cb
+ if cb == nil then break; end
+ module[ffi.string(prop.name)] =
+ function (arg) -- lua wrapper around kr_prop_cb function typedef
+ local arg_conv
+ if type(arg) == 'table' or type(arg) == 'boolean' then
+ arg_conv = tojson(arg)
+ elseif arg ~= nil then
+ arg_conv = tostring(arg)
+ end
+ local ret_cstr = cb(ffi.C.the_worker.engine, kr_module, arg_conv)
+ if ret_cstr == nil then
+ return nil
+ end
+ -- LATER(optim.): superfluous copying
+ local ret_str = ffi.string(ret_cstr)
+ -- This is a bit ugly, but the API is that invalid JSON
+ -- should be just returned as string :-(
+ local status, ret = pcall(fromjson, ret_str)
+ if not status then ret = ret_str end
+ ffi.C.free(ret_cstr)
+ return ret
+ end
+ i = i + 1
+ end
+ end
+
+ --- Construct lua function for config().
+ if kr_module.config ~= nil then
+ module.config =
+ function (arg)
+ local arg_conv
+ if type(arg) == 'table' or type(arg) == 'boolean' then
+ arg_conv = tojson(arg)
+ elseif arg ~= nil then
+ arg_conv = tostring(arg)
+ end
+ return kr_module.config(kr_module, arg_conv)
+ end
+ end
+
+ --- Add syntactic sugar for get() and set() properties.
+ --- That also "catches" any commands like `moduleName.foo = bar`.
+ local m_index, m_newindex
+ local get_f = rawget(module, 'get')
+ if get_f ~= nil then
+ m_index = function (_, key)
+ return get_f(key)
+ end
+ else
+ m_index = function ()
+ error('module ' .. module_name .. ' does not support indexing syntax sugar')
+ end
+ end
+ local set_f = rawget(module, 'set')
+ if set_f ~= nil then
+ m_newindex = function (_, key, value)
+ -- This will produce a nasty error on some non-string parameters.
+ -- Still, we already use it with integer values, e.g. in predict module :-/
+ return set_f(key .. ' ' .. value)
+ end
+ else
+ m_newindex = function ()
+ error('module ' .. module_name .. ' does not support assignment syntax sugar')
+ end
+ end
+ setmetatable(module, {
+ -- note: the two functions only get called for *missing* indices
+ __index = m_index,
+ __newindex = m_newindex,
+ })
+end
+
+local layer_ctx = ffi.C.kr_layer_t_static
+-- Utilities internal for lua layer glue; see ../ffimodule.c
+modules_ffi_layer_wrap1 = function (layer_cb)
+ return layer_cb(layer_ctx.state, layer_ctx.req)
+end
+modules_ffi_layer_wrap2 = function (layer_cb)
+ return layer_cb(layer_ctx.state, layer_ctx.req, layer_ctx.pkt)
+end
+modules_ffi_layer_wrap_checkout = function (layer_cb)
+ return layer_cb(layer_ctx.state, layer_ctx.req, layer_ctx.pkt,
+ layer_ctx.dst, layer_ctx.is_stream)
+end
+modules_ffi_wrap_modcb = function (cb, kr_module_ud) -- this one isn't for layer
+ local kr_module = ffi.cast('struct kr_module **', kr_module_ud)[0]
+ return cb(kr_module)
+end
+
+-- Return filesystem size where the cache resides.
+cache.fssize = function ()
+ local path = cache.current_storage or '.'
+ -- As it is now, `path` may or may not include the lmdb:// prefix.
+ if string.sub(path, 1, 7) == 'lmdb://' then
+ path = string.sub(path, 8)
+ end
+ if #path == 0 then
+ path = '.'
+ end
+ local size = tonumber(ffi.C.kr_fssize(path))
+ if size < 0 then
+ panic('cache.fssize(): %s', ffi.string(ffi.C.knot_strerror(size)))
+ else
+ return size
+ end
+end
+
+cache.clear = function (name, exact_name, rr_type, chunk_size, callback, prev_state)
+ if name == nil or (name == '.' and not exact_name) then
+ -- keep same output format as for 'standard' clear
+ local total_count = cache.count()
+ if not cache.clear_everything() then
+ error('unable to clear everything')
+ end
+ return {count = total_count}
+ end
+ -- Check parameters, in order, and set defaults if missing.
+ local dname = kres.str2dname(name)
+ if not dname then error('cache.clear(): incorrect name passed') end
+ if exact_name == nil then exact_name = false end
+ if type(exact_name) ~= 'boolean'
+ then error('cache.clear(): incorrect exact_name passed') end
+
+ local cach = kres.context().cache;
+ local rettable = {}
+ -- Apex warning. If the caller passes a custom callback,
+ -- we assume they are advanced enough not to need the check.
+ -- The point is to avoid repeating the check in each callback iteration.
+ if callback == nil then
+ local apex_array = ffi.new('knot_dname_t *[1]') -- C: dname **apex_array
+ local ret = ffi.C.kr_cache_closest_apex(cach, dname, false, apex_array)
+ if ret < 0 then
+ error(ffi.string(ffi.C.knot_strerror(ret))) end
+ if not ffi.C.knot_dname_is_equal(apex_array[0], dname) then
+ local apex_str = kres.dname2str(apex_array[0])
+ rettable.not_apex = 'to clear proofs of non-existence call '
+ .. 'cache.clear(\'' .. tostring(apex_str) ..'\')'
+ rettable.subtree = apex_str
+ end
+ ffi.C.free(apex_array[0])
+ end
+
+ if rr_type ~= nil then
+ -- Special case, without any subtree searching.
+ if not exact_name
+ then error('cache.clear(): specifying rr_type only supported with exact_name') end
+ if chunk_size or callback
+ then error('cache.clear(): chunk_size and callback parameters not supported with rr_type') end
+ local ret = ffi.C.kr_cache_remove(cach, dname, rr_type)
+ if ret < 0 then error(ffi.string(ffi.C.knot_strerror(ret))) end
+ return {count = 1}
+ end
+
+ if chunk_size == nil then chunk_size = 100 end
+ if type(chunk_size) ~= 'number' or chunk_size <= 0
+ then error('cache.clear(): chunk_size has to be a positive integer') end
+
+ -- Do the C call, and add chunk_size warning.
+ rettable.count = ffi.C.kr_cache_remove_subtree(cach, dname, exact_name, chunk_size)
+ if rettable.count == chunk_size then
+ local msg_extra = ''
+ if callback == nil then
+ msg_extra = '; the default callback will continue asynchronously'
+ end
+ rettable.chunk_limit = 'chunk size limit reached' .. msg_extra
+ end
+
+ -- Default callback function: repeat after 1ms
+ if callback == nil then callback =
+ function (cbname, cbexact_name, cbrr_type, cbchunk_size, cbself, cbprev_state, cbrettable)
+ if cbrettable.count < 0 then error(ffi.string(ffi.C.knot_strerror(cbrettable.count))) end
+ if cbprev_state == nil then cbprev_state = { round = 0 } end
+ if type(cbprev_state) ~= 'table'
+ then error('cache.clear() callback: incorrect prev_state passed') end
+ cbrettable.round = cbprev_state.round + 1
+ if (cbrettable.count == cbchunk_size) then
+ event.after(1, function ()
+ cache.clear(cbname, cbexact_name, cbrr_type, cbchunk_size, cbself, cbrettable)
+ end)
+ elseif cbrettable.round > 1 then
+ log_info(ffi.C.LOG_GRP_CACHE, 'asynchronous cache.clear(\'' .. cbname .. '\', '
+ .. tostring(cbexact_name) .. ') finished')
+ end
+ return cbrettable
+ end
+ end
+ return callback(name, exact_name, rr_type, chunk_size, callback, prev_state, rettable)
+end
+-- Syntactic sugar for cache
+-- `cache[x] -> cache.get(x)`
+-- `cache.{size|storage} = value`
+setmetatable(cache, {
+ __index = function (t, k)
+ local res = rawget(t, k)
+ if not res and not rawget(t, 'current_size') then return res end
+ -- Beware: t.get returns empty table on failure to find.
+ -- That would be confusing here (breaking kresc), so return nil instead.
+ res = t.get(k)
+ if res and next(res) ~= nil then return res else return nil end
+ end,
+ __newindex = function (t,k,v)
+ -- Defaults
+ local storage = rawget(t, 'current_storage')
+ if not storage then storage = 'lmdb://' end
+ local size = rawget(t, 'current_size')
+ if not size then size = 10*MB end
+ -- Declarative interface for cache
+ if k == 'size' then t.open(v, storage)
+ elseif k == 'storage' then t.open(size, v) end
+ end
+})
+
+-- Make sandboxed environment
+local function make_sandbox(defined)
+ local __protected = {
+ worker = true, env = true, debugging = true, modules = true,
+ cache = true, net = true, trust_anchors = true
+ }
+
+ -- Compute and export the list of top-level names (hidden otherwise)
+ local nl = ""
+ for n in pairs(defined) do
+ nl = nl .. n .. "\n"
+ end
+
+ return setmetatable({ __orig_name_list = nl }, {
+ __index = defined,
+ __newindex = function (_, k, v)
+ if __protected[k] then
+ for k2,v2 in pairs(v) do
+ defined[k][k2] = v2
+ end
+ else
+ defined[k] = v
+ end
+ end
+ })
+end
+
+-- Compatibility sandbox
+_G = make_sandbox(getfenv(0))
+setfenv(0, _G)
+
+-- Load default modules
+trust_anchors = require('trust_anchors')
+modules.load('ta_update')
+modules.load('ta_signal_query')
+modules.load('policy')
+modules.load('priming')
+modules.load('detect_time_skew')
+modules.load('detect_time_jump')
+modules.load('ta_sentinel')
+modules.load('edns_keepalive')
+modules.load('refuse_nord')
+modules.load('watchdog')
+modules.load('extended_error')
+
+-- Load keyfile_default
+trust_anchors.add_file('@keyfile_default@', @unmanaged@)
+
+local function eval_cmd_compile(line, raw)
+ -- Compatibility sandbox code loading
+ local function load_code(code)
+ if getfenv then -- Lua 5.1
+ return loadstring(code)
+ else -- Lua 5.2+
+ return load(code, nil, 't', _ENV)
+ end
+ end
+ local err, chunk
+ chunk, err = load_code(raw and 'return '..line or 'return table_print('..line..')')
+ if err then
+ chunk, err = load_code(line)
+ end
+ return chunk, err
+end
+
+-- Interactive command evaluation
+function eval_cmd(line, raw)
+ local chunk, err = eval_cmd_compile(line, raw)
+ if not err then
+ return chunk()
+ else
+ error(err)
+ end
+end
+
+-- Pretty printing
+local pprint = require('krprint').pprint
+function table_print(...)
+ local strs = {}
+ local nargs = select('#', ...)
+ if nargs == 0 then
+ return nil
+ end
+ for n=1,nargs do
+ local arg = select(n, ...)
+ local arg_str = pprint(arg)
+ if nargs > 1 then
+ table.insert(strs, string.format("%s\t-- result # %d", arg_str, n))
+ else
+ table.insert(strs, arg_str)
+ end
+ end
+ return table.concat(strs, '\n')
+end
+
+-- This extends the worker module to allow asynchronous execution of functions and nonblocking I/O.
+-- The current implementation combines cqueues for Lua interface, and event.socket() in order to not
+-- block resolver engine while waiting for I/O or timers.
+--
+local has_cqueues, cqueues = pcall(require, 'cqueues')
+if has_cqueues then
+
+ -- Export the asynchronous sleep function
+ worker.sleep = cqueues.sleep
+
+ -- Create metatable for workers to define the API
+ -- It can schedule multiple cqueues and yield execution when there's a wait for blocking I/O or timer
+ local asynchronous_worker_mt = {
+ work = function (self)
+ local ok, err, _, co = self.cq:step(0)
+ if not ok then
+ log_warn(ffi.C.LOG_GRP_SYSTEM, '%s error: %s %s', self.name or 'worker', err, debug.traceback(co))
+ end
+ -- Reschedule timeout or create new one
+ local timeout = self.cq:timeout()
+ if timeout then
+ -- Throttle timeouts to avoid too frequent wakeups
+ if timeout == 0 then timeout = 0.00001 end
+ -- Convert from seconds to duration
+ timeout = timeout * sec
+ if not self.next_timeout then
+ self.next_timeout = event.after(timeout, self.on_step)
+ else
+ event.reschedule(self.next_timeout, timeout)
+ end
+ else -- Cancel running timeout when there is no next deadline
+ if self.next_timeout then
+ event.cancel(self.next_timeout)
+ self.next_timeout = nil
+ end
+ end
+ end,
+ wrap = function (self, f)
+ self.cq:wrap(f)
+ end,
+ loop = function (self)
+ self.on_step = function () self:work() end
+ self.event_fd = event.socket(self.cq:pollfd(), self.on_step)
+ end,
+ close = function (self)
+ if self.event_fd then
+ event.cancel(self.event_fd)
+ self.event_fd = nil
+ end
+ end,
+ }
+
+ -- Implement the coroutine worker with cqueues
+ local function worker_new (name)
+ return setmetatable({name = name, cq = cqueues.new()}, { __index = asynchronous_worker_mt })
+ end
+
+ -- Create a default background worker
+ worker.bg_worker = worker_new('worker.background')
+ worker.bg_worker:loop()
+
+ -- Wrap a function for asynchronous execution
+ function worker.coroutine (f)
+ worker.bg_worker:wrap(f)
+ end
+else
+ -- Disable asynchronous execution
+ local function disabled ()
+ error('Lua library cqueues is required for asynchronous execution (luaJIT requires library for Lua 5.1)')
+ end
+ worker.sleep = disabled
+ worker.map = disabled
+ worker.coroutine = disabled
+ worker.bg_worker = setmetatable({}, { __index = disabled })
+end
+
+-- Global commands for map()
+
+-- must be public because it is called from eval_cmd()
+-- when map() commands are read from control socket
+function _map_luaobj_call_wrapper(cmd)
+ local func = eval_cmd_compile(cmd, true)
+ local ret = kluautil.kr_table_pack(xpcall(func, debug.traceback))
+ local ok, serial = pcall(krprint.serialize_lua, ret, 'error')
+ if not ok then
+ log_error(ffi.C.LOG_GRP_SYSTEM, 'failed to serialize map() response %s (%s)',
+ table_print(ret), serial)
+ return krprint.serialize_lua(
+ kluautil.kr_table_pack(false, "returned values cannot be serialized: "
+ .. serial))
+ else
+ return serial
+ end
+end
+
+local function _sock_errmsg(path, desc)
+ return string.format(
+ 'map() error while communicating with %s: %s',
+ path, desc)
+end
+
+local function _sock_check(sock, call, params, path, desc)
+ local errprefix = _sock_errmsg(path, desc) .. ': '
+ local retvals = kluautil.kr_table_pack(pcall(call, unpack(params)))
+ local ok = retvals[1]
+ if not ok then
+ error(errprefix .. tostring(retvals[2]))
+ end
+ local rerr, werr = sock:error()
+ if rerr or werr then
+ error(string.format('%sread error %s; write error %s', errprefix, rerr, werr))
+ end
+ if retvals[2] == nil then
+ error(errprefix .. 'unexpected nil result')
+ end
+ return unpack(retvals, 2, retvals.n)
+end
+
+local function _sock_assert(condition, path, desc)
+ if not condition then
+ error(_sock_errmsg(path, desc))
+ end
+end
+
+local function map_send_recv(cmd, path)
+ local bit = require('bit')
+ local socket = require('cqueues.socket')
+ local s = socket.connect({ path = path })
+ s:setmaxerrs(0)
+ s:setmode('bn', 'bn')
+ local status, err = pcall(s.connect, s)
+ if not status then
+ log_error(ffi.C.LOG_GRP_NETWORK, 'map() error while connecting to control socket %s: '
+ .. '%s (ignoring this socket)', path, err)
+ return nil
+ end
+ local ret = _sock_check(s, s.write, {s, '__binary\n'}, path,
+ 'write __binary')
+ _sock_assert(ret, path,
+ 'write __binary result')
+ local recv = _sock_check(s, s.read, {s, 2}, path,
+ 'read reply to __binary')
+ _sock_assert(recv and recv == '> ', path,
+ 'unexpected reply to __binary')
+ _sock_check(s, s.write, {s, cmd..'\n'}, path,
+ 'command write')
+ recv = _sock_check(s, s.read, {s, 4}, path,
+ 'response length read')
+ _sock_assert(recv and #recv == 4, path,
+ 'length of response length preamble does not match')
+ local len = tonumber(recv:byte(1))
+ for i=2,4 do
+ len = bit.bor(bit.lshift(len, 8), tonumber(recv:byte(i)))
+ end
+ ret = _sock_check(s, s.read, {s, len}, path,
+ 'read response')
+ _sock_assert(ret and #ret == len, path,
+ 'actual response length does not match length in preamble')
+ s:close()
+ return ret
+end
+
+-- internal use only
+-- Call cmd on each instance via control sockets.
+-- @param format - "luaobj" if individual results should be Lua objects
+-- - "strings" for eval_cmd output for each instance
+-- @returns table with results, one item per instance + key n=number of instances
+-- (order of return values is undefined)
+-- @throws Lua error if:
+-- - communication failed in the middle of transaction
+-- - a result is not serializable
+-- - individual call throws an error
+-- - number of return values != 1 per instance per call
+-- - cmd execution state is undefined after an error
+-- Connection errors at the beginning are ignored to paper over leftover dead sockets.
+function map(cmd, format)
+ local local_sockets = {}
+ local results = {}
+
+ if (type(cmd) ~= 'string') then
+ panic('map() command must be a string') end
+ if string.find(cmd, '\n', 1, true) then
+ panic('map() command cannot contain literal \\n, escape it with \\010') end
+ if (#cmd <= 0) then
+ panic('map() command must be non-empty') end
+ -- syntax check on input command to detect typos early
+ local chunk, err = eval_cmd_compile(cmd, false)
+ if not chunk then
+ panic('failure when compiling map() command: %s', err)
+ end
+
+ format = format or 'luaobj'
+ if (format ~= 'luaobj' and format ~= 'strings') then
+ panic('map() output format must be luaobj or strings') end
+ if format == 'luaobj' then
+ cmd = '_map_luaobj_call_wrapper([=====[' .. cmd .. ']=====])'
+ end
+
+ -- find out control socket paths
+ for _,v in pairs(net.list()) do
+ if (v['kind'] == 'control') and (v['transport']['family'] == 'unix') then
+ table.insert(local_sockets, string.match(v['transport']['path'], '^.*/([^/]+)$'))
+ end
+ end
+ local filetab = kluautil.list_dir(worker.control_path)
+ if next(filetab) == nil then
+ panic('no control sockets found in directory %s',
+ worker.control_path)
+ end
+
+ local result_count = 0
+ -- finally execute it on all instances
+ for _, file in ipairs(filetab) do
+ local local_exec = false
+ for _, lsoc in ipairs(local_sockets) do
+ if file == lsoc then
+ local_exec = true
+ end
+ end
+ local path = worker.control_path..file
+ local path_name = (local_exec and 'this instance') or path
+ log_info(ffi.C.LOG_GRP_SYSTEM, 'executing map() on %s: command %s', path_name, cmd)
+ local ret
+ if local_exec then
+ ret = eval_cmd(cmd)
+ else
+ ret = map_send_recv(cmd, path)
+ -- skip dead sockets (leftovers from dead instances)
+ if ret == nil then
+ goto continue
+ end
+ end
+ result_count = result_count + 1
+ -- return value is output from eval_cmd
+ -- i.e. string including "quotes" and Lua escaping in between
+ assert(type(ret) == 'string', 'map() protocol error, '
+ .. 'string not retured by follower')
+ assert(#ret >= 2 and
+ string.sub(ret, 1, 1) == "'"
+ and string.sub(ret, -1, -1) == "'",
+ 'map() protocol error, value returned by follower does '
+ .. 'not look like a string')
+ -- deserialize string: remove "quotes" and de-escape bytes
+ ret = krprint.deserialize_lua(ret)
+ if format == 'luaobj' then
+ -- ret should be table with xpcall results serialized into string
+ ret = krprint.deserialize_lua(ret)
+ assert(type(ret) == 'table', 'map() protocol error, '
+ .. 'table with results not retured by follower')
+ if (ret.n ~= 2) then
+ log_error(ffi.C.LOG_GRP_SYSTEM, 'got unsupported map() response: %s', table_print(ret))
+ panic('unexpected number of return values in map() response: '
+ .. 'only single return value is allowed, '
+ .. 'use kluautil.kr_table_pack() helper')
+ end
+ local ok, retval = ret[1], ret[2]
+ if ok == false then
+ panic('error when executing map() command on control socket %s: '
+ .. '%s. command execution state is now undefined!',
+ path, retval)
+ end
+ -- drop wrapper table and return only the actual return value
+ ret = retval
+ end
+ results[result_count] = ret
+ ::continue::
+ end
+ results.n = result_count
+ return results
+end