1
0
Fork 0
knot-resolver/modules/predict/predict.lua
Daniel Baumann fbc604e215
Adding upstream version 5.7.5.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-21 13:56:17 +02:00

189 lines
5.4 KiB
Lua

-- SPDX-License-Identifier: GPL-3.0-or-later
-- Speculative prefetching of repetitive and soon-to-expire records, to reduce latency.
-- @module predict
-- @field queue scheduled queries, keyed by '<qtype> <qname>'
-- @field queue_len number of scheduled queries
-- @field batch upper bound on queries resolved per drain pass (0 = no limit)
-- @field period length of prediction history (number of windows)
-- @field window length of the prediction window (minutes)
-- @field log sampled query keys, indexed by epoch
local predict = {
	-- Tunables
	window = 15,
	period = 24,
	batch = 0,
	-- Runtime state
	queue = {},
	queue_len = 0,
	log = {},
}
-- Pick a randomized delay until the next sampling event,
-- drawn from the range [1/5, 2/5] of the prediction window.
local function next_event()
	local shortest = predict.window * minute / 5
	return math.random(shortest, shortest * 2)
end
-- Calculate current epoch (which window fits current time).
-- @return epoch number (starting at 1), or nil when prediction is disabled
--         (period unset or <= 1, or window unset or <= 0)
function predict.epoch()
	local period = predict.period
	if not period or period <= 1 then return nil end
	-- A missing or non-positive window would turn the divisions below
	-- into an error or into inf/NaN, so treat it as "disabled" as well
	local window = tonumber(predict.window)
	if not window or window <= 0 then return nil end
	-- Take a single snapshot of the local clock, so that the hour and the
	-- minute always belong to the same instant (two separate os.date()
	-- calls may disagree when the clock ticks over an hour in between)
	local now = os.date('*t')
	return (now.hour * (60 / window) + math.floor(now.min / window)) % period + 1
end
-- Resolve queued records and remove them from the queue.
-- At most predict.batch entries are handled per call (no limit when the
-- batch size is 0); as long as a call resolved something, another pass is
-- scheduled after predict.window * 3 * sec.
-- @return always 0
function predict.drain()
	local resolved = 0
	for entry in pairs(predict.queue) do
		-- Queue keys have the form '<qtype> <qname>'
		local qtype, qname = entry:match('(%S*)%s(.*)')
		resolve(qname, kres.type[qtype], kres.class.IN, {'NO_CACHE'})
		-- Clearing an existing key is permitted during a pairs() traversal
		predict.queue[entry] = nil
		resolved = resolved + 1
		-- Resolve smaller batches at a time
		local limit = predict.batch
		if limit > 0 and resolved >= limit then
			break
		end
	end
	-- Drop the current timer; re-arm it only if this pass did any work
	local pending = predict.ev_drain
	if pending then
		event.cancel(pending)
	end
	predict.ev_drain = nil
	if resolved > 0 then
		predict.ev_drain = event.after(predict.window * 3 * sec, predict.drain)
	end
	-- Keep the counter and the exported statistic in step with the queue
	local remaining = predict.queue_len - resolved
	predict.queue_len = remaining
	stats['predict.queue'] = remaining
	collectgarbage('step')
	return 0
end
-- Copy the entries of `current` (a table in the same format as predict.queue
-- or one epoch of predict.log) into the queue, skipping falsy values and keys
-- that are already queued.
-- Returns the number of newly queued entries (0 when `current` is missing).
local function enqueue_from_log(current)
	local added = 0
	if current then
		local queue = predict.queue
		for query, mark in pairs(current) do
			if mark and not queue[query] then
				queue[query] = mark
				added = added + 1
			end
		end
	end
	return added
end
-- Record the currently frequent queries into the log of the given epoch and
-- reset the frequent-query statistics.
-- @param epoch_now index of the epoch to sample into
-- @return number of sampled queries (0, 0 when no epoch is given)
function predict.sample(epoch_now)
	if not epoch_now then
		return 0, 0
	end
	local seen = predict.log[epoch_now]
	if not seen then
		seen = {}
	end
	local frequent = stats.frequent()
	stats.clear_frequent()
	local count = #frequent
	for idx = 1, count do
		local query = frequent[idx]
		-- Same '<qtype> <qname>' key format as used by the queue
		seen[('%s %s'):format(query.type, query.name)] = 1
	end
	predict.log[epoch_now] = seen
	return count
