summaryrefslogtreecommitdiffstats
path: root/src/utils/knotd/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/utils/knotd/main.c')
-rw-r--r--src/utils/knotd/main.c256
1 files changed, 228 insertions, 28 deletions
diff --git a/src/utils/knotd/main.c b/src/utils/knotd/main.c
index d4ebd53..e863296 100644
--- a/src/utils/knotd/main.c
+++ b/src/utils/knotd/main.c
@@ -38,15 +38,39 @@
#include "knot/conf/conf.h"
#include "knot/conf/migration.h"
#include "knot/conf/module.h"
+#include "knot/common/dbus.h"
#include "knot/common/log.h"
#include "knot/common/process.h"
#include "knot/common/stats.h"
#include "knot/common/systemd.h"
#include "knot/server/server.h"
#include "knot/server/tcp-handler.h"
+#include "utils/common/params.h"
#define PROGRAM_NAME "knotd"
+typedef enum {
+ CONCURRENT_EMPTY = 0, // fresh cctx without a thread.
+ CONCURRENT_ASSIGNED, // cctx assigned to process a command.
+ CONCURRENT_RUNNING, // ctl command is being processed in the thread.
+ CONCURRENT_IDLE, // command has been processed, waiting for a new one.
+ CONCURRENT_KILLED, // cctx cleanup has started.
+ CONCURRENT_FINISHED, // after having been killed, the thread is being joined.
+} concurrent_ctl_state_t;
+
+typedef struct {
+ concurrent_ctl_state_t state;
+ pthread_mutex_t mutex; // Protects .state.
+ pthread_cond_t cond;
+ knot_ctl_t *ctl;
+ server_t *server;
+ pthread_t thread;
+ sigset_t sigmask;
+ int ret;
+ int thread_idx;
+ bool exclusive;
+} concurrent_ctl_ctx_t;
+
/* Signal flags. */
static volatile bool sig_req_stop = false;
static volatile bool sig_req_reload = false;
@@ -161,13 +185,14 @@ static void setup_signals(void)
sigdelset(&all, SIGBUS);
sigdelset(&all, SIGFPE);
sigdelset(&all, SIGSEGV);
- pthread_sigmask(SIG_SETMASK, &all, NULL);
/* Setup handlers. */
struct sigaction action = { .sa_handler = handle_signal };
for (const struct signal *s = SIGNALS; s->signum > 0; s++) {
sigaction(s->signum, &action, NULL);
}
+
+ pthread_sigmask(SIG_SETMASK, &all, NULL);
}
/*! \brief Unblock server control signals. */
@@ -185,6 +210,24 @@ static void enable_signals(void)
pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
}
+/*! \brief Create a control thread with correct signals setting. */
+static void create_thread_sigmask(pthread_t *thr, void *(*fcn)(void*), void *ctx,
+ sigset_t *out_mask)
+{
+ /* Block all blockable signals. */
+ sigset_t mask;
+ sigfillset(&mask);
+ sigdelset(&mask, SIGBUS);
+ sigdelset(&mask, SIGFPE);
+ sigdelset(&mask, SIGILL);
+ sigdelset(&mask, SIGSEGV);
+ pthread_sigmask(SIG_SETMASK, &mask, out_mask);
+
+ pthread_create(thr, NULL, fcn, ctx);
+
+ pthread_sigmask(SIG_SETMASK, out_mask, NULL);
+}
+
/*! \brief Drop POSIX 1003.1e capabilities. */
static void drop_capabilities(void)
{
@@ -224,6 +267,7 @@ static void check_loaded(server_t *server)
return;
}
+ rcu_read_lock();
knot_zonedb_iter_t *it = knot_zonedb_iter_begin(server->zone_db);
while (!knot_zonedb_iter_finished(it)) {
zone_t *zone = (zone_t *)knot_zonedb_iter_val(it);
@@ -234,9 +278,162 @@ static void check_loaded(server_t *server)
knot_zonedb_iter_next(it);
}
knot_zonedb_iter_free(it);
+ rcu_read_unlock();
finished = true;
- systemd_emit_running(true);
+ dbus_emit_running(true);
+}
+
+static void *ctl_process_thread(void *arg);
+
+/*!
+ * Try to find an empty ctl processing context and if successful,
+ * prepare to lauch the incomming command processing in it.
+ *
+ * \param[in] concurrent_ctxs Configured concurrent control contexts.
+ * \param[in] n_ctxs Number of configured concurrent control contexts.
+ * \param[in] ctl Control context.
+ *
+ * \return Assigned concurrent control context, or NULL.
+ */
+
+static concurrent_ctl_ctx_t *find_free_ctx(concurrent_ctl_ctx_t *concurrent_ctxs,
+ size_t n_ctxs, knot_ctl_t *ctl)
+{
+ concurrent_ctl_ctx_t *res = NULL;
+ for (size_t i = 0; i < n_ctxs && res == NULL; i++) {
+ concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i];
+ pthread_mutex_lock(&cctx->mutex);
+ if (cctx->exclusive) {
+ while (cctx->state != CONCURRENT_IDLE) {
+ pthread_cond_wait(&cctx->cond, &cctx->mutex);
+ }
+ knot_ctl_free(cctx->ctl);
+ cctx->ctl = knot_ctl_clone(ctl);
+ if (cctx->ctl == NULL) {
+ cctx->exclusive = false;
+ pthread_mutex_unlock(&cctx->mutex);
+ break;
+ }
+ cctx->state = CONCURRENT_ASSIGNED;
+ res = cctx;
+ pthread_cond_broadcast(&cctx->cond);
+ }
+ pthread_mutex_unlock(&cctx->mutex);
+ }
+ for (size_t i = 0; i < n_ctxs && res == NULL; i++) {
+ concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i];
+ pthread_mutex_lock(&cctx->mutex);
+ switch (cctx->state) {
+ case CONCURRENT_EMPTY:
+ create_thread_sigmask(&cctx->thread, ctl_process_thread, cctx, &cctx->sigmask);
+ break;
+ case CONCURRENT_IDLE:
+ knot_ctl_free(cctx->ctl);
+ pthread_cond_broadcast(&cctx->cond);
+ break;
+ default:
+ pthread_mutex_unlock(&cctx->mutex);
+ continue;
+ }
+ cctx->ctl = knot_ctl_clone(ctl);
+ if (cctx->ctl != NULL) {
+ cctx->state = CONCURRENT_ASSIGNED;
+ res = cctx;
+ }
+ pthread_mutex_unlock(&cctx->mutex);
+ }
+ return res;
+}
+
+static void init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, server_t *server)
+{
+ for (size_t i = 0; i < n_ctxs; i++) {
+ concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i];
+ pthread_mutex_init(&cctx->mutex, NULL);
+ pthread_cond_init(&cctx->cond, NULL);
+ cctx->server = server;
+ cctx->thread_idx = i + 1;
+ }
+}
+
+static int cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs)
+{
+ int ret = KNOT_EOK;
+ for (size_t i = 0; i < n_ctxs; i++) {
+ concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i];
+ pthread_mutex_lock(&cctx->mutex);
+ if (cctx->state == CONCURRENT_IDLE) {
+ knot_ctl_free(cctx->ctl);
+ cctx->ctl = NULL;
+ if (cctx->ret == KNOT_CTL_ESTOP) {
+ ret = cctx->ret;
+ }
+ }
+ pthread_mutex_unlock(&cctx->mutex);
+ }
+ return ret;
+}
+
+static void finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs)
+{
+ for (size_t i = 0; i < n_ctxs; i++) {
+ concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i];
+ pthread_mutex_lock(&cctx->mutex);
+ if (cctx->state == CONCURRENT_EMPTY) {
+ pthread_mutex_unlock(&cctx->mutex);
+ pthread_mutex_destroy(&cctx->mutex);
+ pthread_cond_destroy(&cctx->cond);
+ continue;
+ }
+
+ cctx->state = CONCURRENT_KILLED;
+ pthread_cond_broadcast(&cctx->cond);
+ pthread_mutex_unlock(&cctx->mutex);
+ (void)pthread_join(cctx->thread, NULL);
+
+ assert(cctx->state == CONCURRENT_FINISHED);
+ knot_ctl_free(cctx->ctl);
+ pthread_mutex_destroy(&cctx->mutex);
+ pthread_cond_destroy(&cctx->cond);
+ }
+}
+
+static void *ctl_process_thread(void *arg)
+{
+ concurrent_ctl_ctx_t *ctx = arg;
+ rcu_register_thread();
+ setup_signals(); // in fact, this blocks common signals so that they
+ // arrive to main thread instead of this one
+
+ pthread_mutex_lock(&ctx->mutex);
+ while (ctx->state != CONCURRENT_KILLED) {
+ if (ctx->state != CONCURRENT_ASSIGNED) {
+ pthread_cond_wait(&ctx->cond, &ctx->mutex);
+ continue;
+ }
+ ctx->state = CONCURRENT_RUNNING;
+ bool exclusive = ctx->exclusive;
+ pthread_mutex_unlock(&ctx->mutex);
+
+ // Not IDLE, ctx can be read without locking.
+ int ret = ctl_process(ctx->ctl, ctx->server, ctx->thread_idx, &exclusive);
+
+ pthread_mutex_lock(&ctx->mutex);
+ ctx->ret = ret;
+ ctx->exclusive = exclusive;
+ if (ctx->state == CONCURRENT_RUNNING) { // not KILLED
+ ctx->state = CONCURRENT_IDLE;
+ pthread_cond_broadcast(&ctx->cond);
+ }
+ }
+
+ knot_ctl_close(ctx->ctl);
+
+ ctx->state = CONCURRENT_FINISHED;
+ pthread_mutex_unlock(&ctx->mutex);
+ rcu_unregister_thread();
+ return NULL;
}
/*! \brief Event loop listening for signals and remote commands. */
@@ -274,7 +471,7 @@ static void event_loop(server_t *server, const char *socket, bool daemonize,
/* Bind the control socket. */
uint16_t backlog = conf_get_int(conf(), C_CTL, C_BACKLOG);
- int ret = knot_ctl_bind2(ctl, listen, backlog);
+ int ret = knot_ctl_bind(ctl, listen, backlog);
if (ret != KNOT_EOK) {
knot_ctl_free(ctl);
log_fatal("control, failed to bind socket '%s' (%s)",
@@ -286,6 +483,10 @@ static void event_loop(server_t *server, const char *socket, bool daemonize,
enable_signals();
+ concurrent_ctl_ctx_t concurrent_ctxs[CTL_MAX_CONCURRENT] = { 0 };
+ init_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT, server);
+ bool main_thread_exclusive = false;
+
/* Notify systemd about successful start. */
systemd_ready_notify();
if (daemonize) {
@@ -299,15 +500,19 @@ static void event_loop(server_t *server, const char *socket, bool daemonize,
/* Interrupts. */
if (sig_req_reload && !sig_req_stop) {
sig_req_reload = false;
+ pthread_rwlock_wrlock(&server->ctl_lock);
server_reload(server, RELOAD_FULL);
+ pthread_rwlock_unlock(&server->ctl_lock);
}
if (sig_req_zones_reload && !sig_req_stop) {
sig_req_zones_reload = false;
reload_t mode = server->catalog_upd_signal ? RELOAD_CATALOG : RELOAD_ZONES;
+ pthread_rwlock_wrlock(&server->ctl_lock);
server->catalog_upd_signal = false;
server_update_zones(conf(), server, mode);
+ pthread_rwlock_unlock(&server->ctl_lock);
}
- if (sig_req_stop) {
+ if (sig_req_stop || cleanup_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT) == KNOT_CTL_ESTOP) {
break;
}
@@ -325,15 +530,20 @@ static void event_loop(server_t *server, const char *socket, bool daemonize,
continue;
}
- ret = ctl_process(ctl, server);
- knot_ctl_close(ctl);
- if (ret == KNOT_CTL_ESTOP) {
- break;
+ if (main_thread_exclusive ||
+ find_free_ctx(concurrent_ctxs, CTL_MAX_CONCURRENT, ctl) == NULL) {
+ ret = ctl_process(ctl, server, 0, &main_thread_exclusive);
+ knot_ctl_close(ctl);
+ if (ret == KNOT_CTL_ESTOP) {
+ break;
+ }
}
}
+ finalize_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT);
+
if (conf()->cache.srv_dbus_event & DBUS_EVENT_RUNNING) {
- systemd_emit_running(false);
+ dbus_emit_running(false);
}
/* Unbind the control socket. */
@@ -363,11 +573,6 @@ static void print_help(void)
CONF_MAPSIZE, RUN_DIR "/knot.sock");
}
-static void print_version(void)
-{
- printf("%s (Knot DNS), version %s\n", PROGRAM_NAME, PACKAGE_VERSION);
-}
-
static int set_config(const char *confdb, const char *config, size_t max_conf_size)
{
if (config != NULL && confdb != NULL) {
@@ -440,7 +645,7 @@ int main(int argc, char **argv)
{ "daemonize", optional_argument, NULL, 'd' },
{ "verbose", no_argument, NULL, 'v' },
{ "help", no_argument, NULL, 'h' },
- { "version", no_argument, NULL, 'V' },
+ { "version", optional_argument, NULL, 'V' },
{ NULL }
};
@@ -449,7 +654,7 @@ int main(int argc, char **argv)
/* Parse command line arguments. */
int opt = 0;
- while ((opt = getopt_long(argc, argv, "c:C:m:s:dvhV", opts, NULL)) != -1) {
+ while ((opt = getopt_long(argc, argv, "c:C:m:s:dvhV::", opts, NULL)) != -1) {
switch (opt) {
case 'c':
config = optarg;
@@ -481,7 +686,7 @@ int main(int argc, char **argv)
print_help();
return EXIT_SUCCESS;
case 'V':
- print_version();
+ print_version(PROGRAM_NAME, optarg != NULL);
return EXIT_SUCCESS;
default:
print_help();
@@ -570,14 +775,9 @@ int main(int argc, char **argv)
return EXIT_FAILURE;
}
- if (conf()->cache.srv_dbus_event != DBUS_EVENT_NONE) {
- ret = systemd_dbus_open();
- if (ret != KNOT_EOK) {
- log_error("d-bus: failed to open system bus (%s)",
- knot_strerror(ret));
- } else {
- log_info("d-bus: connected to system bus");
- }
+ /* Connect to the system D-bus. */
+ if (conf()->cache.srv_dbus_event != DBUS_EVENT_NONE &&
+ dbus_open() == KNOT_EOK) {
int64_t delay = conf_get_int(conf(), C_SRV, C_DBUS_INIT_DELAY);
sleep(delay);
}
@@ -595,7 +795,7 @@ int main(int argc, char **argv)
server_wait(&server);
server_deinit(&server);
conf_free(conf());
- systemd_dbus_close();
+ dbus_close();
log_close();
dnssec_crypto_cleanup();
return EXIT_FAILURE;
@@ -636,7 +836,7 @@ int main(int argc, char **argv)
rcu_unregister_thread();
pid_cleanup();
conf_free(conf());
- systemd_dbus_close();
+ dbus_close();
log_close();
dnssec_crypto_cleanup();
return EXIT_FAILURE;
@@ -660,7 +860,7 @@ int main(int argc, char **argv)
/* Unhook from RCU. */
rcu_unregister_thread();
- systemd_dbus_close();
+ dbus_close();
log_info("shutting down");
log_close();