summaryrefslogtreecommitdiffstats
path: root/modules/predict
diff options
context:
space:
mode:
Diffstat (limited to 'modules/predict')
-rw-r--r--modules/predict/README.rst51
-rw-r--r--modules/predict/predict.lua186
-rw-r--r--modules/predict/predict.mk2
-rw-r--r--modules/predict/tests/predict.test.lua60
4 files changed, 299 insertions, 0 deletions
diff --git a/modules/predict/README.rst b/modules/predict/README.rst
new file mode 100644
index 0000000..6c4b442
--- /dev/null
+++ b/modules/predict/README.rst
@@ -0,0 +1,51 @@
+.. _mod-predict:
+
+Prefetching records
+-------------------
+
+The module refreshes records that are about to expire when they're used (i.e. their remaining TTL has dropped below 1% of the original TTL).
+This improves latency for frequently used records, as they are fetched in advance.
+
+It is also able to learn usage patterns and repetitive queries that the server makes. For example, if
+it makes a query every day at 18:00, the resolver expects that it will be needed by that time and prefetches it
+ahead of time. This helps to minimize the perceived latency and keeps the cache hot.
+
+.. tip:: The tracking window and period length determine memory requirements. If you have a server with relatively fast query turnover, keep the period low (an hour to start) and the tracking window shorter (5 minutes). For a personal, slower resolver, keep the tracking window longer (i.e. 30 minutes) and the period longer (a day), as the habitual queries occur daily. Experiment to get the best results.
+
+Example configuration
+^^^^^^^^^^^^^^^^^^^^^
+
+.. code-block:: lua
+
+ modules = {
+ predict = {
+ window = 15, -- 15 minutes sampling window
+ period = 6*(60/15) -- track last 6 hours
+ }
+ }
+
+Defaults are 15 minutes window, 6 hours period.
+
+.. tip:: Use period 0 to turn off prediction and just do prefetching of expiring records.
+ That works even without the :ref:`stats <mod-stats>` module.
+
+.. note:: Otherwise this module requires :ref:`stats <mod-stats>` module and loads it if not present.
+
+Exported metrics
+^^^^^^^^^^^^^^^^
+
+To visualize the efficiency of the predictions, the module exports the following statistics.
+
+* ``predict.epoch`` - current prediction epoch (based on time of day and sampling window)
+* ``predict.queue`` - number of queued queries in current window
+* ``predict.learned`` - number of learned queries in current window
+
+
+Properties
+^^^^^^^^^^
+
+.. function:: predict.config({ window = 15, period = 24})
+
+ Reconfigure the predictor to given tracking window and period length. Both parameters are optional.
+ Window length is in minutes, period is a number of windows that can be kept in memory.
+ e.g. if a ``window`` is 15 minutes, a ``period`` of "24" means 6 hours.
diff --git a/modules/predict/predict.lua b/modules/predict/predict.lua
new file mode 100644
index 0000000..846d4ef
--- /dev/null
+++ b/modules/predict/predict.lua
@@ -0,0 +1,186 @@
+-- Speculative prefetching for repetitive and soon-expiring records to reduce latency.
+-- @module predict
+-- @field queue queue of scheduled queries, keyed by 'QTYPE QNAME' strings
+-- @field queue_len number of scheduled queries
+-- @field batch max queries resolved per drain step (0 = drain everything at once)
+-- @field period length of prediction history (number of windows)
+-- @field window length of the prediction window (minutes)
+-- @field log per-epoch sample log: epoch index -> { ['QTYPE QNAME'] = 1 }
+local predict = {
+	queue = {},
+	queue_len = 0,
+	batch = 0,
+	period = 24,
+	window = 15,
+	log = {},
+}
+
+
+-- Randomized delay until the next sampling event, uniformly drawn from
+-- [1/5, 2/5] of the window length (assumes 'minute' converts minutes to
+-- timer units -- confirm against the daemon's time constants).
+local function next_event()
+	local fifth = (predict.window * minute) / 5
+	return math.random(fifth, 2 * fifth)
+end
+
+-- Compute the current epoch, i.e. which sampling window the wall clock
+-- falls into. Returns a 1-based index in [1, period], or nil when the
+-- period is unset or too short for prediction to make sense.
+function predict.epoch()
+	local period = predict.period
+	if not period or period <= 1 then return nil end
+	local now = os.date('*t')
+	local windows = now.hour * (60 / predict.window) + math.floor(now.min / predict.window)
+	return windows % period + 1
+end
+
+-- Resolve queued records and flush the queue
+-- Prefetches up to predict.batch queued queries (all of them when batch == 0),
+-- then reschedules itself while work remains so batches are spread over time.
+-- Returns 0 (event-callback convention).
+function predict.drain()
+	local deleted = 0
+	for key, _ in pairs(predict.queue) do
+		-- Queue keys have the form 'QTYPE QNAME'
+		local qtype, qname = key:match('(%S*)%s(.*)')
+		resolve(qname, kres.type[qtype], kres.class.IN, {'NO_CACHE'})
+		predict.queue[key] = nil
+		deleted = deleted + 1
+		-- Resolve smaller batches at a time
+		if predict.batch > 0 and deleted >= predict.batch then
+			break
+		end
+	end
+	-- Schedule prefetch of another batch if not complete
+	if predict.ev_drain then event.cancel(predict.ev_drain) end
+	predict.ev_drain = nil
+	if deleted > 0 then
+		-- NOTE(review): delay is window*3 in *seconds*, not minutes -- presumably
+		-- intentional to pace batches closely; confirm the unit is as intended.
+		predict.ev_drain = event.after((predict.window * 3) * sec, predict.drain)
+	end
+	predict.queue_len = predict.queue_len - deleted
+	stats['predict.queue'] = predict.queue_len
+	collectgarbage('step')
+	return 0
+end
+
+-- Merge records from a log- or queue-shaped table into the prefetch queue.
+-- Already-queued keys and falsy values are skipped.
+-- Returns the number of newly added entries.
+local function enqueue_from_log(current)
+	if not current then return 0 end
+	local added = 0
+	for key, value in pairs(current) do
+		local is_new = value and not predict.queue[key]
+		if is_new then
+			predict.queue[key] = value
+			added = added + 1
+		end
+	end
+	return added
+end
+
+-- Sample frequent queries from the stats module into the log bucket for
+-- the given epoch (the stats counters are cleared afterwards).
+-- Returns the number of sampled queries; 0 when no epoch is active.
+function predict.sample(epoch_now)
+	if not epoch_now then return 0, 0 end
+	local bucket = predict.log[epoch_now] or {}
+	local frequent = stats.frequent()
+	stats.clear_frequent()
+	local total = #frequent
+	for i = 1, total do
+		local q = frequent[i]
+		bucket[string.format('%s %s', q.type, q.name)] = 1
+	end
+	predict.log[epoch_now] = bucket
+	return total
+end
+
+-- Predict queries for the upcoming epoch
+-- Detects periodicity: a key is queued when it was sampled both i windows ago
+-- and 2*i windows ago (relative to the epoch that just started), for
+-- i = 1 .. period/2 - 1. Returns the number of newly queued keys.
+local function generate(epoch_now)
+	if not epoch_now then return 0 end
+	local queued = 0
+	for i = 1, predict.period / 2 - 1 do
+		-- The -1/+1 shifts map the 1-based epoch index into 0-based modular
+		-- arithmetic and back, so the lookback wraps around the period.
+		local current = predict.log[(epoch_now - i - 1) % predict.period + 1]
+		local past = predict.log[(epoch_now - 2*i - 1) % predict.period + 1]
+		if current and past then
+			for k, _ in pairs(current) do
+				-- Queue only keys seen in both windows and not queued already
+				if past[k] ~= nil and not predict.queue[k] then
+					queued = queued + 1
+					predict.queue[k] = 1
+				end
+			end
+		end
+	end
+	return queued
+end
+
+-- Periodic driver: detects epoch boundaries, enqueues learned/predicted
+-- queries for the new epoch, samples current frequent queries, starts the
+-- drain cycle, and reschedules itself with jitter via next_event().
+function predict.process()
+	-- Start a new epoch, or continue sampling
+	local epoch_now = predict.epoch()
+	local nr_queued = 0
+
+	-- End of epoch
+	if predict.current_epoch ~= epoch_now then
+		stats['predict.epoch'] = epoch_now
+		predict.current_epoch = epoch_now
+		-- enqueue records from upcoming epoch
+		nr_queued = enqueue_from_log(predict.log[epoch_now])
+		-- predict next epoch
+		nr_queued = nr_queued + generate(epoch_now)
+		-- clear log for new epoch
+		predict.log[epoch_now] = {}
+	end
+
+	-- Sample current epoch
+	local nr_learned = predict.sample(epoch_now)
+
+	-- Dispatch predicted queries
+	if nr_queued > 0 then
+		predict.queue_len = predict.queue_len + nr_queued
+		-- Drain in roughly five batches; the value may be fractional, which is
+		-- fine since drain() compares with >=.
+		predict.batch = predict.queue_len / 5
+		if not predict.ev_drain then
+			predict.ev_drain = event.after(0, predict.drain)
+		end
+	end
+
+	if predict.ev_sample then event.cancel(predict.ev_sample) end
+	predict.ev_sample = event.after(next_event(), predict.process)
+	-- NOTE(review): stats is indexed unconditionally above but nil-guarded here;
+	-- confirm whether stats can actually be absent at this point.
+	if stats then
+		stats['predict.queue'] = predict.queue_len
+		stats['predict.learned'] = nr_learned
+	end
+	collectgarbage()
+end
+
+-- Start the periodic sampling/prediction timer.
+-- A non-positive window disables the module entirely.
+function predict.init()
+	if predict.window <= 0 then return end
+	predict.current_epoch = predict.epoch()
+	predict.ev_sample = event.after(next_event(), predict.process)
+end
+
+-- Cancel all pending timers and discard learned and queued state.
+function predict.deinit()
+	for _, handle in ipairs({'ev_sample', 'ev_drain'}) do
+		if predict[handle] then event.cancel(predict[handle]) end
+		predict[handle] = nil
+	end
+	predict.log = {}
+	predict.queue = {}
+	predict.queue_len = 0
+	collectgarbage()
+end
+
+-- Reconfigure the tracking window/period and restart the timers.
+-- @param config table with optional 'window' (minutes) and 'period'
+--        (number of windows); anything else is ignored
+function predict.config(config)
+	if type(config) ~= 'table' then return end
+	predict.window = config.window or predict.window
+	predict.period = config.period or predict.period
+	-- Prediction needs the stats module for frequent-query sampling
+	if (predict.period or 0) ~= 0 and not stats then modules.load('stats') end
+	-- Restart timers so the new settings take effect
+	predict.deinit()
+	predict.init()
+end
+
+predict.layer = {
+	-- Prefetch all expiring (sub-)queries immediately after the request finishes.
+	-- Doing that immediately is simplest and avoids creating (new) large bursts of activity.
+	finish = function (_, req)
+		req = kres.request_t(req)
+		local qrys = req.rplan.resolved
+		for i = 0, (tonumber(qrys.len) - 1) do -- size_t doesn't work for some reason
+			local qry = qrys.at[i]
+			-- EXPIRING presumably marks answers served from cache near TTL end -- confirm
+			if qry.flags.EXPIRING == true then
+				-- Re-resolve with NO_CACHE so the fresh answer replaces the stale entry
+				resolve(kres.dname2str(qry.sname), qry.stype, qry.sclass, {'NO_CACHE'})
+			end
+		end
+	end
+}
+
+return predict
diff --git a/modules/predict/predict.mk b/modules/predict/predict.mk
new file mode 100644
index 0000000..51a1a6b
--- /dev/null
+++ b/modules/predict/predict.mk
@@ -0,0 +1,2 @@
+predict_SOURCES := predict.lua
+$(call make_lua_module,predict)
diff --git a/modules/predict/tests/predict.test.lua b/modules/predict/tests/predict.test.lua
new file mode 100644
index 0000000..1043104
--- /dev/null
+++ b/modules/predict/tests/predict.test.lua
@@ -0,0 +1,60 @@
+-- setup resolver
+modules = { 'predict', 'stats' }
+
+-- mock global functions
+local resolve_count = 0
+local current_epoch = 0
+
+-- Count prefetch attempts instead of performing real resolution
+resolve = function ()
+	resolve_count = resolve_count + 1
+end
+
+-- Always report the same two frequent queries to make sampling deterministic
+stats.frequent = function ()
+	return {
+		{name = 'example.com', type = 'TYPE65535'},
+		{name = 'example.com', type = 'SOA'},
+	}
+end
+
+-- Drive epochs from the test-controlled counter instead of wall-clock time
+predict.epoch = function ()
+	return current_epoch % predict.period + 1
+end
+
+-- test if draining of prefetch queue works
+local function test_predict_drain()
+	resolve_count = 0
+	for _, key in ipairs({'TYPE65535 example.com', 'SOA example.com'}) do
+		predict.queue[key] = 1
+	end
+	predict.queue_len = 2
+	predict.drain()
+	-- both entries must be prefetched and removed in a single drain
+	same(resolve_count, 2, 'attempted to prefetch on drain')
+	same(predict.queue_len, 0, 'prefetch queue empty after drain')
+end
+
+-- test if prediction process works
+local function test_predict_process()
+	-- start new epoch
+	predict.process()
+	same(predict.queue_len, 0, 'first epoch, empty prefetch queue')
+	-- next epoch, still no period for frequent queries
+	current_epoch = current_epoch + 1
+	predict.process()
+	same(predict.queue_len, 0, 'second epoch, empty prefetch queue')
+	-- next epoch, found period
+	current_epoch = current_epoch + 1
+	predict.process()
+	same(predict.queue_len, 2, 'third epoch, prefetching')
+	-- drain works with scheduled prefetches (two batches)
+	-- batch is queue_len/5 = 0.4 here, so each drain() resolves one query
+	resolve_count = 0
+	predict.drain()
+	predict.drain()
+	same(resolve_count, 2, 'attempted to resolve queries in queue')
+	same(predict.queue_len, 0, 'prefetch queue is empty')
+end
+
+-- return test set
+return {
+	test_predict_drain,
+	test_predict_process
+}