diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-10 21:30:40 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-10 21:30:40 +0000 |
commit | 133a45c109da5310add55824db21af5239951f93 (patch) | |
tree | ba6ac4c0a950a0dda56451944315d66409923918 /src/plugins/lua/elastic.lua | |
parent | Initial commit. (diff) | |
download | rspamd-133a45c109da5310add55824db21af5239951f93.tar.xz rspamd-133a45c109da5310add55824db21af5239951f93.zip |
Adding upstream version 3.8.1. (refs: upstream/3.8.1, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/plugins/lua/elastic.lua')
-rw-r--r-- | src/plugins/lua/elastic.lua | 544 |
1 files changed, 544 insertions, 0 deletions
--[[
Copyright (c) 2017, Veselin Iordanov
Copyright (c) 2022, Vsevolod Stakhov <vsevolod@rspamd.com>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
]]--

local rspamd_logger = require 'rspamd_logger'
local rspamd_http = require "rspamd_http"
local lua_util = require "lua_util"
local util = require "rspamd_util"
local ucl = require "ucl"
local rspamd_redis = require "lua_redis"
local upstream_list = require "rspamd_upstream_list"
local lua_settings = require "lua_settings"

if confighelp then
  return
end

-- Rows collected by `elastic_collect` and not yet handed to the bulk API
local rows = {}
-- Number of entries currently held in `rows`
local nrows = 0
-- Consecutive times `elastic_send_data` failed to schedule a request
local failed_sends = 0
-- Contents of `settings.template_file`, read once at configuration time
local elastic_template
local redis_params
local N = "elastic"
-- Shared empty table used for nil-safe indexing and iteration
local E = {}
local HOSTNAME = util.get_hostname()
-- Switched to 'https://' when `settings.use_https` is set
local connect_prefix = 'http://'
-- Cleared by the startup checks when the backend is unusable
local enabled = true
-- Path component appended to `/_nodes/`; 'modules' when `settings.ingest_module` is set
local ingest_geoip_type = 'plugins'
local settings = {
  limit = 500, -- rows are flushed once more than this many are buffered
  index_pattern = 'rspamd-%Y.%m.%d', -- expanded with os.date() on every flush
  template_file = rspamd_paths['SHAREDIR'] .. '/elastic/rspamd_template.json',
  kibana_file = rspamd_paths['SHAREDIR'] .. '/elastic/kibana.json',
  key_prefix = 'elastic-',
  expire = 3600,
  timeout = 5.0, -- passed to every HTTP request and to the symbol augmentation
  failover = false,
  import_kibana = false, -- push `kibana_file` to the `.kibana` index on startup
  use_https = false,
  use_gzip = true,
  allow_local = false, -- when false, rspamc/controller scans are not collected
  user = nil,
  password = nil,
  no_ssl_verify = false,
  max_fail = 3, -- buffered rows are dropped after more than this many failed sends
  ingest_module = false,
  elasticsearch_version = 6, -- versions >= 7 omit `_type` from bulk actions
}

--- Read a whole file into a string.
-- @param path file system path to read
-- @return file contents, or nil when the file cannot be opened
local function read_file(path)
  local file = io.open(path, "rb")
  if not file then
    return nil
  end
  local content = file:read "*a"
  file:close()
  return content
end

--- Push all buffered rows to the Elasticsearch bulk endpoint.
-- The request is asynchronous: the return value only tells whether it was
-- scheduled; the outcome is logged from the HTTP callback.
-- @param task current task, used for logging and as the HTTP request context
-- @return whatever `rspamd_http.request` returns
local function elastic_send_data(task)
  local es_index = os.date(settings['index_pattern'])
  local tbl = {}
  -- Snapshot the row count now: the caller resets the module-level `nrows`
  -- as soon as the request is scheduled, i.e. before the callback fires.
  local nsent = nrows

  local action_line
  if settings.elasticsearch_version >= 7 then
    action_line = '{ "index" : { "_index" : "' .. es_index ..
        '","pipeline": "rspamd-geoip"} }'
  else
    action_line = '{ "index" : { "_index" : "' .. es_index ..
        '", "_type" : "_doc" ,"pipeline": "rspamd-geoip"} }'
  end

  -- `rows` is only ever appended to with table.insert, so it is a sequence
  for _, value in ipairs(rows) do
    table.insert(tbl, action_line)
    table.insert(tbl, ucl.to_format(value, 'json-compact'))
  end

  table.insert(tbl, '') -- For last \n

  local upstream = settings.upstream:get_upstream_round_robin()
  local ip_addr = upstream:get_addr():to_string(true)

  local push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'
  local bulk_json = table.concat(tbl, "\n")

  local function http_callback(err, code, body, _)
    if err then
      rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s",
          push_url, err, failed_sends, settings.max_fail)
    elseif code ~= 200 then
      -- `err` is always nil in this branch, so report the HTTP code and the reply body
      rspamd_logger.infox(task,
          "cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s",
          push_url, code, body, failed_sends, settings.max_fail)
    else
      lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES",
          nsent, #bulk_json)
    end
  end

  return rspamd_http.request({
    url = push_url,
    headers = {
      ['Content-Type'] = 'application/x-ndjson',
    },
    body = bulk_json,
    callback = http_callback,
    task = task,
    method = 'post',
    gzip = settings.use_gzip,
    no_ssl_verify = settings.no_ssl_verify,
    user = settings.user,
    password = settings.password,
    timeout = settings.timeout,
  })
