diff options
Diffstat (limited to 'src/utils/knotd/main.c')
-rw-r--r-- | src/utils/knotd/main.c | 256 |
1 files changed, 228 insertions, 28 deletions
diff --git a/src/utils/knotd/main.c b/src/utils/knotd/main.c index d4ebd53..e863296 100644 --- a/src/utils/knotd/main.c +++ b/src/utils/knotd/main.c @@ -38,15 +38,39 @@ #include "knot/conf/conf.h" #include "knot/conf/migration.h" #include "knot/conf/module.h" +#include "knot/common/dbus.h" #include "knot/common/log.h" #include "knot/common/process.h" #include "knot/common/stats.h" #include "knot/common/systemd.h" #include "knot/server/server.h" #include "knot/server/tcp-handler.h" +#include "utils/common/params.h" #define PROGRAM_NAME "knotd" +typedef enum { + CONCURRENT_EMPTY = 0, // fresh cctx without a thread. + CONCURRENT_ASSIGNED, // cctx assigned to process a command. + CONCURRENT_RUNNING, // ctl command is being processed in the thread. + CONCURRENT_IDLE, // command has been processed, waiting for a new one. + CONCURRENT_KILLED, // cctx cleanup has started. + CONCURRENT_FINISHED, // after having been killed, the thread is being joined. +} concurrent_ctl_state_t; + +typedef struct { + concurrent_ctl_state_t state; + pthread_mutex_t mutex; // Protects .state. + pthread_cond_t cond; + knot_ctl_t *ctl; + server_t *server; + pthread_t thread; + sigset_t sigmask; + int ret; + int thread_idx; + bool exclusive; +} concurrent_ctl_ctx_t; + /* Signal flags. */ static volatile bool sig_req_stop = false; static volatile bool sig_req_reload = false; @@ -161,13 +185,14 @@ static void setup_signals(void) sigdelset(&all, SIGBUS); sigdelset(&all, SIGFPE); sigdelset(&all, SIGSEGV); - pthread_sigmask(SIG_SETMASK, &all, NULL); /* Setup handlers. */ struct sigaction action = { .sa_handler = handle_signal }; for (const struct signal *s = SIGNALS; s->signum > 0; s++) { sigaction(s->signum, &action, NULL); } + + pthread_sigmask(SIG_SETMASK, &all, NULL); } /*! \brief Unblock server control signals. */ @@ -185,6 +210,24 @@ static void enable_signals(void) pthread_sigmask(SIG_UNBLOCK, &mask, NULL); } +/*! \brief Create a control thread with correct signals setting. */ +static void create_thread_sigmask(pthread_t *thr, void *(*fcn)(void*), void *ctx, + sigset_t *out_mask) +{ + /* Block all blockable signals. */ + sigset_t mask; + sigfillset(&mask); + sigdelset(&mask, SIGBUS); + sigdelset(&mask, SIGFPE); + sigdelset(&mask, SIGILL); + sigdelset(&mask, SIGSEGV); + pthread_sigmask(SIG_SETMASK, &mask, out_mask); + + pthread_create(thr, NULL, fcn, ctx); + + pthread_sigmask(SIG_SETMASK, out_mask, NULL); +} + /*! \brief Drop POSIX 1003.1e capabilities. */ static void drop_capabilities(void) { @@ -224,6 +267,7 @@ static void check_loaded(server_t *server) return; } + rcu_read_lock(); knot_zonedb_iter_t *it = knot_zonedb_iter_begin(server->zone_db); while (!knot_zonedb_iter_finished(it)) { zone_t *zone = (zone_t *)knot_zonedb_iter_val(it); @@ -234,9 +278,162 @@ static void check_loaded(server_t *server) knot_zonedb_iter_next(it); } knot_zonedb_iter_free(it); + rcu_read_unlock(); finished = true; - systemd_emit_running(true); + dbus_emit_running(true); +} + +static void *ctl_process_thread(void *arg); + +/*! + * Try to find an empty ctl processing context and if successful, + * prepare to lauch the incomming command processing in it. + * + * \param[in] concurrent_ctxs Configured concurrent control contexts. + * \param[in] n_ctxs Number of configured concurrent control contexts. + * \param[in] ctl Control context. + * + * \return Assigned concurrent control context, or NULL. + */ + +static concurrent_ctl_ctx_t *find_free_ctx(concurrent_ctl_ctx_t *concurrent_ctxs, + size_t n_ctxs, knot_ctl_t *ctl) +{ + concurrent_ctl_ctx_t *res = NULL; + for (size_t i = 0; i < n_ctxs && res == NULL; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + if (cctx->exclusive) { + while (cctx->state != CONCURRENT_IDLE) { + pthread_cond_wait(&cctx->cond, &cctx->mutex); + } + knot_ctl_free(cctx->ctl); + cctx->ctl = knot_ctl_clone(ctl); + if (cctx->ctl == NULL) { + cctx->exclusive = false; + pthread_mutex_unlock(&cctx->mutex); + break; + } + cctx->state = CONCURRENT_ASSIGNED; + res = cctx; + pthread_cond_broadcast(&cctx->cond); + } + pthread_mutex_unlock(&cctx->mutex); + } + for (size_t i = 0; i < n_ctxs && res == NULL; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + switch (cctx->state) { + case CONCURRENT_EMPTY: + create_thread_sigmask(&cctx->thread, ctl_process_thread, cctx, &cctx->sigmask); + break; + case CONCURRENT_IDLE: + knot_ctl_free(cctx->ctl); + pthread_cond_broadcast(&cctx->cond); + break; + default: + pthread_mutex_unlock(&cctx->mutex); + continue; + } + cctx->ctl = knot_ctl_clone(ctl); + if (cctx->ctl != NULL) { + cctx->state = CONCURRENT_ASSIGNED; + res = cctx; + } + pthread_mutex_unlock(&cctx->mutex); + } + return res; +} + +static void init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, server_t *server) +{ + for (size_t i = 0; i < n_ctxs; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_init(&cctx->mutex, NULL); + pthread_cond_init(&cctx->cond, NULL); + cctx->server = server; + cctx->thread_idx = i + 1; + } +} + +static int cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) +{ + int ret = KNOT_EOK; + for (size_t i = 0; i < n_ctxs; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + if (cctx->state == CONCURRENT_IDLE) { + knot_ctl_free(cctx->ctl); + cctx->ctl = NULL; + if (cctx->ret == KNOT_CTL_ESTOP) { + ret = cctx->ret; + } + } + pthread_mutex_unlock(&cctx->mutex); + } + return ret; +} + +static void finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) +{ + for (size_t i = 0; i < n_ctxs; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + if (cctx->state == CONCURRENT_EMPTY) { + pthread_mutex_unlock(&cctx->mutex); + pthread_mutex_destroy(&cctx->mutex); + pthread_cond_destroy(&cctx->cond); + continue; + } + + cctx->state = CONCURRENT_KILLED; + pthread_cond_broadcast(&cctx->cond); + pthread_mutex_unlock(&cctx->mutex); + (void)pthread_join(cctx->thread, NULL); + + assert(cctx->state == CONCURRENT_FINISHED); + knot_ctl_free(cctx->ctl); + pthread_mutex_destroy(&cctx->mutex); + pthread_cond_destroy(&cctx->cond); + } +} + +static void *ctl_process_thread(void *arg) +{ + concurrent_ctl_ctx_t *ctx = arg; + rcu_register_thread(); + setup_signals(); // in fact, this blocks common signals so that they + // arrive to main thread instead of this one + + pthread_mutex_lock(&ctx->mutex); + while (ctx->state != CONCURRENT_KILLED) { + if (ctx->state != CONCURRENT_ASSIGNED) { + pthread_cond_wait(&ctx->cond, &ctx->mutex); + continue; + } + ctx->state = CONCURRENT_RUNNING; + bool exclusive = ctx->exclusive; + pthread_mutex_unlock(&ctx->mutex); + + // Not IDLE, ctx can be read without locking. + int ret = ctl_process(ctx->ctl, ctx->server, ctx->thread_idx, &exclusive); + + pthread_mutex_lock(&ctx->mutex); + ctx->ret = ret; + ctx->exclusive = exclusive; + if (ctx->state == CONCURRENT_RUNNING) { // not KILLED + ctx->state = CONCURRENT_IDLE; + pthread_cond_broadcast(&ctx->cond); + } + } + + knot_ctl_close(ctx->ctl); + + ctx->state = CONCURRENT_FINISHED; + pthread_mutex_unlock(&ctx->mutex); + rcu_unregister_thread(); + return NULL; } /*! \brief Event loop listening for signals and remote commands. */ @@ -274,7 +471,7 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, /* Bind the control socket. */ uint16_t backlog = conf_get_int(conf(), C_CTL, C_BACKLOG); - int ret = knot_ctl_bind2(ctl, listen, backlog); + int ret = knot_ctl_bind(ctl, listen, backlog); if (ret != KNOT_EOK) { knot_ctl_free(ctl); log_fatal("control, failed to bind socket '%s' (%s)", @@ -286,6 +483,10 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, enable_signals(); + concurrent_ctl_ctx_t concurrent_ctxs[CTL_MAX_CONCURRENT] = { 0 }; + init_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT, server); + bool main_thread_exclusive = false; + /* Notify systemd about successful start. */ systemd_ready_notify(); if (daemonize) { @@ -299,15 +500,19 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, /* Interrupts. */ if (sig_req_reload && !sig_req_stop) { sig_req_reload = false; + pthread_rwlock_wrlock(&server->ctl_lock); server_reload(server, RELOAD_FULL); + pthread_rwlock_unlock(&server->ctl_lock); } if (sig_req_zones_reload && !sig_req_stop) { sig_req_zones_reload = false; reload_t mode = server->catalog_upd_signal ? RELOAD_CATALOG : RELOAD_ZONES; + pthread_rwlock_wrlock(&server->ctl_lock); server->catalog_upd_signal = false; server_update_zones(conf(), server, mode); + pthread_rwlock_unlock(&server->ctl_lock); } - if (sig_req_stop) { + if (sig_req_stop || cleanup_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT) == KNOT_CTL_ESTOP) { break; } @@ -325,15 +530,20 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, continue; } - ret = ctl_process(ctl, server); - knot_ctl_close(ctl); - if (ret == KNOT_CTL_ESTOP) { - break; + if (main_thread_exclusive || + find_free_ctx(concurrent_ctxs, CTL_MAX_CONCURRENT, ctl) == NULL) { + ret = ctl_process(ctl, server, 0, &main_thread_exclusive); + knot_ctl_close(ctl); + if (ret == KNOT_CTL_ESTOP) { + break; + } } } + finalize_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT); + if (conf()->cache.srv_dbus_event & DBUS_EVENT_RUNNING) { - systemd_emit_running(false); + dbus_emit_running(false); } /* Unbind the control socket. */ @@ -363,11 +573,6 @@ static void print_help(void) CONF_MAPSIZE, RUN_DIR "/knot.sock"); } -static void print_version(void) -{ - printf("%s (Knot DNS), version %s\n", PROGRAM_NAME, PACKAGE_VERSION); -} - static int set_config(const char *confdb, const char *config, size_t max_conf_size) { if (config != NULL && confdb != NULL) { @@ -440,7 +645,7 @@ int main(int argc, char **argv) { "daemonize", optional_argument, NULL, 'd' }, { "verbose", no_argument, NULL, 'v' }, { "help", no_argument, NULL, 'h' }, - { "version", no_argument, NULL, 'V' }, + { "version", optional_argument, NULL, 'V' }, { NULL } }; @@ -449,7 +654,7 @@ int main(int argc, char **argv) /* Parse command line arguments. */ int opt = 0; - while ((opt = getopt_long(argc, argv, "c:C:m:s:dvhV", opts, NULL)) != -1) { + while ((opt = getopt_long(argc, argv, "c:C:m:s:dvhV::", opts, NULL)) != -1) { switch (opt) { case 'c': config = optarg; @@ -481,7 +686,7 @@ int main(int argc, char **argv) print_help(); return EXIT_SUCCESS; case 'V': - print_version(); + print_version(PROGRAM_NAME, optarg != NULL); return EXIT_SUCCESS; default: print_help(); @@ -570,14 +775,9 @@ int main(int argc, char **argv) return EXIT_FAILURE; } - if (conf()->cache.srv_dbus_event != DBUS_EVENT_NONE) { - ret = systemd_dbus_open(); - if (ret != KNOT_EOK) { - log_error("d-bus: failed to open system bus (%s)", - knot_strerror(ret)); - } else { - log_info("d-bus: connected to system bus"); - } + /* Connect to the system D-bus. */ + if (conf()->cache.srv_dbus_event != DBUS_EVENT_NONE && + dbus_open() == KNOT_EOK) { int64_t delay = conf_get_int(conf(), C_SRV, C_DBUS_INIT_DELAY); sleep(delay); } @@ -595,7 +795,7 @@ int main(int argc, char **argv) server_wait(&server); server_deinit(&server); conf_free(conf()); - systemd_dbus_close(); + dbus_close(); log_close(); dnssec_crypto_cleanup(); return EXIT_FAILURE; @@ -636,7 +836,7 @@ int main(int argc, char **argv) rcu_unregister_thread(); pid_cleanup(); conf_free(conf()); - systemd_dbus_close(); + dbus_close(); log_close(); dnssec_crypto_cleanup(); return EXIT_FAILURE; @@ -660,7 +860,7 @@ int main(int argc, char **argv) /* Unhook from RCU. */ rcu_unregister_thread(); - systemd_dbus_close(); + dbus_close(); log_info("shutting down"); log_close(); |