diff options
Diffstat (limited to 'lualib/rspamadm/fuzzy_convert.lua')
-rw-r--r-- | lualib/rspamadm/fuzzy_convert.lua | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/lualib/rspamadm/fuzzy_convert.lua b/lualib/rspamadm/fuzzy_convert.lua new file mode 100644 index 0000000..fab3995 --- /dev/null +++ b/lualib/rspamadm/fuzzy_convert.lua @@ -0,0 +1,208 @@ +local sqlite3 = require "rspamd_sqlite3" +local redis = require "rspamd_redis" +local util = require "rspamd_util" + +local function connect_redis(server, username, password, db) + local ret + local conn, err = redis.connect_sync({ + host = server, + }) + + if not conn then + return nil, 'Cannot connect: ' .. err + end + + if username then + if password then + ret = conn:add_cmd('AUTH', { username, password }) + if not ret then + return nil, 'Cannot queue command' + end + else + return nil, 'Redis requires a password when username is supplied' + end + elseif password then + ret = conn:add_cmd('AUTH', { password }) + if not ret then + return nil, 'Cannot queue command' + end + end + if db then + ret = conn:add_cmd('SELECT', { db }) + if not ret then + return nil, 'Cannot queue command' + end + end + + return conn, nil +end + +local function send_digests(digests, redis_host, redis_username, redis_password, redis_db) + local conn, err = connect_redis(redis_host, redis_username, redis_password, redis_db) + if err then + print(err) + return false + end + local ret + for _, v in ipairs(digests) do + ret = conn:add_cmd('HMSET', { + 'fuzzy' .. v[1], + 'F', v[2], + 'V', v[3], + }) + if not ret then + print('Cannot batch command') + return false + end + ret = conn:add_cmd('EXPIRE', { + 'fuzzy' .. v[1], + tostring(v[4]), + }) + if not ret then + print('Cannot batch command') + return false + end + end + ret, err = conn:exec() + if not ret then + print('Cannot execute batched commands: ' .. err) + return false + end + return true +end + +local function send_shingles(shingles, redis_host, redis_username, redis_password, redis_db) + local conn, err = connect_redis(redis_host, redis_username, redis_password, redis_db) + if err then + print("Redis error: " .. err) + return false + end + local ret + for _, v in ipairs(shingles) do + ret = conn:add_cmd('SET', { + 'fuzzy_' .. v[2] .. '_' .. v[1], + v[4], + }) + if not ret then + print('Cannot batch SET command: ' .. err) + return false + end + ret = conn:add_cmd('EXPIRE', { + 'fuzzy_' .. v[2] .. '_' .. v[1], + tostring(v[3]), + }) + if not ret then + print('Cannot batch command') + return false + end + end + ret, err = conn:exec() + if not ret then + print('Cannot execute batched commands: ' .. err) + return false + end + return true +end + +local function update_counters(total, redis_host, redis_username, redis_password, redis_db) + local conn, err = connect_redis(redis_host, redis_username, redis_password, redis_db) + if err then + print(err) + return false + end + local ret + ret = conn:add_cmd('SET', { + 'fuzzylocal', + total, + }) + if not ret then + print('Cannot batch command') + return false + end + ret = conn:add_cmd('SET', { + 'fuzzy_count', + total, + }) + if not ret then + print('Cannot batch command') + return false + end + ret, err = conn:exec() + if not ret then + print('Cannot execute batched commands: ' .. err) + return false + end + return true +end + +return function(_, res) + local db = sqlite3.open(res['source_db']) + local shingles = {} + local digests = {} + local num_batch_digests = 0 + local num_batch_shingles = 0 + local total_digests = 0 + local total_shingles = 0 + local lim_batch = 1000 -- Update each 1000 entries + local redis_username = res['redis_username'] + local redis_password = res['redis_password'] + local redis_db = nil + + if res['redis_db'] then + redis_db = tostring(res['redis_db']) + end + + if not db then + print('Cannot open source db: ' .. res['source_db']) + return + end + + local now = util.get_time() + for row in db:rows('SELECT id, flag, digest, value, time FROM digests') do + + local expire_in = math.floor(now - row.time + res['expiry']) + if expire_in >= 1 then + table.insert(digests, { row.digest, row.flag, row.value, expire_in }) + num_batch_digests = num_batch_digests + 1 + total_digests = total_digests + 1 + for srow in db:rows('SELECT value, number FROM shingles WHERE digest_id = ' .. row.id) do + table.insert(shingles, { srow.value, srow.number, expire_in, row.digest }) + total_shingles = total_shingles + 1 + num_batch_shingles = num_batch_shingles + 1 + end + end + if num_batch_digests >= lim_batch then + if not send_digests(digests, res['redis_host'], redis_username, redis_password, redis_db) then + return + end + num_batch_digests = 0 + digests = {} + end + if num_batch_shingles >= lim_batch then + if not send_shingles(shingles, res['redis_host'], redis_username, redis_password, redis_db) then + return + end + num_batch_shingles = 0 + shingles = {} + end + end + if digests[1] then + if not send_digests(digests, res['redis_host'], redis_username, redis_password, redis_db) then + return + end + end + if shingles[1] then + if not send_shingles(shingles, res['redis_host'], redis_username, redis_password, redis_db) then + return + end + end + + local message = string.format( + 'Migrated %d digests and %d shingles', + total_digests, total_shingles + ) + if not update_counters(total_digests, res['redis_host'], redis_username, redis_password, redis_db) then + message = message .. ' but failed to update counters' + end + print(message) +end |