end

--- Build the `rspamd_meta` document describing a scanned message.
-- @param task task to extract metadata from
-- @return table with connection, envelope, header, symbol and timing fields
local function get_general_metadata(task)
  local r = {}
  local ip_addr = task:get_ip()

  if ip_addr and ip_addr:is_valid() then
    r.is_local = ip_addr:is_local()
    r.ip = tostring(ip_addr)
  else
    r.ip = '127.0.0.1'
  end

  r.webmail = false
  r.sender_ip = 'unknown'
  local origin = task:get_header('X-Originating-IP')
  if origin then
    -- Strip the square brackets that may surround the address
    origin = origin:gsub('%[', ''):gsub('%]', '')
    local rspamd_ip = require "rspamd_ip"
    local origin_ip = rspamd_ip.from_string(origin)
    if origin_ip and origin_ip:is_valid() then
      r.webmail = true
      r.sender_ip = origin -- use string here
    end
  end

  r.direction = "Inbound"
  r.user = task:get_user() or 'unknown'
  r.qid = task:get_queue_id() or 'unknown'
  r.action = task:get_metric_action()
  r.rspamd_server = HOSTNAME
  -- A message with a known user is reported as outbound
  if r.user ~= 'unknown' then
    r.direction = "Outbound"
  end
  local s = task:get_metric_score()[1]
  r.score = s

  local rcpt = task:get_recipients('smtp')
  if rcpt then
    local l = {}
    for _, a in ipairs(rcpt) do
      table.insert(l, a['addr'])
    end
    r.rcpt = l
  else
    r.rcpt = 'unknown'
  end

  local from = task:get_from { 'smtp', 'orig' }
  if ((from or E)[1] or E).addr then
    r.from = from[1].addr
  else
    r.from = 'unknown'
  end

  local mime_from = task:get_from { 'mime', 'orig' }
  if ((mime_from or E)[1] or E).addr then
    r.mime_from = mime_from[1].addr
  else
    r.mime_from = 'unknown'
  end

  local syminf = task:get_symbols_all()
  r.symbols = syminf
  r.asn = {}
  local pool = task:get_mempool()
  r.asn.country = pool:get_variable("country") or 'unknown'
  r.asn.asn = pool:get_variable("asn") or 0
  r.asn.ipnet = pool:get_variable("ipnet") or 'unknown'

  -- Collect the decoded values of every occurrence of header `name`,
  -- or the string 'unknown' when the header is absent
  local function process_header(name)
    local hdr = task:get_header_full(name)
    if hdr then
      local l = {}
      for _, h in ipairs(hdr) do
        table.insert(l, h.decoded)
      end
      return l
    else
      return 'unknown'
    end
  end

  r.header_from = process_header('from')
  r.header_to = process_header('to')
  r.header_subject = process_header('subject')
  r.header_date = process_header('date')
  r.message_id = task:get_message_id()
  local hname = task:get_hostname() or 'unknown'
  r.hostname = hname

  local settings_id = task:get_settings_id()

  if settings_id then
    -- Convert to string
    settings_id = lua_settings.settings_by_id(settings_id)

    if settings_id then
      settings_id = settings_id.name
    end
  end

  if not settings_id then
    settings_id = ''
  end

  r.settings_id = settings_id

  local scan_real = task:get_scan_time()
  scan_real = math.floor(scan_real * 1000)
  if scan_real < 0 then
    rspamd_logger.messagex(task,
        'clock skew detected for message: %s ms real scan time (reset to 0)',
        scan_real)
    scan_real = 0
  end

  r.scan_time = scan_real

  return r
end

