diff options
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r-- | src/lua/lua_redis.c | 1662 |
1 files changed, 1662 insertions, 0 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c new file mode 100644 index 0000000..1ad3b3d --- /dev/null +++ b/src/lua/lua_redis.c @@ -0,0 +1,1662 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "lua_common.h" +#include "lua_thread_pool.h" +#include "utlist.h" + +#include "contrib/hiredis/hiredis.h" +#include "contrib/hiredis/async.h" + +#define REDIS_DEFAULT_TIMEOUT 1.0 + +static const gchar *M = "rspamd lua redis"; +static void *redis_null; + +/*** + * @module rspamd_redis + * This module implements redis asynchronous client for rspamd LUA API. + * Here is an example of using of this module: + * @example +local rspamd_redis = require "rspamd_redis" +local rspamd_logger = require "rspamd_logger" + +local function symbol_callback(task) + local redis_key = 'some_key' + local function redis_cb(err, data) + if not err then + rspamd_logger.infox('redis returned %1=%2', redis_key, data) + end + end + + rspamd_redis.make_request(task, "127.0.0.1:6379", redis_cb, + 'GET', {redis_key}) + -- or in table form: + -- rspamd_redis.make_request({task=task, host="127.0.0.1:6379, + -- callback=redis_cb, timeout=2.0, cmd='GET', args={redis_key}}) +end + */ + +LUA_FUNCTION_DEF(redis, make_request); +LUA_FUNCTION_DEF(redis, make_request_sync); +LUA_FUNCTION_DEF(redis, connect); +LUA_FUNCTION_DEF(redis, connect_sync); +LUA_FUNCTION_DEF(redis, add_cmd); +LUA_FUNCTION_DEF(redis, exec); +LUA_FUNCTION_DEF(redis, gc); + +static const struct luaL_reg redislib_f[] = { + LUA_INTERFACE_DEF(redis, make_request), + LUA_INTERFACE_DEF(redis, make_request_sync), + LUA_INTERFACE_DEF(redis, connect), + LUA_INTERFACE_DEF(redis, connect_sync), + {NULL, NULL}}; + +static const struct luaL_reg redislib_m[] = { + LUA_INTERFACE_DEF(redis, add_cmd), + LUA_INTERFACE_DEF(redis, exec), + {"__gc", lua_redis_gc}, + {"__tostring", rspamd_lua_class_tostring}, + {NULL, NULL}}; + +#undef REDIS_DEBUG_REFS +#ifdef REDIS_DEBUG_REFS +#define REDIS_RETAIN(x) \ + do { \ + msg_err("retain ref %p, refcount: %d", (x), (x)->ref.refcount); \ + REF_RETAIN(x); \ + } while (0) + +#define REDIS_RELEASE(x) \ + do { \ + msg_err("release ref %p, refcount: %d", (x), (x)->ref.refcount); \ + REF_RELEASE(x); \ + } while (0) +#else +#define REDIS_RETAIN REF_RETAIN +#define REDIS_RELEASE REF_RELEASE +#endif + +struct lua_redis_request_specific_userdata; +/** + * Struct for userdata representation + */ +struct lua_redis_userdata { + redisAsyncContext *ctx; + struct rspamd_task *task; + struct rspamd_symcache_dynamic_item *item; + struct rspamd_async_session *s; + struct ev_loop *event_loop; + struct rspamd_config *cfg; + struct rspamd_redis_pool *pool; + gchar *server; + gchar log_tag[RSPAMD_LOG_ID_LEN + 1]; + struct lua_redis_request_specific_userdata *specific; + gdouble timeout; + guint16 port; + guint16 terminated; +}; + +#define msg_debug_lua_redis(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_lua_redis_log_id, "lua_redis", ud->log_tag, \ + G_STRFUNC, \ + __VA_ARGS__) +INIT_LOG_MODULE(lua_redis) + +#define LUA_REDIS_SPECIFIC_REPLIED (1 << 0) +/* session was finished */ +#define LUA_REDIS_SPECIFIC_FINISHED (1 << 1) +#define LUA_REDIS_ASYNC (1 << 0) +#define LUA_REDIS_TEXTDATA (1 << 1) +#define LUA_REDIS_TERMINATED (1 << 2) +#define LUA_REDIS_NO_POOL (1 << 3) +#define LUA_REDIS_SUBSCRIBED (1 << 4) +#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC) + +struct lua_redis_request_specific_userdata { + gint cbref; + guint nargs; + gchar **args; + gsize *arglens; + struct lua_redis_userdata *c; + struct lua_redis_ctx *ctx; + struct lua_redis_request_specific_userdata *next; + ev_timer timeout_ev; + guint flags; +}; + +struct lua_redis_ctx { + guint flags; + struct lua_redis_userdata async; + guint cmds_pending; + ref_entry_t ref; + GQueue *replies; /* for sync connection only */ + GQueue *events_cleanup; /* for sync connection only */ + struct thread_entry *thread; /* for sync mode, set only if there was yield */ +}; + +struct lua_redis_result { + gboolean is_error; + gint result_ref; + struct rspamd_symcache_dynamic_item *item; + struct rspamd_async_session *s; + struct rspamd_task *task; + struct lua_redis_request_specific_userdata *sp_ud; +}; + +static struct lua_redis_ctx * +lua_check_redis(lua_State *L, gint pos) +{ + void *ud = rspamd_lua_check_udata(L, pos, "rspamd{redis}"); + luaL_argcheck(L, ud != NULL, pos, "'redis' expected"); + return ud ? *((struct lua_redis_ctx **) ud) : NULL; +} + +static void +lua_redis_free_args(char **args, gsize *arglens, guint nargs) +{ + guint i; + + if (args) { + for (i = 0; i < nargs; i++) { + g_free(args[i]); + } + + g_free(args); + g_free(arglens); + } +} + +static void +lua_redis_dtor(struct lua_redis_ctx *ctx) +{ + struct lua_redis_userdata *ud; + struct lua_redis_request_specific_userdata *cur, *tmp; + gboolean is_successful = TRUE; + struct redisAsyncContext *ac; + + ud = &ctx->async; + msg_debug_lua_redis("destructing %p", ctx); + + if (ud->ctx) { + + LL_FOREACH_SAFE(ud->specific, cur, tmp) + { + ev_timer_stop(ud->event_loop, &cur->timeout_ev); + + if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) { + is_successful = FALSE; + } + + cur->flags |= LUA_REDIS_SPECIFIC_FINISHED; + } + + ctx->flags |= LUA_REDIS_TERMINATED; + + ud->terminated = 1; + ac = ud->ctx; + ud->ctx = NULL; + + if (!is_successful) { + rspamd_redis_pool_release_connection(ud->pool, ac, + RSPAMD_REDIS_RELEASE_FATAL); + } + else { + rspamd_redis_pool_release_connection(ud->pool, ac, + (ctx->flags & LUA_REDIS_NO_POOL) ? RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); + } + } + + LL_FOREACH_SAFE(ud->specific, cur, tmp) + { + lua_redis_free_args(cur->args, cur->arglens, cur->nargs); + + if (cur->cbref != -1) { + luaL_unref(ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref); + } + + g_free(cur); + } + + if (ctx->events_cleanup) { + g_queue_free(ctx->events_cleanup); + ctx->events_cleanup = NULL; + } + if (ctx->replies) { + g_queue_free(ctx->replies); + ctx->replies = NULL; + } + + g_free(ctx); +} + +static gint +lua_redis_gc(lua_State *L) +{ + struct lua_redis_ctx *ctx = lua_check_redis(L, 1); + + if (ctx) { + REDIS_RELEASE(ctx); + } + + return 0; +} + +static void +lua_redis_fin(void *arg) +{ + struct lua_redis_request_specific_userdata *sp_ud = arg; + struct lua_redis_userdata *ud; + struct lua_redis_ctx *ctx; + + ctx = sp_ud->ctx; + ud = sp_ud->c; + + if (ev_can_stop(&sp_ud->timeout_ev)) { + ev_timer_stop(sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); + } + + msg_debug_lua_redis("finished redis query %p from session %p; refcount=%d", + sp_ud, ctx, ctx->ref.refcount); + sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED; + + REDIS_RELEASE(ctx); +} + +/** + * Push error of redis request to lua callback + * @param code + * @param ud + */ +static void +lua_redis_push_error(const gchar *err, + struct lua_redis_ctx *ctx, + struct lua_redis_request_specific_userdata *sp_ud, + gboolean connected) +{ + struct lua_redis_userdata *ud = sp_ud->c; + struct lua_callback_state cbs; + lua_State *L; + + if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED | LUA_REDIS_SPECIFIC_FINISHED))) { + if (sp_ud->cbref != -1) { + + lua_thread_pool_prepare_callback(ud->cfg->lua_thread_pool, &cbs); + L = cbs.L; + + lua_pushcfunction(L, &rspamd_lua_traceback); + int err_idx = lua_gettop(L); + /* Push error */ + lua_rawgeti(cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); + + /* String of error */ + lua_pushstring(cbs.L, err); + /* Data is nil */ + lua_pushnil(cbs.L); + + if (ud->item) { + rspamd_symcache_set_cur_item(ud->task, ud->item); + } + + if (lua_pcall(cbs.L, 2, 0, err_idx) != 0) { + msg_info("call to callback failed: %s", lua_tostring(cbs.L, -1)); + } + + lua_settop(L, err_idx - 1); + lua_thread_pool_restore_callback(&cbs); + } + + sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; + + if (connected && ud->s) { + if (ud->item) { + rspamd_symcache_item_async_dec_check(ud->task, ud->item, M); + } + + rspamd_session_remove_event(ud->s, lua_redis_fin, sp_ud); + } + else { + lua_redis_fin(sp_ud); + } + } +} + +static void +lua_redis_push_reply(lua_State *L, const redisReply *r, gboolean text_data) +{ + guint i; + struct rspamd_lua_text *t; + + switch (r->type) { + case REDIS_REPLY_INTEGER: + lua_pushinteger(L, r->integer); + break; + case REDIS_REPLY_NIL: + lua_getfield(L, LUA_REGISTRYINDEX, "redis.null"); + break; + case REDIS_REPLY_STRING: + case REDIS_REPLY_STATUS: + if (text_data) { + t = lua_newuserdata(L, sizeof(*t)); + rspamd_lua_setclass(L, "rspamd{text}", -1); + t->flags = 0; + t->start = r->str; + t->len = r->len; + } + else { + lua_pushlstring(L, r->str, r->len); + } + break; + case REDIS_REPLY_ARRAY: + lua_createtable(L, r->elements, 0); + for (i = 0; i < r->elements; ++i) { + lua_redis_push_reply(L, r->element[i], text_data); + lua_rawseti(L, -2, i + 1); /* Store sub-reply */ + } + break; + default: /* should not happen */ + msg_info("unknown reply type: %d", r->type); + break; + } +} + +/** + * Push data of redis request to lua callback + * @param r redis reply data + * @param ud + */ +static void +lua_redis_push_data(const redisReply *r, struct lua_redis_ctx *ctx, + struct lua_redis_request_specific_userdata *sp_ud) +{ + struct lua_redis_userdata *ud = sp_ud->c; + struct lua_callback_state cbs; + lua_State *L; + + if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED | LUA_REDIS_SPECIFIC_FINISHED)) || + (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { + if (sp_ud->cbref != -1) { + lua_thread_pool_prepare_callback(ud->cfg->lua_thread_pool, &cbs); + L = cbs.L; + + lua_pushcfunction(L, &rspamd_lua_traceback); + int err_idx = lua_gettop(L); + /* Push error */ + lua_rawgeti(cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); + /* Error is nil */ + lua_pushnil(cbs.L); + /* Data */ + lua_redis_push_reply(cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA); + + if (ud->item) { + rspamd_symcache_set_cur_item(ud->task, ud->item); + } + + gint ret = lua_pcall(cbs.L, 2, 0, err_idx); + + if (ret != 0) { + msg_info("call to lua_redis callback failed (%d): %s", + ret, lua_tostring(cbs.L, -1)); + } + + lua_settop(L, err_idx - 1); + lua_thread_pool_restore_callback(&cbs); + } + + if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) { + if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) { + if (ev_can_stop(&sp_ud->timeout_ev)) { + ev_timer_stop(sp_ud->ctx->async.event_loop, + &sp_ud->timeout_ev); + } + } + } + + sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; + + if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { + if (ud->s) { + if (ud->item) { + rspamd_symcache_item_async_dec_check(ud->task, + ud->item, M); + } + + rspamd_session_remove_event(ud->s, lua_redis_fin, sp_ud); + } + else { + lua_redis_fin(sp_ud); + } + } + } +} + +/** + * Callback for redis replies + * @param c context of redis connection + * @param r redis reply + * @param priv userdata + */ +static void +lua_redis_callback(redisAsyncContext *c, gpointer r, gpointer priv) +{ + redisReply *reply = r; + struct lua_redis_request_specific_userdata *sp_ud = priv; + struct lua_redis_ctx *ctx; + struct lua_redis_userdata *ud; + redisAsyncContext *ac; + + ctx = sp_ud->ctx; + ud = sp_ud->c; + + if (ud->terminated || !rspamd_lua_is_initialised()) { + /* We are already at the termination stage, just go out */ + return; + } + + msg_debug_lua_redis("got reply from redis %p for query %p", sp_ud->c->ctx, + sp_ud); + + REDIS_RETAIN(ctx); + + /* If session is finished, we cannot call lua callbacks */ + if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) || + (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { + if (c->err == 0) { + if (r != NULL) { + if (reply->type != REDIS_REPLY_ERROR) { + lua_redis_push_data(reply, ctx, sp_ud); + } + else { + lua_redis_push_error(reply->str, ctx, sp_ud, TRUE); + } + } + else { + lua_redis_push_error("received no data from server", ctx, sp_ud, TRUE); + } + } + else { + if (c->err == REDIS_ERR_IO) { + lua_redis_push_error(strerror(errno), ctx, sp_ud, TRUE); + } + else { + lua_redis_push_error(c->errstr, ctx, sp_ud, TRUE); + } + } + } + + if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { + ctx->cmds_pending--; + + if (ctx->cmds_pending == 0 && !ud->terminated) { + /* Disconnect redis early as we don't need it anymore */ + ud->terminated = 1; + ac = ud->ctx; + ud->ctx = NULL; + + if (ac) { + msg_debug_lua_redis("release redis connection ud=%p; ctx=%p; refcount=%d", + ud, ctx, ctx->ref.refcount); + rspamd_redis_pool_release_connection(ud->pool, ac, + (ctx->flags & LUA_REDIS_NO_POOL) ? RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); + } + } + } + + REDIS_RELEASE(ctx); +} + +static gint +lua_redis_push_results(struct lua_redis_ctx *ctx, lua_State *L) +{ + gint results = g_queue_get_length(ctx->replies); + gint i; + gboolean can_use_lua = TRUE; + + results = g_queue_get_length(ctx->replies); + + if (!lua_checkstack(L, (results * 2) + 1)) { + luaL_error(L, "cannot resize stack to fit %d commands", + ctx->cmds_pending); + + can_use_lua = FALSE; + } + + for (i = 0; i < results; i++) { + struct lua_redis_result *result = g_queue_pop_head(ctx->replies); + + if (can_use_lua) { + lua_pushboolean(L, !result->is_error); + lua_rawgeti(L, LUA_REGISTRYINDEX, result->result_ref); + } + + luaL_unref(L, LUA_REGISTRYINDEX, result->result_ref); + + g_queue_push_tail(ctx->events_cleanup, result); + } + + return can_use_lua ? results * 2 : 0; +} + +static void +lua_redis_cleanup_events(struct lua_redis_ctx *ctx) +{ + REDIS_RETAIN(ctx); /* To avoid preliminary destruction */ + + while (!g_queue_is_empty(ctx->events_cleanup)) { + struct lua_redis_result *result = g_queue_pop_head(ctx->events_cleanup); + + if (result->item) { + rspamd_symcache_item_async_dec_check(result->task, result->item, M); + } + + if (result->s) { + rspamd_session_remove_event(result->s, lua_redis_fin, result->sp_ud); + } + else { + lua_redis_fin(result->sp_ud); + } + + g_free(result); + } + + REDIS_RELEASE(ctx); +} + +/** + * Callback for redis replies + * @param c context of redis connection + * @param r redis reply + * @param priv userdata + */ +static void +lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) +{ + redisReply *reply = r; + + struct lua_redis_request_specific_userdata *sp_ud = priv; + struct lua_redis_ctx *ctx; + struct lua_redis_userdata *ud; + struct thread_entry *thread; + gint results; + + ctx = sp_ud->ctx; + ud = sp_ud->c; + lua_State *L = ctx->async.cfg->lua_state; + + sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; + + if (ud->terminated) { + /* We are already at the termination stage, just go out */ + /* TODO: + if somebody is waiting for us (ctx->thread), return result, + otherwise, indeed, ignore + */ + return; + } + + if (ev_can_stop(&sp_ud->timeout_ev)) { + ev_timer_stop(ud->event_loop, &sp_ud->timeout_ev); + } + + if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { + msg_debug_lua_redis("got reply from redis: %p for query %p", ac, sp_ud); + + struct lua_redis_result *result = g_malloc0(sizeof *result); + + if (ac->err == 0) { + if (r != NULL) { + if (reply->type != REDIS_REPLY_ERROR) { + result->is_error = FALSE; + lua_redis_push_reply(L, reply, ctx->flags & LUA_REDIS_TEXTDATA); + } + else { + result->is_error = TRUE; + lua_pushstring(L, reply->str); + } + } + else { + result->is_error = TRUE; + lua_pushliteral(L, "received no data from server"); + } + } + else { + result->is_error = TRUE; + if (ac->err == REDIS_ERR_IO) { + lua_pushstring(L, strerror(errno)); + } + else { + lua_pushstring(L, ac->errstr); + } + } + + /* if error happened, we should terminate the connection, + and release it */ + + if (result->is_error && sp_ud->c->ctx) { + ac = sp_ud->c->ctx; + /* Set to NULL to avoid double free in dtor */ + sp_ud->c->ctx = NULL; + ctx->flags |= LUA_REDIS_TERMINATED; + + /* + * This will call all callbacks pending so the entire context + * will be destructed + */ + rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, + RSPAMD_REDIS_RELEASE_FATAL); + } + + result->result_ref = luaL_ref(L, LUA_REGISTRYINDEX); + result->s = ud->s; + result->item = ud->item; + result->task = ud->task; + result->sp_ud = sp_ud; + + g_queue_push_tail(ctx->replies, result); + } + + ctx->cmds_pending--; + + if (ctx->cmds_pending == 0) { + if (ctx->thread) { + if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { + /* somebody yielded and waits for results */ + thread = ctx->thread; + ctx->thread = NULL; + + results = lua_redis_push_results(ctx, thread->lua_state); + + if (ud->item) { + rspamd_symcache_set_cur_item(ud->task, ud->item); + } + + lua_thread_resume(thread, results); + lua_redis_cleanup_events(ctx); + } + else { + /* We cannot resume the thread as the associated task has gone */ + lua_thread_pool_terminate_entry_full(ud->cfg->lua_thread_pool, + ctx->thread, G_STRLOC, true); + ctx->thread = NULL; + } + } + } +} + +static void +lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents) +{ + struct lua_redis_request_specific_userdata *sp_ud = + (struct lua_redis_request_specific_userdata *) w->data; + struct lua_redis_ctx *ctx; + struct lua_redis_userdata *ud; + redisAsyncContext *ac; + + if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) { + return; + } + + ud = sp_ud->c; + ctx = sp_ud->ctx; + msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud, + sp_ud->c->ctx); + + if (sp_ud->c->ctx) { + ac = sp_ud->c->ctx; + + /* Set to NULL to avoid double free in dtor */ + sp_ud->c->ctx = NULL; + ac->err = REDIS_ERR_IO; + errno = ETIMEDOUT; + ctx->flags |= LUA_REDIS_TERMINATED; + + /* + * This will call all callbacks pending so the entire context + * will be destructed + */ + rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, + RSPAMD_REDIS_RELEASE_FATAL); + } +} + +static void +lua_redis_timeout(EV_P_ ev_timer *w, int revents) +{ + struct lua_redis_request_specific_userdata *sp_ud = + (struct lua_redis_request_specific_userdata *) w->data; + struct lua_redis_userdata *ud; + struct lua_redis_ctx *ctx; + redisAsyncContext *ac; + + if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) { + return; + } + + ctx = sp_ud->ctx; + ud = sp_ud->c; + + REDIS_RETAIN(ctx); + msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud, + sp_ud->c->ctx); + lua_redis_push_error("timeout while connecting the server", ctx, sp_ud, TRUE); + + if (sp_ud->c->ctx) { + ac = sp_ud->c->ctx; + /* Set to NULL to avoid double free in dtor */ + sp_ud->c->ctx = NULL; + ac->err = REDIS_ERR_IO; + errno = ETIMEDOUT; + /* + * This will call all callbacks pending so the entire context + * will be destructed + */ + rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, + RSPAMD_REDIS_RELEASE_FATAL); + } + + REDIS_RELEASE(ctx); +} + + +static void +lua_redis_parse_args(lua_State *L, gint idx, const gchar *cmd, + gchar ***pargs, gsize **parglens, guint *nargs) +{ + gchar **args = NULL; + gsize *arglens; + gint top; + + if (idx != 0 && lua_type(L, idx) == LUA_TTABLE) { + /* Get all arguments */ + lua_pushvalue(L, idx); + lua_pushnil(L); + top = 0; + + while (lua_next(L, -2) != 0) { + gint type = lua_type(L, -1); + + if (type == LUA_TNUMBER || type == LUA_TSTRING || + type == LUA_TUSERDATA) { + top++; + } + lua_pop(L, 1); + } + + args = g_malloc((top + 1) * sizeof(gchar *)); + arglens = g_malloc((top + 1) * sizeof(gsize)); + arglens[0] = strlen(cmd); + args[0] = g_malloc(arglens[0]); + memcpy(args[0], cmd, arglens[0]); + top = 1; + lua_pushnil(L); + + while (lua_next(L, -2) != 0) { + gint type = lua_type(L, -1); + + if (type == LUA_TSTRING) { + const gchar *s; + + s = lua_tolstring(L, -1, &arglens[top]); + args[top] = g_malloc(arglens[top]); + memcpy(args[top], s, arglens[top]); + top++; + } + else if (type == LUA_TUSERDATA) { + struct rspamd_lua_text *t; + + t = lua_check_text(L, -1); + + if (t && t->start) { + arglens[top] = t->len; + args[top] = g_malloc(arglens[top]); + memcpy(args[top], t->start, arglens[top]); + top++; + } + } + else if (type == LUA_TNUMBER) { + gdouble val = lua_tonumber(L, -1); + gint r; + gchar numbuf[64]; + + if (val == (gdouble) ((gint64) val)) { + r = rspamd_snprintf(numbuf, sizeof(numbuf), "%L", + (gint64) val); + } + else { + r = rspamd_snprintf(numbuf, sizeof(numbuf), "%f", + val); + } + + arglens[top] = r; + args[top] = g_malloc(arglens[top]); + memcpy(args[top], numbuf, arglens[top]); + top++; + } + + lua_pop(L, 1); + } + + lua_pop(L, 1); + } + else { + /* Use merely cmd */ + + args = g_malloc(sizeof(gchar *)); + arglens = g_malloc(sizeof(gsize)); + arglens[0] = strlen(cmd); + args[0] = g_malloc(arglens[0]); + memcpy(args[0], cmd, arglens[0]); + top = 1; + } + + *pargs = args; + *parglens = arglens; + *nargs = top; +} + +static struct lua_redis_ctx * +rspamd_lua_redis_prepare_connection(lua_State *L, gint *pcbref, gboolean is_async) +{ + struct lua_redis_ctx *ctx = NULL; + rspamd_inet_addr_t *ip = NULL; + struct lua_redis_userdata *ud = NULL; + struct rspamd_lua_ip *addr = NULL; + struct rspamd_task *task = NULL; + const gchar *host = NULL; + const gchar *username = NULL, *password = NULL, *dbname = NULL, *log_tag = NULL; + gint cbref = -1; + struct rspamd_config *cfg = NULL; + struct rspamd_async_session *session = NULL; + struct ev_loop *ev_base = NULL; + gboolean ret = FALSE; + guint flags = 0; + + if (lua_istable(L, 1)) { + /* Table version */ + lua_pushvalue(L, 1); + lua_pushstring(L, "task"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TUSERDATA) { + task = lua_check_task_maybe(L, -1); + } + lua_pop(L, 1); + + if (!task) { + /* We need to get ev_base, config and session separately */ + lua_pushstring(L, "config"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TUSERDATA) { + cfg = lua_check_config(L, -1); + } + lua_pop(L, 1); + + lua_pushstring(L, "session"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TUSERDATA) { + session = lua_check_session(L, -1); + } + lua_pop(L, 1); + + lua_pushstring(L, "ev_base"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TUSERDATA) { + ev_base = lua_check_ev_base(L, -1); + } + lua_pop(L, 1); + + if (cfg && ev_base) { + ret = TRUE; + } + else if (!cfg) { + msg_err_task_check("config is not passed"); + } + else { + msg_err_task_check("ev_base is not set"); + } + } + else { + cfg = task->cfg; + session = task->s; + ev_base = task->event_loop; + log_tag = task->task_pool->tag.uid; + ret = TRUE; + } + + if (pcbref) { + lua_pushstring(L, "callback"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TFUNCTION) { + /* This also pops function from the stack */ + cbref = luaL_ref(L, LUA_REGISTRYINDEX); + *pcbref = cbref; + } + else { + *pcbref = -1; + lua_pop(L, 1); + } + } + + lua_pushstring(L, "host"); + lua_gettable(L, -2); + + if (lua_type(L, -1) == LUA_TUSERDATA) { + addr = lua_check_ip(L, -1); + host = rspamd_inet_address_to_string_pretty(addr->addr); + } + else if (lua_type(L, -1) == LUA_TSTRING) { + host = lua_tostring(L, -1); + + if (rspamd_parse_inet_address(&ip, + host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { + addr = g_alloca(sizeof(*addr)); + addr->addr = ip; + + if (rspamd_inet_address_get_port(ip) == 0) { + rspamd_inet_address_set_port(ip, 6379); + } + } + } + lua_pop(L, 1); + + lua_pushstring(L, "username"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TSTRING) { + username = lua_tostring(L, -1); + } + lua_pop(L, 1); + + lua_pushstring(L, "password"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TSTRING) { + password = lua_tostring(L, -1); + } + lua_pop(L, 1); + + lua_pushstring(L, "dbname"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TSTRING) { + dbname = lua_tostring(L, -1); + } + lua_pop(L, 1); + + lua_pushstring(L, "opaque_data"); + lua_gettable(L, -2); + if (!!lua_toboolean(L, -1)) { + flags |= LUA_REDIS_TEXTDATA; + } + lua_pop(L, 1); + + lua_pushstring(L, "no_pool"); + lua_gettable(L, -2); + if (!!lua_toboolean(L, -1)) { + flags |= LUA_REDIS_NO_POOL; + } + lua_pop(L, 1); + + lua_pop(L, 1); /* table */ + + if (session && rspamd_session_blocked(session)) { + msg_err_task_check("Session is being destroying"); + ret = FALSE; + } + + if (ret && addr != NULL) { + ctx = g_malloc0(sizeof(struct lua_redis_ctx)); + REF_INIT_RETAIN(ctx, lua_redis_dtor); + if (is_async) { + ctx->flags |= flags | LUA_REDIS_ASYNC; + ud = &ctx->async; + } + else { + ud = &ctx->async; + ctx->replies = g_queue_new(); + ctx->events_cleanup = g_queue_new(); + } + + ud->s = session; + ud->cfg = cfg; + ud->pool = cfg->redis_pool; + ud->event_loop = ev_base; + ud->task = task; + + if (log_tag) { + rspamd_strlcpy(ud->log_tag, log_tag, sizeof(ud->log_tag)); + } + else { + /* Use pointer itself as a tag */ + rspamd_snprintf(ud->log_tag, sizeof(ud->log_tag), + "%ud", + (int) rspamd_cryptobox_fast_hash(&ud, sizeof(ud), 0)); + } + + if (task) { + ud->item = rspamd_symcache_get_cur_item(task); + } + + ret = TRUE; + } + else { + if (cbref != -1) { + luaL_unref(L, LUA_REGISTRYINDEX, cbref); + } + + msg_err_task_check("incorrect function invocation"); + ret = FALSE; + } + } + + if (ret) { + ud->terminated = 0; + ud->ctx = rspamd_redis_pool_connect(ud->pool, + dbname, username, password, + rspamd_inet_address_to_string(addr->addr), + rspamd_inet_address_get_port(addr->addr)); + + if (ip) { + rspamd_inet_address_free(ip); + } + + if (ud->ctx == NULL || ud->ctx->err) { + if (ud->ctx) { + msg_err_task_check("cannot connect to redis: %s", + ud->ctx->errstr); + rspamd_redis_pool_release_connection(ud->pool, ud->ctx, + RSPAMD_REDIS_RELEASE_FATAL); + ud->ctx = NULL; + } + else { + msg_err_task_check("cannot connect to redis (OS error): %s", + strerror(errno)); + } + + REDIS_RELEASE(ctx); + + return NULL; + } + + msg_debug_lua_redis("opened redis connection host=%s; ctx=%p; ud=%p", + host, ctx, ud); + + return ctx; + } + + if (ip) { + rspamd_inet_address_free(ip); + } + + return NULL; +} + +/*** + * @function rspamd_redis.make_request({params}) + * Make request to redis server, params is a table of key=value arguments in any order + * @param {task} task worker task object + * @param {ip|string} host server address + * @param {function} callback callback to be called in form `function (task, err, data)` + * @param {string} cmd command to be sent to redis + * @param {table} args numeric array of strings used as redis arguments + * @param {number} timeout timeout in seconds for request (1.0 by default) + * @return {boolean} `true` if a request has been scheduled + */ +static int +lua_redis_make_request(lua_State *L) +{ + LUA_TRACE_POINT; + struct lua_redis_request_specific_userdata *sp_ud; + struct lua_redis_userdata *ud; + struct lua_redis_ctx *ctx, **pctx; + const gchar *cmd = NULL; + gdouble timeout = REDIS_DEFAULT_TIMEOUT; + gint cbref = -1; + gboolean ret = FALSE; + + ctx = rspamd_lua_redis_prepare_connection(L, &cbref, TRUE); + + if (ctx) { + ud = &ctx->async; + sp_ud = g_malloc0(sizeof(*sp_ud)); + sp_ud->cbref = cbref; + sp_ud->c = ud; + sp_ud->ctx = ctx; + + lua_pushstring(L, "cmd"); + lua_gettable(L, -2); + cmd = lua_tostring(L, -1); + lua_pop(L, 1); + + lua_pushstring(L, "timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) { + timeout = lua_tonumber(L, -1); + } + lua_pop(L, 1); + ud->timeout = timeout; + + + lua_pushstring(L, "args"); + lua_gettable(L, 1); + lua_redis_parse_args(L, -1, cmd, &sp_ud->args, &sp_ud->arglens, + &sp_ud->nargs); + lua_pop(L, 1); + LL_PREPEND(ud->specific, sp_ud); + + ret = redisAsyncCommandArgv(ud->ctx, + lua_redis_callback, + sp_ud, + sp_ud->nargs, + (const gchar **) sp_ud->args, + sp_ud->arglens); + + if (ret == REDIS_OK) { + if (ud->s) { + rspamd_session_add_event(ud->s, + lua_redis_fin, sp_ud, + M); + + if (ud->item) { + rspamd_symcache_item_async_inc(ud->task, ud->item, M); + } + } + + REDIS_RETAIN(ctx); /* Cleared by fin event */ + ctx->cmds_pending++; + + if (ud->ctx->c.flags & REDIS_SUBSCRIBED) { + msg_debug_lua_redis("subscribe command, never unref/timeout"); + sp_ud->flags |= LUA_REDIS_SUBSCRIBED; + } + + sp_ud->timeout_ev.data = sp_ud; + ev_now_update_if_cheap((struct ev_loop *) ud->event_loop); + ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0); + ev_timer_start(ud->event_loop, &sp_ud->timeout_ev); + + ret = TRUE; + } + else { + msg_info("call to redis failed: %s", ud->ctx->errstr); + rspamd_redis_pool_release_connection(ud->pool, ud->ctx, + RSPAMD_REDIS_RELEASE_FATAL); + ud->ctx = NULL; + REDIS_RELEASE(ctx); + ret = FALSE; + } + } + else { + lua_pushboolean(L, FALSE); + lua_pushnil(L); + + return 2; + } + + lua_pushboolean(L, ret); + + if (ret) { + pctx = lua_newuserdata(L, sizeof(ctx)); + *pctx = ctx; + rspamd_lua_setclass(L, "rspamd{redis}", -1); + } + else { + lua_pushnil(L); + } + + return 2; +} + +/*** + * @function rspamd_redis.make_request_sync({params}) + * Make blocking request to redis server, params is a table of key=value arguments in any order + * @param {ip|string} host server address + * @param {string} cmd command to be sent to redis + * @param {table} args numeric array of strings used as redis arguments + * @param {number} timeout timeout in seconds for request (1.0 by default) + * @return {boolean + result} `true` and a result if a request has been successful + */ +static int +lua_redis_make_request_sync(lua_State *L) +{ + LUA_TRACE_POINT; + struct rspamd_lua_ip *addr = NULL; + rspamd_inet_addr_t *ip = NULL; + const gchar *cmd = NULL, *host; + struct timeval tv; + gboolean ret = FALSE; + gdouble timeout = REDIS_DEFAULT_TIMEOUT; + gchar **args = NULL; + gsize *arglens = NULL; + guint nargs = 0, flags = 0; + redisContext *ctx; + redisReply *r; + + if (lua_istable(L, 1)) { + lua_pushvalue(L, 1); + + lua_pushstring(L, "cmd"); + lua_gettable(L, -2); + cmd = lua_tostring(L, -1); + lua_pop(L, 1); + + lua_pushstring(L, "host"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TUSERDATA) { + addr = lua_check_ip(L, -1); + } + else if (lua_type(L, -1) == LUA_TSTRING) { + host = lua_tostring(L, -1); + if (rspamd_parse_inet_address(&ip, + host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { + addr = g_alloca(sizeof(*addr)); + addr->addr = ip; + + if (rspamd_inet_address_get_port(ip) == 0) { + rspamd_inet_address_set_port(ip, 6379); + } + } + } + lua_pop(L, 1); + + lua_pushstring(L, "timeout"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TNUMBER) { + timeout = lua_tonumber(L, -1); + } + lua_pop(L, 1); + + lua_pushstring(L, "opaque_data"); + lua_gettable(L, -2); + if (!!lua_toboolean(L, -1)) { + flags |= LUA_REDIS_TEXTDATA; + } + lua_pop(L, 1); + + + if (cmd) { + lua_pushstring(L, "args"); + lua_gettable(L, -2); + lua_redis_parse_args(L, -1, cmd, &args, &arglens, &nargs); + lua_pop(L, 1); + } + + lua_pop(L, 1); + + if (addr && cmd) { + ret = TRUE; + } + } + + if (ret) { + double_to_tv(timeout, &tv); + + if (rspamd_inet_address_get_af(addr->addr) == AF_UNIX) { + ctx = redisConnectUnixWithTimeout( + rspamd_inet_address_to_string(addr->addr), tv); + } + else { + ctx = redisConnectWithTimeout( + rspamd_inet_address_to_string(addr->addr), + rspamd_inet_address_get_port(addr->addr), tv); + } + + if (ip) { + rspamd_inet_address_free(ip); + } + + if (ctx == NULL || ctx->err) { + redisFree(ctx); + lua_redis_free_args(args, arglens, nargs); + lua_pushboolean(L, FALSE); + + return 1; + } + + r = redisCommandArgv(ctx, + nargs, + (const gchar **) args, + arglens); + + if (r != NULL) { + if (r->type != REDIS_REPLY_ERROR) { + lua_pushboolean(L, TRUE); + lua_redis_push_reply(L, r, flags & LUA_REDIS_TEXTDATA); + } + else { + lua_pushboolean(L, FALSE); + lua_pushstring(L, r->str); + } + + freeReplyObject(r); + redisFree(ctx); + lua_redis_free_args(args, arglens, nargs); + + return 2; + } + else { + msg_info("call to redis failed: %s", ctx->errstr); + redisFree(ctx); + lua_redis_free_args(args, arglens, nargs); + lua_pushboolean(L, FALSE); + } + } + else { + if (ip) { + rspamd_inet_address_free(ip); + } + msg_err("bad arguments for redis request"); + lua_redis_free_args(args, arglens, nargs); + + lua_pushboolean(L, FALSE); + } + + return 1; +} + +/*** + * @function rspamd_redis.connect({params}) + * Make request to redis server, params is a table of key=value arguments in any order + * @param {task} task worker task object + * @param {ip|string} host server address + * @param {number} timeout timeout in seconds for request (1.0 by default) + * @return {boolean,redis} new connection object or nil if connection failed + */ +static int +lua_redis_connect(lua_State *L) +{ + LUA_TRACE_POINT; + struct lua_redis_userdata *ud; + struct lua_redis_ctx *ctx, **pctx; + gdouble timeout = REDIS_DEFAULT_TIMEOUT; + + ctx = rspamd_lua_redis_prepare_connection(L, NULL, TRUE); + + if (ctx) { + ud = &ctx->async; + + lua_pushstring(L, "timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) { + timeout = lua_tonumber(L, -1); + } + + lua_pop(L, 1); + ud->timeout = timeout; + } + else { + lua_pushboolean(L, FALSE); + lua_pushnil(L); + + return 2; + } + + lua_pushboolean(L, TRUE); + pctx = lua_newuserdata(L, sizeof(ctx)); + *pctx = ctx; + rspamd_lua_setclass(L, "rspamd{redis}", -1); + + return 2; +} + +/*** + * @function rspamd_redis.connect_sync({params}) + * Make blocking request to redis server, params is a table of key=value arguments in any order + * @param {ip|string} host server address + * @param {number} timeout timeout in seconds for request (1.0 by default) + * @return {redis} redis object if a request has been successful + */ +static int +lua_redis_connect_sync(lua_State *L) +{ + LUA_TRACE_POINT; + gdouble timeout = REDIS_DEFAULT_TIMEOUT; + struct lua_redis_ctx *ctx, **pctx; + + ctx = rspamd_lua_redis_prepare_connection(L, NULL, FALSE); + + if (ctx) { + if (lua_istable(L, 1)) { + lua_pushstring(L, "timeout"); + lua_gettable(L, 1); + if (lua_type(L, -1) == LUA_TNUMBER) { + timeout = lua_tonumber(L, -1); + } + lua_pop(L, 1); + } + + ctx->async.timeout = timeout; + + lua_pushboolean(L, TRUE); + pctx = lua_newuserdata(L, sizeof(ctx)); + *pctx = ctx; + rspamd_lua_setclass(L, "rspamd{redis}", -1); + } + else { + lua_pushboolean(L, FALSE); + lua_pushstring(L, "bad arguments for redis request"); + return 2; + } + + return 2; +} + +/*** + * @method rspamd_redis:add_cmd(cmd, {args}) + * Append new cmd to redis pipeline + * @param {string} cmd command to be sent to redis + * @param {table} args array of strings used as redis arguments + * @return {boolean} `true` if a request has been successful + */ +static int +lua_redis_add_cmd(lua_State *L) +{ + LUA_TRACE_POINT; + struct lua_redis_ctx *ctx = lua_check_redis(L, 1); + struct lua_redis_request_specific_userdata *sp_ud; + struct lua_redis_userdata *ud; + const gchar *cmd = NULL; + gint args_pos = 2; + gint cbref = -1, ret; + + if (ctx) { + if (ctx->flags & LUA_REDIS_TERMINATED) { + lua_pushboolean(L, FALSE); + lua_pushstring(L, "Connection is terminated"); + + return 2; + } + + /* Async version */ + if (lua_type(L, 2) == LUA_TSTRING) { + /* No callback version */ + cmd = lua_tostring(L, 2); + args_pos = 3; + } + else if (lua_type(L, 2) == LUA_TFUNCTION) { + lua_pushvalue(L, 2); + cbref = luaL_ref(L, LUA_REGISTRYINDEX); + cmd = lua_tostring(L, 3); + args_pos = 4; + } + else { + return luaL_error(L, "invalid arguments"); + } + + sp_ud = g_malloc0(sizeof(*sp_ud)); + if (IS_ASYNC(ctx)) { + sp_ud->c = &ctx->async; + ud = &ctx->async; + sp_ud->cbref = cbref; + } + else { + sp_ud->c = &ctx->async; + ud = &ctx->async; + } + sp_ud->ctx = ctx; + + lua_redis_parse_args(L, args_pos, cmd, &sp_ud->args, + &sp_ud->arglens, &sp_ud->nargs); + + LL_PREPEND(sp_ud->c->specific, sp_ud); + + if (ud->s && rspamd_session_blocked(ud->s)) { + lua_pushboolean(L, 0); + lua_pushstring(L, "session is terminating"); + + return 2; + } + + if (IS_ASYNC(ctx)) { + ret = redisAsyncCommandArgv(sp_ud->c->ctx, + lua_redis_callback, + sp_ud, + sp_ud->nargs, + (const gchar **) sp_ud->args, + sp_ud->arglens); + } + else { + ret = redisAsyncCommandArgv(sp_ud->c->ctx, + lua_redis_callback_sync, + sp_ud, + sp_ud->nargs, + (const gchar **) sp_ud->args, + sp_ud->arglens); + } + + if (ret == REDIS_OK) { + if (ud->s) { + rspamd_session_add_event(ud->s, + lua_redis_fin, + sp_ud, + M); + + if (ud->item) { + rspamd_symcache_item_async_inc(ud->task, ud->item, M); + } + } + + sp_ud->timeout_ev.data = sp_ud; + + if (IS_ASYNC(ctx)) { + ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout, + sp_ud->c->timeout, 0.0); + } + else { + ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout_sync, + sp_ud->c->timeout, 0.0); + } + + ev_timer_start(ud->event_loop, &sp_ud->timeout_ev); + REDIS_RETAIN(ctx); + ctx->cmds_pending++; + } + else { + msg_info("call to redis failed: %s", + sp_ud->c->ctx->errstr); + lua_pushboolean(L, 0); + lua_pushstring(L, sp_ud->c->ctx->errstr); + + return 2; + } + } + + lua_pushboolean(L, true); + + return 1; +} + +/*** + * @method rspamd_redis:exec() + * Executes pending commands (suitable for blocking IO only for now) + * @return {boolean}, {table}, ...: pairs in format [bool, result] for each request pending + */ +static int +lua_redis_exec(lua_State *L) +{ + LUA_TRACE_POINT; + struct lua_redis_ctx *ctx = lua_check_redis(L, 1); + + if (ctx == NULL) { + lua_error(L); + + return 1; + } + + if (IS_ASYNC(ctx)) { + lua_pushstring(L, "Async redis pipelining is not implemented"); + lua_error(L); + return 0; + } + else { + if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) == 0) { + lua_pushstring(L, "No pending commands to execute"); + lua_error(L); + } + if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) > 0) { + gint results = lua_redis_push_results(ctx, L); + return results; + } + else { + ctx->thread = lua_thread_pool_get_running_entry(ctx->async.cfg->lua_thread_pool); + return lua_thread_yield(ctx->thread, 0); + } + } +} + +static gint +lua_load_redis(lua_State *L) +{ + lua_newtable(L); + luaL_register(L, NULL, redislib_f); + + return 1; +} + +static gint +lua_redis_null_idx(lua_State *L) +{ + lua_pushnil(L); + + return 1; +} + +static void +lua_redis_null_mt(lua_State *L) +{ + luaL_newmetatable(L, "redis{null}"); + + lua_pushcfunction(L, lua_redis_null_idx); + lua_setfield(L, -2, "__index"); + lua_pushcfunction(L, lua_redis_null_idx); + lua_setfield(L, -2, "__tostring"); + + lua_pop(L, 1); +} + +/** + * Open redis library + * @param L lua stack + * @return + */ +void luaopen_redis(lua_State *L) +{ + rspamd_lua_new_class(L, "rspamd{redis}", redislib_m); + lua_pop(L, 1); + rspamd_lua_add_preload(L, "rspamd_redis", lua_load_redis); + + /* Set null element */ + lua_redis_null_mt(L); + redis_null = lua_newuserdata(L, 0); + luaL_getmetatable(L, "redis{null}"); + lua_setmetatable(L, -2); + lua_setfield(L, LUA_REGISTRYINDEX, "redis.null"); +} |