summaryrefslogtreecommitdiffstats
path: root/src/lua/lua_udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lua/lua_udp.c')
-rw-r--r--src/lua/lua_udp.c594
1 files changed, 594 insertions, 0 deletions
diff --git a/src/lua/lua_udp.c b/src/lua/lua_udp.c
new file mode 100644
index 0000000..c79e35a
--- /dev/null
+++ b/src/lua/lua_udp.c
@@ -0,0 +1,594 @@
+/*-
+ * Copyright 2019 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "lua_common.h"
+#include "lua_thread_pool.h"
+#include "utlist.h"
+#include "unix-std.h"
+#include <math.h>
+#include <src/libutil/libev_helper.h>
+
+static const gchar *M = "rspamd lua udp";
+
+/***
+ * @module rspamd_udp
+ * Rspamd UDP module is available from the version 1.9.0 and represents a generic
+ * UDP asynchronous client available from the LUA code.
+ * This module is quite simple: it can either send requests to some address or
+ * it can send requests and wait for replies, potentially handling retransmits.
+ * @example
+local logger = require "rspamd_logger"
+local udp = require "rspamd_udp"
+
+rspamd_config.SYM = function(task)
+ udp.sento{
+ host = addr, -- must be ip address object (e.g. received by upstream module)
+ port = 500,
+ data = {'str1', 'str2'}, -- can be table, string or rspamd_text
+ timeout = 0.5, -- default = 1s
+ task = task, -- if has task
+ session = session, -- optional
+ ev_base = ev_base, -- if no task available
+ -- You can include callback and then Rspamd will try to read replies
+ callback = function(success, data)
+ -- success is bool, data is either data or an error (string)
+ end,
+ retransmits = 0, -- Or more if retransmitting is necessary
+ }
+end
+ */
+
+static const double default_udp_timeout = 1.0;
+
+LUA_FUNCTION_DEF(udp, sendto);
+
+static const struct luaL_reg udp_libf[] = {
+ LUA_INTERFACE_DEF(udp, sendto),
+ {NULL, NULL}};
+
+struct lua_udp_cbdata {
+ struct ev_loop *event_loop;
+ struct rspamd_io_ev ev;
+ struct rspamd_async_event *async_ev;
+ struct rspamd_task *task;
+ rspamd_mempool_t *pool;
+ rspamd_inet_addr_t *addr;
+ struct rspamd_symcache_dynamic_item *item;
+ struct rspamd_async_session *s;
+ struct iovec *iov;
+ lua_State *L;
+ guint retransmits;
+ guint iovlen;
+ gint sock;
+ gint cbref;
+ gboolean sent;
+};
+
+#define msg_debug_udp(...) rspamd_conditional_debug_fast(NULL, cbd->addr, \
+ rspamd_lua_udp_log_id, "lua_udp", cbd->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
+INIT_LOG_MODULE(lua_udp)
+
+static inline void
+lua_fill_iov(lua_State *L, rspamd_mempool_t *pool,
+ struct iovec *iov, gint pos)
+{
+ if (lua_type(L, pos) == LUA_TUSERDATA) {
+ struct rspamd_lua_text *t = lua_check_text(L, pos);
+
+ if (t) {
+ iov->iov_base = rspamd_mempool_alloc(pool, t->len);
+ iov->iov_len = t->len;
+ memcpy(iov->iov_base, t->start, t->len);
+ }
+ }
+ else {
+ const gchar *s;
+ gsize len;
+
+ s = lua_tolstring(L, pos, &len);
+
+ iov->iov_base = rspamd_mempool_alloc(pool, len);
+ iov->iov_len = len;
+ memcpy(iov->iov_base, s, len);
+ }
+}
+
+static void
+lua_udp_cbd_fin(gpointer p)
+{
+ struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *) p;
+
+ if (cbd->sock != -1) {
+ rspamd_ev_watcher_stop(cbd->event_loop, &cbd->ev);
+ close(cbd->sock);
+ }
+
+ if (cbd->addr) {
+ rspamd_inet_address_free(cbd->addr);
+ }
+
+ if (cbd->cbref) {
+ luaL_unref(cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+ }
+}
+
+static void
+lua_udp_maybe_free(struct lua_udp_cbdata *cbd)
+{
+ if (cbd->item) {
+ rspamd_symcache_item_async_dec_check(cbd->task, cbd->item, M);
+ cbd->item = NULL;
+ }
+
+ if (cbd->async_ev) {
+ rspamd_session_remove_event(cbd->s, lua_udp_cbd_fin, cbd);
+ }
+ else {
+ lua_udp_cbd_fin(cbd);
+ }
+}
+
+
+enum rspamd_udp_send_result {
+ RSPAMD_SENT_OK,
+ RSPAMD_SENT_RETRY,
+ RSPAMD_SENT_FAILURE
+};
+
+static enum rspamd_udp_send_result
+lua_try_send_request(struct lua_udp_cbdata *cbd)
+{
+ struct msghdr msg;
+ gint r;
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iov = cbd->iov;
+ msg.msg_iovlen = cbd->iovlen;
+ msg.msg_name = rspamd_inet_address_get_sa(cbd->addr, &msg.msg_namelen);
+
+ r = sendmsg(cbd->sock, &msg, 0);
+
+ if (r != -1) {
+ return RSPAMD_SENT_OK;
+ }
+
+ if (errno == EAGAIN || errno == EINTR) {
+ return RSPAMD_SENT_RETRY;
+ }
+
+ return RSPAMD_SENT_FAILURE;
+}
+
+static void
+lua_udp_maybe_push_error(struct lua_udp_cbdata *cbd, const gchar *err)
+{
+ if (cbd->cbref != -1) {
+ gint top;
+ lua_State *L = cbd->L;
+
+ top = lua_gettop(L);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, cbd->cbref);
+
+ /* Error message */
+ lua_pushboolean(L, false);
+ lua_pushstring(L, err);
+
+ if (cbd->item) {
+ rspamd_symcache_set_cur_item(cbd->task, cbd->item);
+ }
+
+ if (lua_pcall(L, 2, 0, 0) != 0) {
+ msg_info("callback call failed: %s", lua_tostring(L, -1));
+ }
+
+ lua_settop(L, top);
+ }
+
+ lua_udp_maybe_free(cbd);
+}
+
+static void
+lua_udp_push_data(struct lua_udp_cbdata *cbd, const gchar *data,
+ gssize len)
+{
+ if (cbd->cbref != -1) {
+ gint top;
+ lua_State *L = cbd->L;
+
+ top = lua_gettop(L);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, cbd->cbref);
+
+ /* Error message */
+ lua_pushboolean(L, true);
+ lua_pushlstring(L, data, len);
+
+ if (cbd->item) {
+ rspamd_symcache_set_cur_item(cbd->task, cbd->item);
+ }
+
+ if (lua_pcall(L, 2, 0, 0) != 0) {
+ msg_info("callback call failed: %s", lua_tostring(L, -1));
+ }
+
+ lua_settop(L, top);
+ }
+
+ lua_udp_maybe_free(cbd);
+}
+
+static gboolean
+lua_udp_maybe_register_event(struct lua_udp_cbdata *cbd)
+{
+ if (cbd->s && !cbd->async_ev) {
+ if (cbd->item) {
+ cbd->async_ev = rspamd_session_add_event_full(cbd->s, lua_udp_cbd_fin,
+ cbd, M,
+ rspamd_symcache_dyn_item_name(cbd->task, cbd->item));
+ }
+ else {
+ cbd->async_ev = rspamd_session_add_event(cbd->s, lua_udp_cbd_fin,
+ cbd, M);
+ }
+
+ if (!cbd->async_ev) {
+ return FALSE;
+ }
+ }
+
+ if (cbd->task && !cbd->item) {
+ cbd->item = rspamd_symcache_get_cur_item(cbd->task);
+ rspamd_symcache_item_async_inc(cbd->task, cbd->item, M);
+ }
+
+ return TRUE;
+}
+
+static void
+lua_udp_io_handler(gint fd, short what, gpointer p)
+{
+ struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *) p;
+ gssize r;
+
+ if (what == EV_TIMEOUT) {
+ if (cbd->sent && cbd->retransmits > 0) {
+ r = lua_try_send_request(cbd);
+
+ if (r == RSPAMD_SENT_OK) {
+ rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_READ);
+ lua_udp_maybe_register_event(cbd);
+ cbd->retransmits--;
+ }
+ else if (r == RSPAMD_SENT_FAILURE) {
+ lua_udp_maybe_push_error(cbd, "write error");
+ }
+ else {
+ cbd->retransmits--;
+ rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_WRITE);
+ }
+ }
+ else {
+ if (!cbd->sent) {
+ lua_udp_maybe_push_error(cbd, "sent timeout");
+ }
+ else {
+ lua_udp_maybe_push_error(cbd, "read timeout");
+ }
+ }
+ }
+ else if (what == EV_WRITE) {
+ r = lua_try_send_request(cbd);
+
+ if (r == RSPAMD_SENT_OK) {
+ if (cbd->cbref != -1) {
+ rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_READ);
+ cbd->sent = TRUE;
+ }
+ else {
+ lua_udp_maybe_free(cbd);
+ }
+ }
+ else if (r == RSPAMD_SENT_FAILURE) {
+ lua_udp_maybe_push_error(cbd, "write error");
+ }
+ else {
+ cbd->retransmits--;
+ rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_WRITE);
+ }
+ }
+ else if (what == EV_READ) {
+ guchar udpbuf[4096];
+ socklen_t slen;
+ struct sockaddr *sa;
+
+ sa = rspamd_inet_address_get_sa(cbd->addr, &slen);
+
+ r = recvfrom(cbd->sock, udpbuf, sizeof(udpbuf), 0, sa, &slen);
+
+ if (r == -1) {
+ lua_udp_maybe_push_error(cbd, strerror(errno));
+ }
+ else {
+ lua_udp_push_data(cbd, udpbuf, r);
+ }
+ }
+}
+
+/***
+ * @function rspamd_udp.sendto({params})
+ * This function simply sends data to an external UDP service
+ *
+ * - `task`: rspamd task objects (implies `pool`, `session` and `ev_base` arguments)
+ * - `ev_base`: event base (if no task specified)
+ * - `session`: events session (no task, optional)
+ * - `pool`: memory pool (if no task specified)
+ * - `host`: IP or name of the peer (required)
+ * - `port`: remote port to use (if `host` has no port part this is required)
+ * - `data`: a table of strings or `rspamd_text` objects that contains data pieces
+ * - `retransmits`: number of retransmits if needed
+ * - `callback`: optional callback if reply should be read
+ * @return {boolean} true if request has been sent (additional string if it has not)
+ */
+static gint
+lua_udp_sendto(lua_State *L)
+{
+ LUA_TRACE_POINT;
+ const gchar *host;
+ guint port;
+ struct ev_loop *ev_base = NULL;
+ struct lua_udp_cbdata *cbd;
+ struct rspamd_async_session *session = NULL;
+ struct rspamd_task *task = NULL;
+ rspamd_inet_addr_t *addr;
+ rspamd_mempool_t *pool = NULL;
+ gdouble timeout = default_udp_timeout;
+
+ if (lua_type(L, 1) == LUA_TTABLE) {
+ lua_pushstring(L, "port");
+ lua_gettable(L, -2);
+
+ if (lua_type(L, -1) == LUA_TNUMBER) {
+ port = lua_tointeger(L, -1);
+ }
+ else {
+ /* We assume that it is a unix socket */
+ port = 0;
+ }
+
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "host");
+ lua_gettable(L, -2);
+
+ if (lua_type(L, -1) == LUA_TSTRING) {
+ host = luaL_checkstring(L, -1);
+
+ if (rspamd_parse_inet_address(&addr,
+ host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
+ if (port != 0) {
+ rspamd_inet_address_set_port(addr, port);
+ }
+ }
+ else {
+ lua_pop(L, 1);
+ return luaL_error(L, "invalid host: %s", host);
+ }
+ }
+ else if (lua_type(L, -1) == LUA_TUSERDATA) {
+ struct rspamd_lua_ip *lip;
+
+ lip = lua_check_ip(L, -1);
+
+ if (lip == NULL || lip->addr == NULL) {
+ lua_pop(L, 1);
+ return luaL_error(L, "invalid host class");
+ }
+
+ addr = rspamd_inet_address_copy(lip->addr, NULL);
+
+ if (port != 0) {
+ rspamd_inet_address_set_port(addr, port);
+ }
+ }
+ else {
+ lua_pop(L, 1);
+ return luaL_error(L, "invalid host");
+ }
+
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "task");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TUSERDATA) {
+ task = lua_check_task(L, -1);
+ ev_base = task->event_loop;
+ session = task->s;
+ pool = task->task_pool;
+ }
+ lua_pop(L, 1);
+
+ if (task == NULL) {
+ lua_pushstring(L, "ev_base");
+ lua_gettable(L, -2);
+ if (rspamd_lua_check_udata_maybe(L, -1, "rspamd{ev_base}")) {
+ ev_base = *(struct ev_loop **) lua_touserdata(L, -1);
+ }
+ else {
+ ev_base = NULL;
+ }
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "session");
+ lua_gettable(L, -2);
+ if (rspamd_lua_check_udata_maybe(L, -1, "rspamd{session}")) {
+ session = *(struct rspamd_async_session **) lua_touserdata(L, -1);
+ }
+ else {
+ session = NULL;
+ }
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "pool");
+ lua_gettable(L, -2);
+ if (rspamd_lua_check_udata_maybe(L, -1, "rspamd{mempool}")) {
+ pool = *(rspamd_mempool_t **) lua_touserdata(L, -1);
+ }
+ else {
+ pool = NULL;
+ }
+ lua_pop(L, 1);
+ }
+
+ lua_pushstring(L, "timeout");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TNUMBER) {
+ timeout = lua_tonumber(L, -1);
+ }
+ lua_pop(L, 1);
+
+ if (!ev_base || !pool) {
+ rspamd_inet_address_free(addr);
+
+ return luaL_error(L, "invalid arguments");
+ }
+
+
+ cbd = rspamd_mempool_alloc0(pool, sizeof(*cbd));
+ cbd->event_loop = ev_base;
+ cbd->pool = pool;
+ cbd->s = session;
+ cbd->addr = addr;
+ cbd->sock = rspamd_socket_create(rspamd_inet_address_get_af(addr),
+ SOCK_DGRAM, 0, TRUE);
+ cbd->cbref = -1;
+ cbd->ev.timeout = timeout;
+
+ if (cbd->sock == -1) {
+ rspamd_inet_address_free(addr);
+
+ return luaL_error(L, "cannot open socket: %s", strerror(errno));
+ }
+
+ cbd->L = L;
+
+ gsize data_len;
+
+ lua_pushstring(L, "data");
+ lua_gettable(L, -2);
+
+ if (lua_type(L, -1) == LUA_TTABLE) {
+ data_len = rspamd_lua_table_size(L, -1);
+ cbd->iov = rspamd_mempool_alloc(pool,
+ sizeof(*cbd->iov) * data_len);
+
+ for (int i = 0; i < data_len; i++) {
+ lua_rawgeti(L, -1, i + 1);
+ lua_fill_iov(L, pool, &cbd->iov[i], -1);
+ lua_pop(L, 1);
+ }
+
+ cbd->iovlen = data_len;
+ }
+ else {
+ cbd->iov = rspamd_mempool_alloc(pool, sizeof(*cbd->iov));
+ cbd->iovlen = 1;
+ lua_fill_iov(L, pool, cbd->iov, -1);
+ }
+
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "callback");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TFUNCTION) {
+ cbd->cbref = luaL_ref(L, LUA_REGISTRYINDEX);
+ }
+ else {
+ lua_pop(L, 1);
+ }
+
+ lua_pushstring(L, "retransmits");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TNUMBER) {
+ cbd->retransmits = lua_tonumber(L, -1);
+ }
+ lua_pop(L, 1);
+
+ enum rspamd_udp_send_result r;
+
+ r = lua_try_send_request(cbd);
+ if (r == RSPAMD_SENT_OK) {
+ if (cbd->cbref == -1) {
+ lua_udp_maybe_free(cbd);
+ }
+ else {
+ if (!lua_udp_maybe_register_event(cbd)) {
+ lua_pushboolean(L, false);
+ lua_pushstring(L, "session error");
+ lua_udp_maybe_free(cbd);
+
+ return 2;
+ }
+
+ rspamd_ev_watcher_init(&cbd->ev, cbd->sock, EV_READ,
+ lua_udp_io_handler, cbd);
+ rspamd_ev_watcher_start(cbd->event_loop, &cbd->ev, timeout);
+ cbd->sent = TRUE;
+ }
+
+ lua_pushboolean(L, true);
+ }
+ else if (r == RSPAMD_SENT_FAILURE) {
+ lua_pushboolean(L, false);
+ lua_pushstring(L, strerror(errno));
+ lua_udp_maybe_free(cbd);
+
+ return 2;
+ }
+ else {
+ rspamd_ev_watcher_init(&cbd->ev, cbd->sock, EV_WRITE,
+ lua_udp_io_handler, cbd);
+ rspamd_ev_watcher_start(cbd->event_loop, &cbd->ev, timeout);
+
+ if (!lua_udp_maybe_register_event(cbd)) {
+ lua_pushboolean(L, false);
+ lua_pushstring(L, "session error");
+ lua_udp_maybe_free(cbd);
+
+ return 2;
+ }
+ }
+ }
+ else {
+ return luaL_error(L, "invalid arguments");
+ }
+
+ return 1;
+}
+
+static gint
+lua_load_udp(lua_State *L)
+{
+ lua_newtable(L);
+ luaL_register(L, NULL, udp_libf);
+
+ return 1;
+}
+
+void luaopen_udp(lua_State *L)
+{
+ rspamd_lua_add_preload(L, "rspamd_udp", lua_load_udp);
+}