--- Idempotent symbol callback: buffer one row per task and flush the buffer
-- once it holds more than `settings.limit` rows.
-- @param task task being finalised
local function elastic_collect(task)
  if not enabled then
    return
  end
  if task:has_flag('skip') then
    return
  end
  if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then
    return
  end

  local row = { ['rspamd_meta'] = get_general_metadata(task),
                ['@timestamp'] = tostring(util.get_time() * 1000) }
  table.insert(rows, row)
  nrows = nrows + 1
  if nrows > settings['limit'] then
    lua_util.debugm(N, task, 'send elastic search rows: %s', nrows)
    if elastic_send_data(task) then
      -- The request has been scheduled; start a fresh buffer
      nrows = 0
      rows = {}
      failed_sends = 0;
    else
      failed_sends = failed_sends + 1

      if failed_sends > settings.max_fail then
        rspamd_logger.errx(task, 'cannot send %s rows to ES %s times, stop trying',
            nrows, failed_sends)
        -- Drop the buffered rows so the buffer does not grow without bound
        nrows = 0
        rows = {}
        failed_sends = 0;
      end
    end
  end
end

local opts = rspamd_config:get_all_opt('elastic')

--- Query `/_nodes/<ingest_geoip_type>` and disable the module unless every
-- reported node lists the `ingest-geoip` plugin.
-- @param cfg rspamd configuration passed to the HTTP request
-- @param ev_base event base passed to the HTTP request
local function check_elastic_server(cfg, ev_base, _)
  local upstream = settings.upstream:get_upstream_round_robin()
  local ip_addr = upstream:get_addr():to_string(true)
  local plugins_url = connect_prefix .. ip_addr .. '/_nodes/' .. ingest_geoip_type
  local function http_callback(err, code, body, _)
    if code == 200 then
      local parser = ucl.parser()
      local res, ucl_err = parser:parse_string(body)
      if not res then
        rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
            plugins_url, ucl_err)
        enabled = false;
        return
      end
      local obj = parser:get_object()
      -- A syntactically valid reply without a `nodes` object would otherwise
      -- raise inside this callback; treat it like any other failed check.
      if type(obj) ~= 'table' or type(obj['nodes']) ~= 'table' then
        rspamd_logger.infox(rspamd_config,
            'unexpected reply from %s: no nodes object found, disabling module',
            plugins_url)
        enabled = false
        return
      end
      for node, value in pairs(obj['nodes']) do
        local plugin_found = false
        -- Nodes without a plugin list are handled as nodes with no plugins
        local plugins = type(value) == 'table' and value['plugins'] or E
        for _, plugin in pairs(plugins) do
          if plugin['name'] == 'ingest-geoip' then
            plugin_found = true
            lua_util.debugm(N, "ingest-geoip plugin has been found")
          end
        end
        if not plugin_found then
          rspamd_logger.infox(rspamd_config,
              'Unable to find ingest-geoip on %1 node, disabling module', node)
          enabled = false
          return
        end
      end
    else
      rspamd_logger.errx('cannot get plugins from %s: %s(%s) (%s)', plugins_url,
          err, code, body)
      enabled = false
    end
  end
  rspamd_http.request({
    url = plugins_url,
    ev_base = ev_base,
    config = cfg,
    method = 'get',
    callback = http_callback,
    no_ssl_verify = settings.no_ssl_verify,
    user = settings.user,
    password = settings.password,
    timeout = settings.timeout,
  })
end

