summaryrefslogtreecommitdiffstats
path: root/src/lua/lua_worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lua/lua_worker.c')
-rw-r--r--src/lua/lua_worker.c883
1 files changed, 883 insertions, 0 deletions
diff --git a/src/lua/lua_worker.c b/src/lua/lua_worker.c
new file mode 100644
index 0000000..025b97b
--- /dev/null
+++ b/src/lua/lua_worker.c
@@ -0,0 +1,883 @@
+/*
+ * Copyright 2023 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "lua_common.h"
+#include "unix-std.h"
+#include "worker_util.h"
+#include "rspamd_control.h"
+#include "ottery.h"
+
+#ifdef WITH_JEMALLOC
+#include <jemalloc/jemalloc.h>
+#endif
+
+#include <sys/wait.h>
+#include <src/libserver/rspamd_control.h>
+
+/***
+ * @module rspamd_worker
+ * This module provides methods to access worker related functions in various
+ * places, such as periodic events or on_load events.
+ */
+
+
+LUA_FUNCTION_DEF(worker, get_name);
+LUA_FUNCTION_DEF(worker, get_stat);
+LUA_FUNCTION_DEF(worker, get_index);
+LUA_FUNCTION_DEF(worker, get_count);
+LUA_FUNCTION_DEF(worker, get_pid);
+LUA_FUNCTION_DEF(worker, is_scanner);
+LUA_FUNCTION_DEF(worker, is_primary_controller);
+LUA_FUNCTION_DEF(worker, spawn_process);
+LUA_FUNCTION_DEF(worker, get_mem_stats);
+LUA_FUNCTION_DEF(worker, add_control_handler);
+
+const luaL_reg worker_reg[] = {
+ LUA_INTERFACE_DEF(worker, get_name),
+ {"get_type", lua_worker_get_name},
+ LUA_INTERFACE_DEF(worker, get_stat),
+ LUA_INTERFACE_DEF(worker, get_index),
+ LUA_INTERFACE_DEF(worker, get_count),
+ LUA_INTERFACE_DEF(worker, get_pid),
+ LUA_INTERFACE_DEF(worker, spawn_process),
+ LUA_INTERFACE_DEF(worker, is_scanner),
+ LUA_INTERFACE_DEF(worker, is_primary_controller),
+ LUA_INTERFACE_DEF(worker, get_mem_stats),
+ LUA_INTERFACE_DEF(worker, add_control_handler),
+ {"__tostring", rspamd_lua_class_tostring},
+ {NULL, NULL}};
+
+static struct rspamd_worker *
+lua_check_worker(lua_State *L, gint pos)
+{
+ void *ud = rspamd_lua_check_udata(L, pos, "rspamd{worker}");
+ luaL_argcheck(L, ud != NULL, pos, "'worker' expected");
+ return ud ? *((struct rspamd_worker **) ud) : NULL;
+}
+
+static gint
+lua_worker_get_stat(lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker(L, 1);
+
+ if (w) {
+ rspamd_mempool_stat_t mem_st;
+ struct rspamd_stat *stat, stat_copy;
+ ucl_object_t *top, *sub;
+ gint i;
+ guint64 spam = 0, ham = 0;
+
+ memset(&mem_st, 0, sizeof(mem_st));
+ rspamd_mempool_stat(&mem_st);
+ memcpy(&stat_copy, w->srv->stat, sizeof(stat_copy));
+ stat = &stat_copy;
+ top = ucl_object_typed_new(UCL_OBJECT);
+ ucl_object_insert_key(top, ucl_object_fromint(stat->messages_scanned), "scanned", 0, false);
+ ucl_object_insert_key(top, ucl_object_fromint(stat->messages_learned), "learned", 0, false);
+ if (stat->messages_scanned > 0) {
+ sub = ucl_object_typed_new(UCL_OBJECT);
+ for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
+ ucl_object_insert_key(sub,
+ ucl_object_fromint(stat->actions_stat[i]),
+ rspamd_action_to_str(i), 0, false);
+ if (i < METRIC_ACTION_GREYLIST) {
+ spam += stat->actions_stat[i];
+ }
+ else {
+ ham += stat->actions_stat[i];
+ }
+ }
+ ucl_object_insert_key(top, sub, "actions", 0, false);
+ }
+ else {
+ sub = ucl_object_typed_new(UCL_OBJECT);
+ for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
+ ucl_object_insert_key(sub,
+ 0,
+ rspamd_action_to_str(i), 0, false);
+ }
+ ucl_object_insert_key(top, sub, "actions", 0, false);
+ }
+ ucl_object_insert_key(top, ucl_object_fromint(spam), "spam_count", 0, false);
+ ucl_object_insert_key(top, ucl_object_fromint(ham), "ham_count", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(stat->connections_count), "connections", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(stat->control_connections_count),
+ "control_connections", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.pools_allocated), "pools_allocated", 0,
+ false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.pools_freed), "pools_freed", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.bytes_allocated), "bytes_allocated", 0,
+ false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(
+ mem_st.chunks_allocated),
+ "chunks_allocated", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.shared_chunks_allocated),
+ "shared_chunks_allocated", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.chunks_freed), "chunks_freed", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(
+ mem_st.oversized_chunks),
+ "chunks_oversized", 0, false);
+
+ ucl_object_push_lua(L, top, true);
+ ucl_object_unref(top);
+ }
+ else {
+ return luaL_error(L, "invalid arguments");
+ }
+
+ return 1;
+}
+
+static gint
+lua_worker_get_name(lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker(L, 1);
+
+ if (w) {
+ lua_pushstring(L, g_quark_to_string(w->type));
+ }
+ else {
+ return luaL_error(L, "invalid arguments");
+ }
+
+ return 1;
+}
+
+static gint
+lua_worker_get_index(lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker(L, 1);
+
+ if (w) {
+ lua_pushinteger(L, w->index);
+ }
+ else {
+ return luaL_error(L, "invalid arguments");
+ }
+
+ return 1;
+}
+
+static gint
+lua_worker_get_count(lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker(L, 1);
+
+ if (w) {
+ lua_pushinteger(L, w->cf->count);
+ }
+ else {
+ return luaL_error(L, "invalid arguments");
+ }
+
+ return 1;
+}
+
+static gint
+lua_worker_get_pid(lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker(L, 1);
+
+ if (w) {
+ lua_pushinteger(L, w->pid);
+ }
+ else {
+ return luaL_error(L, "invalid arguments");
+ }
+
+ return 1;
+}
+
+
+static gint
+lua_worker_is_scanner(lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker(L, 1);
+
+ if (w) {
+ lua_pushboolean(L, rspamd_worker_is_scanner(w));
+ }
+ else {
+ return luaL_error(L, "invalid arguments");
+ }
+
+ return 1;
+}
+
+static gint
+lua_worker_is_primary_controller(lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker(L, 1);
+
+ if (w) {
+ lua_pushboolean(L, rspamd_worker_is_primary_controller(w));
+ }
+ else {
+ return luaL_error(L, "invalid arguments");
+ }
+
+ return 1;
+}
+
+struct rspamd_control_cbdata {
+ lua_State *L;
+ rspamd_mempool_t *pool;
+ struct rspamd_worker *w;
+ struct rspamd_config *cfg;
+ struct ev_loop *event_loop;
+ struct rspamd_async_session *session;
+ enum rspamd_control_type cmd;
+ gint cbref;
+ gint fd;
+};
+
+static gboolean
+lua_worker_control_fin_session(void *ud)
+{
+ struct rspamd_control_reply rep;
+ struct rspamd_control_cbdata *cbd = (struct rspamd_control_cbdata *) ud;
+ rspamd_mempool_t *pool;
+
+ pool = cbd->pool;
+
+ memset(&rep, 0, sizeof(rep));
+ rep.type = cbd->cmd;
+
+ if (write(cbd->fd, &rep, sizeof(rep)) != sizeof(rep)) {
+ msg_err_pool("cannot write reply to the control socket: %s",
+ strerror(errno));
+ }
+
+ return TRUE;
+}
+
+static void
+lua_worker_control_session_dtor(void *ud)
+{
+ struct rspamd_control_cbdata *cbd = (struct rspamd_control_cbdata *) ud;
+
+ rspamd_mempool_delete(cbd->pool);
+}
+
+static gboolean
+lua_worker_control_handler(struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker,
+ gint fd,
+ gint attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_async_session *session, **psession;
+ struct rspamd_control_cbdata *cbd = (struct rspamd_control_cbdata *) ud;
+ rspamd_mempool_t *pool;
+ lua_State *L;
+ gint err_idx, status;
+
+ L = cbd->L;
+ pool = cbd->pool;
+ session = rspamd_session_create(cbd->pool,
+ lua_worker_control_fin_session,
+ NULL,
+ lua_worker_control_session_dtor,
+ cbd);
+ cbd->session = session;
+ cbd->fd = fd;
+
+ lua_pushcfunction(L, &rspamd_lua_traceback);
+ err_idx = lua_gettop(L);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, cbd->cbref);
+ psession = lua_newuserdata(L, sizeof(*psession));
+ rspamd_lua_setclass(L, "rspamd{session}", -1);
+ *psession = session;
+
+ /* Command name */
+ lua_pushstring(L, rspamd_control_command_to_string(cmd->type));
+
+ /* Command's extras */
+ lua_newtable(L);
+
+ switch (cmd->type) {
+ case RSPAMD_CONTROL_CHILD_CHANGE:
+ lua_pushinteger(L, cmd->cmd.child_change.pid);
+ lua_setfield(L, -2, "pid");
+ switch (cmd->cmd.child_change.what) {
+ case rspamd_child_offline:
+ lua_pushstring(L, "offline");
+ lua_setfield(L, -2, "what");
+ break;
+ case rspamd_child_online:
+ lua_pushstring(L, "online");
+ lua_setfield(L, -2, "what");
+ break;
+ case rspamd_child_terminated:
+ lua_pushstring(L, "terminated");
+ lua_setfield(L, -2, "what");
+ status = cmd->cmd.child_change.additional;
+
+ if (WIFEXITED(status)) {
+ lua_pushinteger(L, WEXITSTATUS(status));
+ lua_setfield(L, -2, "exit_code");
+ }
+
+ if (WIFSIGNALED(status)) {
+ lua_pushinteger(L, WTERMSIG(status));
+ lua_setfield(L, -2, "signal");
+#ifdef WCOREDUMP
+ lua_pushboolean(L, WCOREDUMP(status));
+ lua_setfield(L, -2, "core");
+#endif
+ }
+ break;
+ }
+ break;
+ case RSPAMD_CONTROL_MONITORED_CHANGE:
+ lua_pushinteger(L, cmd->cmd.monitored_change.sender);
+ lua_setfield(L, -2, "sender");
+ lua_pushboolean(L, cmd->cmd.monitored_change.alive);
+ lua_setfield(L, -2, "alive");
+ lua_pushlstring(L, cmd->cmd.monitored_change.tag,
+ sizeof(cmd->cmd.monitored_change.tag));
+ lua_setfield(L, -2, "tag");
+ break;
+ case RSPAMD_CONTROL_HYPERSCAN_LOADED:
+ lua_pushstring(L, cmd->cmd.hs_loaded.cache_dir);
+ lua_setfield(L, -2, "cache_dir");
+ lua_pushboolean(L, cmd->cmd.hs_loaded.forced);
+ lua_setfield(L, -2, "forced");
+ break;
+ case RSPAMD_CONTROL_STAT:
+ case RSPAMD_CONTROL_RELOAD:
+ case RSPAMD_CONTROL_RERESOLVE:
+ case RSPAMD_CONTROL_RECOMPILE:
+ case RSPAMD_CONTROL_LOG_PIPE:
+ case RSPAMD_CONTROL_FUZZY_STAT:
+ case RSPAMD_CONTROL_FUZZY_SYNC:
+ default:
+ break;
+ }
+
+ if (lua_pcall(L, 3, 0, err_idx) != 0) {
+ msg_err_pool("cannot init lua parser script: %s", lua_tostring(L, -1));
+ lua_settop(L, err_idx - 1);
+
+ struct rspamd_control_reply rep;
+
+ memset(&rep, 0, sizeof(rep));
+ rep.type = cbd->cmd;
+ rep.reply.monitored_change.status = -1;
+
+ if (write(fd, &rep, sizeof(rep)) != sizeof(rep)) {
+ msg_err_pool("cannot write reply to the control socket: %s",
+ strerror(errno));
+ }
+
+ rspamd_session_destroy(session);
+ }
+ else {
+ lua_settop(L, err_idx - 1);
+ rspamd_session_pending(session);
+ }
+
+ return TRUE;
+}
+
+static gint
+lua_worker_add_control_handler(lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker(L, 1);
+ struct rspamd_config *cfg = lua_check_config(L, 2);
+ struct ev_loop *event_loop = lua_check_ev_base(L, 3);
+ const gchar *cmd_name = luaL_checkstring(L, 4);
+ enum rspamd_control_type cmd;
+ struct rspamd_control_cbdata *cbd;
+
+ if (w && cfg && event_loop && cmd_name && lua_isfunction(L, 5)) {
+ cmd = rspamd_control_command_from_string(cmd_name);
+
+ if (cmd == RSPAMD_CONTROL_MAX) {
+ return luaL_error(L, "invalid command type: %s", cmd_name);
+ }
+
+ rspamd_mempool_t *pool = rspamd_mempool_new(
+ rspamd_mempool_suggest_size(), "lua_control", 0);
+ cbd = rspamd_mempool_alloc0(pool, sizeof(*cbd));
+ cbd->pool = pool;
+ cbd->event_loop = event_loop;
+ cbd->w = w;
+ cbd->cfg = cfg;
+ cbd->cmd = cmd;
+ cbd->L = L;
+ /* Refcount callback */
+ lua_pushvalue(L, 5);
+ cbd->cbref = luaL_ref(L, LUA_REGISTRYINDEX);
+
+ rspamd_control_worker_add_cmd_handler(w, cmd, lua_worker_control_handler,
+ cbd);
+ }
+ else {
+ return luaL_error(L, "invalid arguments, need worker, cfg, "
+ "ev_loop, cmd_name and callback function");
+ }
+
+ return 0;
+}
+
+#ifdef WITH_JEMALLOC
+static void
+lua_worker_jemalloc_stats_cb(void *ud, const char *msg)
+{
+ lua_State *L = (lua_State *) ud;
+
+ lua_pushstring(L, msg);
+}
+#endif
+
+static gint
+lua_worker_get_mem_stats(lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker(L, 1);
+
+ if (w) {
+#ifdef WITH_JEMALLOC
+ malloc_stats_print(lua_worker_jemalloc_stats_cb, (void *) L, NULL);
+#else
+ lua_pushstring(L, "no stats, jemalloc support is required");
+#endif
+ }
+ else {
+ return luaL_error(L, "invalid arguments");
+ }
+
+ return 1;
+}
+
+struct rspamd_lua_process_cbdata {
+ gint sp[2];
+ gint func_cbref;
+ gint cb_cbref;
+ gboolean replied;
+ gboolean is_error;
+ pid_t cpid;
+ lua_State *L;
+ guint64 sz;
+ GString *io_buf;
+ GString *out_buf;
+ goffset out_pos;
+ struct rspamd_worker *wrk;
+ struct ev_loop *event_loop;
+ ev_io ev;
+};
+
+static void
+rspamd_lua_execute_lua_subprocess(lua_State *L,
+ struct rspamd_lua_process_cbdata *cbdata)
+{
+ gint err_idx, r;
+ guint64 wlen = 0;
+
+ lua_pushcfunction(L, &rspamd_lua_traceback);
+ err_idx = lua_gettop(L);
+
+ lua_rawgeti(L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+
+ if (lua_pcall(L, 0, 1, err_idx) != 0) {
+ const gchar *s = lua_tostring(L, -1);
+ gsize slen = strlen(s);
+
+ msg_err("call to subprocess failed: %s", s);
+ /* Indicate error */
+ wlen = (1ULL << 63u) + slen;
+
+ r = write(cbdata->sp[1], &wlen, sizeof(wlen));
+ if (r == -1) {
+ msg_err("write failed: %s", strerror(errno));
+ }
+
+ r = write(cbdata->sp[1], s, slen);
+ if (r == -1) {
+ msg_err("write failed: %s", strerror(errno));
+ }
+ }
+ else {
+ struct rspamd_lua_text *t = lua_check_text_or_string(L, -1);
+
+ if (t) {
+ wlen = t->len;
+ r = write(cbdata->sp[1], &wlen, sizeof(wlen));
+
+ if (r == -1) {
+ msg_err("write failed: %s", strerror(errno));
+ }
+
+ r = write(cbdata->sp[1], t->start, wlen);
+
+ if (r == -1) {
+ msg_err("write failed: %s", strerror(errno));
+ }
+ }
+ else {
+ msg_err("subprocess: invalid return value: %s",
+ lua_typename(L, lua_type(L, -1)));
+ }
+ }
+
+ lua_settop(L, err_idx - 1);
+}
+
+static void
+rspamd_lua_call_on_complete(lua_State *L,
+ struct rspamd_lua_process_cbdata *cbdata,
+ const gchar *err_msg,
+ const gchar *data, gsize datalen)
+{
+ gint err_idx;
+
+ lua_pushcfunction(L, &rspamd_lua_traceback);
+ err_idx = lua_gettop(L);
+
+ lua_rawgeti(L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+
+ if (err_msg) {
+ lua_pushstring(L, err_msg);
+ }
+ else {
+ lua_pushnil(L);
+ }
+
+ if (data) {
+ lua_pushlstring(L, data, datalen);
+ }
+ else {
+ lua_pushnil(L);
+ }
+
+ if (lua_pcall(L, 2, 0, err_idx) != 0) {
+ msg_err("call to on_complete script failed: %s",
+ lua_tostring(L, -1));
+ }
+
+ lua_settop(L, err_idx - 1);
+}
+
+static gboolean
+rspamd_lua_cld_handler(struct rspamd_worker_signal_handler *sigh, void *ud)
+{
+ struct rspamd_lua_process_cbdata *cbdata = ud;
+ struct rspamd_srv_command srv_cmd;
+ lua_State *L;
+ pid_t died;
+ gint res = 0;
+
+ /* Are we called by a correct children ? */
+ died = waitpid(cbdata->cpid, &res, WNOHANG);
+
+ if (died <= 0) {
+ /* Wait more */
+ return TRUE;
+ }
+
+ L = cbdata->L;
+ msg_info("handled SIGCHLD from %P", cbdata->cpid);
+
+ if (!cbdata->replied) {
+ /* We still need to call on_complete callback */
+ ev_io_stop(cbdata->event_loop, &cbdata->ev);
+ rspamd_lua_call_on_complete(cbdata->L, cbdata,
+ "Worker has died without reply", NULL, 0);
+ }
+
+ /* Free structures */
+ close(cbdata->sp[0]);
+ luaL_unref(L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+ luaL_unref(L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+ g_string_free(cbdata->io_buf, TRUE);
+
+ if (cbdata->out_buf) {
+ g_string_free(cbdata->out_buf, TRUE);
+ }
+
+ /* Notify main */
+ memset(&srv_cmd, 0, sizeof(srv_cmd));
+ srv_cmd.type = RSPAMD_SRV_ON_FORK;
+ srv_cmd.cmd.on_fork.state = child_dead;
+ srv_cmd.cmd.on_fork.cpid = cbdata->cpid;
+ srv_cmd.cmd.on_fork.ppid = getpid();
+ rspamd_srv_send_command(cbdata->wrk, cbdata->event_loop, &srv_cmd, -1,
+ NULL, NULL);
+ g_free(cbdata);
+
+ /* We are done with this SIGCHLD */
+ return FALSE;
+}
+
+static void
+rspamd_lua_subprocess_io(EV_P_ ev_io *w, int revents)
+{
+ struct rspamd_lua_process_cbdata *cbdata =
+ (struct rspamd_lua_process_cbdata *) w->data;
+ gssize r;
+
+ if (cbdata->sz == (guint64) -1) {
+ guint64 sz;
+
+ /* We read size of reply + flags first */
+ r = read(cbdata->sp[0], cbdata->io_buf->str + cbdata->io_buf->len,
+ sizeof(guint64) - cbdata->io_buf->len);
+
+ if (r == 0) {
+ ev_io_stop(cbdata->event_loop, &cbdata->ev);
+ rspamd_lua_call_on_complete(cbdata->L, cbdata,
+ "Unexpected EOF", NULL, 0);
+ cbdata->replied = TRUE;
+ kill(cbdata->cpid, SIGTERM);
+
+ return;
+ }
+ else if (r == -1) {
+ if (errno == EAGAIN || errno == EINTR) {
+ return;
+ }
+ else {
+ ev_io_stop(cbdata->event_loop, &cbdata->ev);
+ rspamd_lua_call_on_complete(cbdata->L, cbdata,
+ strerror(errno), NULL, 0);
+ cbdata->replied = TRUE;
+ kill(cbdata->cpid, SIGTERM);
+
+ return;
+ }
+ }
+
+ cbdata->io_buf->len += r;
+
+ if (cbdata->io_buf->len == sizeof(guint64)) {
+ memcpy((guchar *) &sz, cbdata->io_buf->str, sizeof(sz));
+
+ if (sz & (1ULL << 63)) {
+ cbdata->is_error = TRUE;
+ sz &= ~(1ULL << 63);
+ }
+
+ cbdata->io_buf->len = 0;
+ cbdata->sz = sz;
+ g_string_set_size(cbdata->io_buf, sz + 1);
+ cbdata->io_buf->len = 0;
+ }
+ }
+ else {
+ /* Read data */
+ r = read(cbdata->sp[0], cbdata->io_buf->str + cbdata->io_buf->len,
+ cbdata->sz - cbdata->io_buf->len);
+
+ if (r == 0) {
+ ev_io_stop(cbdata->event_loop, &cbdata->ev);
+ rspamd_lua_call_on_complete(cbdata->L, cbdata,
+ "Unexpected EOF", NULL, 0);
+ cbdata->replied = TRUE;
+ kill(cbdata->cpid, SIGTERM);
+
+ return;
+ }
+ else if (r == -1) {
+ if (errno == EAGAIN || errno == EINTR) {
+ return;
+ }
+ else {
+ ev_io_stop(cbdata->event_loop, &cbdata->ev);
+ rspamd_lua_call_on_complete(cbdata->L, cbdata,
+ strerror(errno), NULL, 0);
+ cbdata->replied = TRUE;
+ kill(cbdata->cpid, SIGTERM);
+
+ return;
+ }
+ }
+
+ cbdata->io_buf->len += r;
+
+ if (cbdata->io_buf->len == cbdata->sz) {
+ gchar rep[4];
+
+ ev_io_stop(cbdata->event_loop, &cbdata->ev);
+ /* Finished reading data */
+ if (cbdata->is_error) {
+ cbdata->io_buf->str[cbdata->io_buf->len] = '\0';
+ rspamd_lua_call_on_complete(cbdata->L, cbdata,
+ cbdata->io_buf->str, NULL, 0);
+ }
+ else {
+ rspamd_lua_call_on_complete(cbdata->L, cbdata,
+ NULL, cbdata->io_buf->str, cbdata->io_buf->len);
+ }
+
+ cbdata->replied = TRUE;
+
+ /* Write reply to the child */
+ rspamd_socket_blocking(cbdata->sp[0]);
+ memset(rep, 0, sizeof(rep));
+ (void) !write(cbdata->sp[0], rep, sizeof(rep));
+ }
+ }
+}
+
+static gint
+lua_worker_spawn_process(lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker(L, 1);
+ struct rspamd_lua_process_cbdata *cbdata;
+ struct rspamd_abstract_worker_ctx *actx;
+ struct rspamd_srv_command srv_cmd;
+ const gchar *cmdline = NULL, *input = NULL, *proctitle = NULL;
+ gsize inputlen = 0;
+ pid_t pid;
+ GError *err = NULL;
+ gint func_cbref, cb_cbref;
+
+ if (!rspamd_lua_parse_table_arguments(L, 2, &err,
+ RSPAMD_LUA_PARSE_ARGUMENTS_DEFAULT,
+ "func=F;exec=S;stdin=V;*on_complete=F;proctitle=S", &func_cbref,
+ &cmdline, &inputlen, &input, &cb_cbref, &proctitle)) {
+ msg_err("cannot get parameters list: %e", err);
+
+ if (err) {
+ g_error_free(err);
+ }
+
+ return 0;
+ }
+
+ cbdata = g_malloc0(sizeof(*cbdata));
+ cbdata->cb_cbref = cb_cbref;
+ cbdata->func_cbref = func_cbref;
+
+ if (input) {
+ cbdata->out_buf = g_string_new_len(input, inputlen);
+ cbdata->out_pos = 0;
+ }
+
+ if (rspamd_socketpair(cbdata->sp, SOCK_STREAM) == -1) {
+ msg_err("cannot spawn socketpair: %s", strerror(errno));
+ luaL_unref(L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+ luaL_unref(L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+ g_free(cbdata);
+
+ return 0;
+ }
+
+ actx = w->ctx;
+ cbdata->wrk = w;
+ cbdata->L = L;
+ cbdata->event_loop = actx->event_loop;
+ cbdata->sz = (guint64) -1;
+
+ pid = fork();
+
+ if (pid == -1) {
+ msg_err("cannot spawn process: %s", strerror(errno));
+ close(cbdata->sp[0]);
+ close(cbdata->sp[1]);
+ luaL_unref(L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+ luaL_unref(L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+ g_free(cbdata);
+
+ return 0;
+ }
+ else if (pid == 0) {
+ /* Child */
+ gint rc;
+ gchar inbuf[4];
+
+ rspamd_log_on_fork(w->cf->type, w->srv->cfg, w->srv->logger);
+ rc = ottery_init(w->srv->cfg->libs_ctx->ottery_cfg);
+
+ if (rc != OTTERY_ERR_NONE) {
+ msg_err("cannot initialize PRNG: %d", rc);
+ abort();
+ }
+ rspamd_random_seed_fast();
+#ifdef HAVE_EVUTIL_RNG_INIT
+ evutil_secure_rng_init();
+#endif
+
+ close(cbdata->sp[0]);
+ /* Here we assume that we can block on writing results */
+ rspamd_socket_blocking(cbdata->sp[1]);
+ g_hash_table_remove_all(w->signal_events);
+ ev_loop_destroy(cbdata->event_loop);
+
+ if (proctitle) {
+ rspamd_setproctitle("lua process: %s", proctitle);
+ }
+ else {
+ rspamd_setproctitle("lua process: unnamed");
+ }
+
+ cbdata->event_loop = ev_loop_new(EVFLAG_SIGNALFD);
+ rspamd_worker_unblock_signals();
+ rspamd_lua_execute_lua_subprocess(L, cbdata);
+
+ /* Wait for parent to reply and exit */
+ rc = read(cbdata->sp[1], inbuf, sizeof(inbuf));
+
+ if (rc >= sizeof(inbuf) &&
+ memcmp(inbuf, "\0\0\0\0", sizeof(inbuf)) == 0) {
+ exit(EXIT_SUCCESS);
+ }
+ else {
+ msg_err("got invalid reply from parent");
+
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ cbdata->cpid = pid;
+ cbdata->io_buf = g_string_sized_new(8);
+ /* Notify main */
+ memset(&srv_cmd, 0, sizeof(srv_cmd));
+ srv_cmd.type = RSPAMD_SRV_ON_FORK;
+ srv_cmd.cmd.on_fork.state = child_create;
+ srv_cmd.cmd.on_fork.cpid = pid;
+ srv_cmd.cmd.on_fork.ppid = getpid();
+ rspamd_srv_send_command(w, cbdata->event_loop, &srv_cmd, -1, NULL, NULL);
+
+ close(cbdata->sp[1]);
+ rspamd_socket_nonblocking(cbdata->sp[0]);
+ /* Parent */
+ rspamd_worker_set_signal_handler(SIGCHLD, w, cbdata->event_loop,
+ rspamd_lua_cld_handler,
+ cbdata);
+
+ /* Add result pipe waiting */
+ ev_io_init(&cbdata->ev, rspamd_lua_subprocess_io, cbdata->sp[0], EV_READ);
+ cbdata->ev.data = cbdata;
+ ev_io_start(cbdata->event_loop, &cbdata->ev);
+
+ return 0;
+}
+
+void luaopen_worker(lua_State *L)
+{
+ rspamd_lua_new_class(L, "rspamd{worker}", worker_reg);
+}