end
-- Predict queries for the upcoming epoch.
-- For every distance i in 1 .. period/2 - 1, the epoch i steps back is
-- compared with the epoch 2*i steps back; a query logged in both of them
-- is put into the queue unless it is queued already.
-- Returns the number of newly queued queries.
local function generate(epoch_now)
	if not epoch_now then return 0 end
	local period = predict.period
	-- Map a distance into the past onto a 1-based, wrapped-around log index
	local function slot(back)
		return (epoch_now - back - 1) % period + 1
	end
	local added = 0
	for step = 1, period / 2 - 1 do
		local recent = predict.log[slot(step)]
		local older = predict.log[slot(2 * step)]
		if recent and older then
			for query in pairs(recent) do
				if older[query] ~= nil and not predict.queue[query] then
					predict.queue[query] = 1
					added = added + 1
				end
			end
		end
	end
	return added
end
-- Periodic sampling tick: roll over to a new epoch when the clock has moved
-- into one, sample the frequent queries into the log, dispatch predicted
-- queries and re-arm the sampling timer.
function predict.process()
	-- Start a new epoch, or continue sampling
	local epoch_now = predict.epoch()
	local nr_queued = 0
	-- End of epoch
	if predict.current_epoch ~= epoch_now then
		if epoch_now == nil then
			-- predict.epoch() reports that prediction is disabled now; only
			-- forget the stale epoch. Falling through would index the log
			-- with nil, which raises before the timer below is re-armed.
			predict.current_epoch = nil
		else
			stats['predict.epoch'] = epoch_now
			predict.current_epoch = epoch_now
			-- enqueue records from upcoming epoch
			nr_queued = enqueue_from_log(predict.log[epoch_now])
			-- predict next epoch
			nr_queued = nr_queued + generate(epoch_now)
			-- clear log for new epoch
			predict.log[epoch_now] = {}
		end
	end
	-- Sample current epoch
	local nr_learned = predict.sample(epoch_now)
	-- Dispatch predicted queries
	if nr_queued > 0 then
		predict.queue_len = predict.queue_len + nr_queued
		-- Spread the queue over several drain passes
		predict.batch = predict.queue_len / 5
		if not predict.ev_drain then
			predict.ev_drain = event.after(0, predict.drain)
		end
	end
	-- Re-arm the sampling timer (dropping a still pending one, if any)
	if predict.ev_sample then event.cancel(predict.ev_sample) end
	predict.ev_sample = event.after(next_event(), predict.process)
	if stats then
		stats['predict.queue'] = predict.queue_len
		stats['predict.learned'] = nr_learned
	end
	collectgarbage()
end
-- Start periodic sampling, provided both the window and the period are positive.
function predict.init()
	-- Treat unset values as 0 (the way predict.config treats the period)
	-- rather than raising on a comparison with nil
	local window = predict.window or 0
	local period = predict.period or 0
	if window > 0 and period > 0 then
		predict.current_epoch = predict.epoch()
		predict.ev_sample = event.after(next_event(), predict.process)
	end
end
-- Stop both timers and drop the learned state (log and queue).
function predict.deinit()
	local sample_timer, drain_timer = predict.ev_sample, predict.ev_drain
	if sample_timer then
		event.cancel(sample_timer)
	end
	if drain_timer then
		event.cancel(drain_timer)
	end
	predict.ev_sample, predict.ev_drain = nil, nil
	-- Start over with an empty history and an empty queue
	predict.log = {}
	predict.queue, predict.queue_len = {}, 0
	collectgarbage()
end
-- (Re)configure the module and restart its timers.
-- @param config optional table; its `window` and `period` fields, when set,
--        replace the current values
function predict.config(config)
	local opts = config or {}
	if type(opts) ~= 'table' then
		error('[predict] configuration must be a table or nil')
	end
	-- Take over only the settings that were actually given
	for _, name in ipairs({'window', 'period'}) do
		local value = opts[name]
		if value then
			predict[name] = value
		end
	end
	-- Sampling reads from the stats module; load it when prediction is enabled
	local period = predict.period or 0
	if period ~= 0 and not stats then
		modules.load('stats')
	end
	-- Reinitialize to reset timers
	predict.deinit()
	predict.init()
end
-- Layer hook run when a request finishes: every resolved (sub-)query that is
-- flagged as expiring is resolved again right away with the NO_CACHE option.
-- Doing that immediately is simplest and avoids creating (new) large bursts
-- of activity.
local function prefetch_expiring(_, req)
	local resolved = req.rplan.resolved
	local last = tonumber(resolved.len) - 1 -- size_t doesn't work for some reason
	for idx = 0, last do
		local qry = resolved.at[idx]
		if qry.flags.EXPIRING == true then
			resolve(kres.dname2str(qry.sname), qry.stype, qry.sclass, {'NO_CACHE'})
		end
	end
end

predict.layer = {
	finish = prefetch_expiring,
}
-- Export the module table
return predict