-- import ingest pipeline and kibana dashboard/visualization
--- Runs only on the primary controller worker. Creates the `rspamd-geoip`
-- ingest pipeline, uploads the index template when it does not exist yet and,
-- if `settings.import_kibana` is set, pushes the Kibana objects afterwards.
-- @param cfg rspamd configuration passed to the HTTP requests
-- @param ev_base event base passed to the HTTP requests
-- @param worker worker object this on-load hook runs in
local function initial_setup(cfg, ev_base, worker)
  if not worker:is_primary_controller() then
    return
  end

  local upstream = settings.upstream:get_upstream_round_robin()
  local ip_addr = upstream:get_addr():to_string(true)

  local function push_kibana_template()
    -- add kibana dashboard and visualizations
    if settings['import_kibana'] then
      local kibana_mappings = read_file(settings['kibana_file'])
      if kibana_mappings then
        local parser = ucl.parser()
        local res, parser_err = parser:parse_string(kibana_mappings)
        if not res then
          rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s',
              parser_err)
          enabled = false

          return
        end
        local obj = parser:get_object()
        local tbl = {}
        for _, item in ipairs(obj) do
          table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "' ..
              item['_type'] .. ':' .. item["_id"] .. '"} }')
          table.insert(tbl, ucl.to_format(item['_source'], 'json-compact'))
        end
        table.insert(tbl, '') -- For last \n

        local kibana_url = connect_prefix .. ip_addr .. '/.kibana/_bulk'
        local function kibana_template_callback(err, code, body, _)
          if code ~= 200 then
            rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', kibana_url,
                err, code, body)
            enabled = false
          else
            lua_util.debugm(N, 'pushed kibana template: %s', body)
          end
        end

        rspamd_http.request({
          url = kibana_url,
          ev_base = ev_base,
          config = cfg,
          headers = {
            ['Content-Type'] = 'application/x-ndjson',
          },
          body = table.concat(tbl, "\n"),
          method = 'post',
          gzip = settings.use_gzip,
          callback = kibana_template_callback,
          no_ssl_verify = settings.no_ssl_verify,
          user = settings.user,
          password = settings.password,
          timeout = settings.timeout,
        })
      else
        rspamd_logger.infox(rspamd_config, 'kibana template file %s not found', settings['kibana_file'])
      end
    end
  end

  if enabled then
    -- create ingest pipeline
    local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/rspamd-geoip'
    local function geoip_cb(err, code, body, _)
      if code ~= 200 then
        rspamd_logger.errx('cannot get data from %s: %s(%s) (%s)',
            geoip_url, err, code, body)
        enabled = false
      end
    end
    local template = {
      description = "Add geoip info for rspamd",
      processors = {
        {
          geoip = {
            field = "rspamd_meta.ip",
            target_field = "rspamd_meta.geoip"
          }
        }
      }
    }
    rspamd_http.request({
      url = geoip_url,
      ev_base = ev_base,
      config = cfg,
      callback = geoip_cb,
      headers = {
        ['Content-Type'] = 'application/json',
      },
      gzip = settings.use_gzip,
      body = ucl.to_format(template, 'json-compact'),
      method = 'put',
      no_ssl_verify = settings.no_ssl_verify,
      user = settings.user,
      password = settings.password,
      timeout = settings.timeout,
    })
    -- create template mappings if not exist
    local template_url = connect_prefix .. ip_addr .. '/_template/rspamd'
    local function http_template_put_callback(err, code, body, _)
      if code ~= 200 then
        rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)',
            template_url, err, code, body)
        enabled = false
      else
        lua_util.debugm(N, 'pushed rspamd template: %s', body)
        push_kibana_template()
      end
    end
    -- HEAD result: anything but 200 is taken as "template missing" and
    -- triggers the upload; 200 goes straight to the Kibana import.
    local function http_template_exist_callback(_, code, _, _)
      if code ~= 200 then
        rspamd_http.request({
          url = template_url,
          ev_base = ev_base,
          config = cfg,
          body = elastic_template,
          method = 'put',
          headers = {
            ['Content-Type'] = 'application/json',
          },
          gzip = settings.use_gzip,
          callback = http_template_put_callback,
          no_ssl_verify = settings.no_ssl_verify,
          user = settings.user,
          password = settings.password,
          timeout = settings.timeout,
        })
      else
        push_kibana_template()
      end
    end

    rspamd_http.request({
      url = template_url,
      ev_base = ev_base,
      config = cfg,
      method = 'head',
      callback = http_template_exist_callback,
      no_ssl_verify = settings.no_ssl_verify,
      user = settings.user,
      password = settings.password,
      timeout = settings.timeout,
    })

  end
end

redis_params = rspamd_redis.parse_redis_server('elastic')

-- The module is configured only when both the redis parameters and an
-- `elastic` options section are available.
if redis_params and opts then
  -- User-supplied options override the defaults above
  for k, v in pairs(opts) do
    settings[k] = v
  end

  if not settings['server'] and not settings['servers'] then
    rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
    lua_util.disable_module(N, "config")
  else
    if settings.use_https then
      connect_prefix = 'https://'
    end

    if settings.ingest_module then
      ingest_geoip_type = 'modules'
    end

    settings.upstream = upstream_list.create(rspamd_config,
        settings['server'] or settings['servers'], 9200)

    if not settings.upstream then
      rspamd_logger.errx('cannot parse elastic address: %s',
          settings['server'] or settings['servers'])
      lua_util.disable_module(N, "config")
      return
    end
    if not settings['template_file'] then
      rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
      lua_util.disable_module(N, "config")
      return
    end

    elastic_template = read_file(settings['template_file']);
    if not elastic_template then
      rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module',
          settings['template_file'])
      lua_util.disable_module(N, "config")
      return
    end

    rspamd_config:register_symbol({
      name = 'ELASTIC_COLLECT',
      type = 'idempotent',
      callback = elastic_collect,
      flags = 'empty,explicit_disable,ignore_passthrough',
      augmentations = { string.format("timeout=%f", settings.timeout) },
    })

    rspamd_config:add_on_load(function(cfg, ev_base, worker)
      if worker:is_scanner() then
        check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
        initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
      end
    end)
  end

end