/*  Copyright (C) 2023 CZ.NIC, z.s.p.o.

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

#define __APPLE_USE_RFC_3542

#include <assert.h>
#include <sys/types.h>   // OpenBSD
#include <netinet/tcp.h> // TCP_FASTOPEN
#include <sys/resource.h>

#include "libknot/libknot.h"
#include "libknot/yparser/ypschema.h"
#include "libknot/xdp.h"
#if defined ENABLE_XDP && ENABLE_QUIC
#include "libknot/xdp/quic.h"
#endif // ENABLE_XDP && ENABLE_QUIC
#include "knot/common/log.h"
#include "knot/common/stats.h"
#include "knot/common/systemd.h"
#include "knot/common/unreachable.h"
#include "knot/conf/confio.h"
#include "knot/conf/migration.h"
#include "knot/conf/module.h"
#include "knot/dnssec/kasp/kasp_db.h"
#include "knot/journal/journal_basic.h"
#include "knot/server/server.h"
#include "knot/server/udp-handler.h"
#include "knot/server/tcp-handler.h"
#include "knot/zone/timers.h"
#include "knot/zone/zonedb-load.h"
#include "knot/worker/pool.h"
#include "contrib/conn_pool.h"
#include "contrib/net.h"
#include "contrib/openbsd/strlcat.h"
#include "contrib/os.h"
#include "contrib/sockaddr.h"
#include "contrib/trim.h"

#ifdef ENABLE_XDP
#include <net/if.h>
#endif

#ifdef SO_ATTACH_REUSEPORT_CBPF
#include <linux/filter.h>
#endif

/*! \brief Minimal send/receive buffer sizes. */
enum {
	UDP_MIN_RCVSIZE = 4096,
	UDP_MIN_SNDSIZE = 4096,
	TCP_MIN_RCVSIZE = 4096,
	/* Two-byte length prefix plus the maximum DNS message size. */
	TCP_MIN_SNDSIZE = sizeof(uint16_t) + UINT16_MAX
};

/*! \brief Unbind interface and clear the structure. */
static void server_deinit_iface(iface_t *iface, bool dealloc)
{
	assert(iface);

	/* Free UDP handler. */
	if (iface->fd_udp != NULL) {
		for (int i = 0; i < iface->fd_udp_count; i++) {
			if (iface->fd_udp[i] > -1) {
				close(iface->fd_udp[i]);
			}
		}
		free(iface->fd_udp);
	}

	for (int i = 0; i < iface->fd_xdp_count; i++) {
#ifdef ENABLE_XDP
		knot_xdp_deinit(iface->xdp_sockets[i]);
#else
		assert(0);
#endif
	}
	free(iface->fd_xdp);
	free(iface->xdp_sockets);

	/* Free TCP handler. */
	if (iface->fd_tcp != NULL) {
		for (int i = 0; i < iface->fd_tcp_count; i++) {
			if (iface->fd_tcp[i] > -1) {
				close(iface->fd_tcp[i]);
			}
		}
		free(iface->fd_tcp);
	}

	if (dealloc) {
		free(iface);
	}
}

/*! \brief Deinit server interface list. */
static void server_deinit_iface_list(iface_t *ifaces, size_t n)
{
	if (ifaces != NULL) {
		for (size_t i = 0; i < n; i++) {
			server_deinit_iface(ifaces + i, false);
		}
		free(ifaces);
	}
}

/*!
 * \brief Attach SO_REUSEPORT socket filter for perfect CPU locality.
 *
 * \param sock        Socket to attach the CBPF filter to.
 * \param sock_count  Number of sockets in the reuseport group.
 */
static bool server_attach_reuseport_bpf(const int sock, const int sock_count)
{
#ifdef SO_ATTACH_REUSEPORT_CBPF
	struct sock_filter code[] = {
		/* A = raw_smp_processor_id(). */
		{ BPF_LD | BPF_W | BPF_ABS, 0, 0, SKF_AD_OFF + SKF_AD_CPU },
		/* Adjust the CPU ID to the socket group size. */
		{ BPF_ALU | BPF_MOD | BPF_K, 0, 0, sock_count },
		/* Return A. */
		{ BPF_RET | BPF_A, 0, 0, 0 },
	};

	struct sock_fprog prog = { 0 };
	prog.len = sizeof(code) / sizeof(*code);
	prog.filter = code;

	return setsockopt(sock, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF,
	                  &prog, sizeof(prog)) == 0;
#else
	return true;
#endif
}
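
/*
 * Worked example of the filter's effect: with sock_count == 4, the
 * program above computes socket_index = raw_smp_processor_id() % 4, so
 * a packet received on CPU 5 is steered to reuseport socket 1 (5 % 4).
 * With one worker thread per CPU and one reuseport socket per worker,
 * each packet is thus processed on the very CPU that received it.
 */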
/*!
 * \brief Set lower bound for socket option.
 */
static bool setsockopt_min(int sock, int option, int min)
{
	int value = 0;
	socklen_t len = sizeof(value);

	if (getsockopt(sock, SOL_SOCKET, option, &value, &len) != 0) {
		return false;
	}
	assert(len == sizeof(value));

	if (value >= min) {
		return true;
	}

	return setsockopt(sock, SOL_SOCKET, option, &min, sizeof(min)) == 0;
}

/*!
 * \brief Enlarge send/receive buffers.
 */
static bool enlarge_net_buffers(int sock, int min_recvsize, int min_sndsize)
{
	return setsockopt_min(sock, SO_RCVBUF, min_recvsize) &&
	       setsockopt_min(sock, SO_SNDBUF, min_sndsize);
}

/*!
 * \brief Enable source packet information retrieval.
 */
static bool enable_pktinfo(int sock, int family)
{
	int level = 0;
	int option = 0;

	switch (family) {
	case AF_INET:
		level = IPPROTO_IP;
#if defined(IP_PKTINFO)
		option = IP_PKTINFO;     /* Linux */
#elif defined(IP_RECVDSTADDR)
		option = IP_RECVDSTADDR; /* BSD */
#else
		return false;
#endif
		break;
	case AF_INET6:
		level = IPPROTO_IPV6;
		option = IPV6_RECVPKTINFO;
		break;
	default:
		return false;
	}

	const int on = 1;
	return setsockopt(sock, level, option, &on, sizeof(on)) == 0;
}

/*!
 * Linux 3.15 introduced IP_PMTUDISC_OMIT, which makes sockets ignore
 * received PMTU information and send packets with DF=0. Fragmentation
 * is allowed if and only if the packet size exceeds the outgoing
 * interface MTU or the packet encounters a smaller-MTU link on the
 * path. This mitigates DNS fragmentation attacks by preventing forged
 * PMTU information from taking effect. FreeBSD already has the same
 * semantics without setting the option.
 */
static int disable_pmtudisc(int sock, int family)
{
#if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT)
	if (family == AF_INET) {
		int action_omit = IP_PMTUDISC_OMIT;
		if (setsockopt(sock, IPPROTO_IP, IP_MTU_DISCOVER, &action_omit,
		               sizeof(action_omit)) != 0) {
			return knot_map_errno();
		}
	}
#endif
	return KNOT_EOK;
}

static iface_t *server_init_xdp_iface(struct sockaddr_storage *addr, bool route_check,
                                      bool udp, bool tcp, uint16_t quic,
                                      unsigned *thread_id_start)
{
#ifndef ENABLE_XDP
	assert(0);
	return NULL;
#else
	conf_xdp_iface_t iface;
	int ret = conf_xdp_iface(addr, &iface);
	if (ret != KNOT_EOK) {
		log_error("failed to initialize XDP interface (%s)", knot_strerror(ret));
		return NULL;
	}

	iface_t *new_if = calloc(1, sizeof(*new_if));
	if (new_if == NULL) {
		log_error("failed to initialize XDP interface");
		return NULL;
	}
	memcpy(&new_if->addr, addr, sizeof(*addr));

	new_if->fd_xdp = calloc(iface.queues, sizeof(int));
	new_if->xdp_sockets = calloc(iface.queues, sizeof(*new_if->xdp_sockets));
	if (new_if->fd_xdp == NULL || new_if->xdp_sockets == NULL) {
		log_error("failed to initialize XDP interface");
		server_deinit_iface(new_if, true);
		return NULL;
	}
	new_if->xdp_first_thread_id = *thread_id_start;
	*thread_id_start += iface.queues;

	knot_xdp_filter_flag_t xdp_flags = udp ? KNOT_XDP_FILTER_UDP : 0;
	if (tcp) {
		xdp_flags |= KNOT_XDP_FILTER_TCP;
	}
	if (quic > 0) {
		xdp_flags |= KNOT_XDP_FILTER_QUIC;
	}
	if (route_check) {
		xdp_flags |= KNOT_XDP_FILTER_ROUTE;
	}
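	/*
	 * One XDP socket is created per NIC queue. Only the socket for
	 * queue 0 loads and attaches the BPF program; the remaining
	 * queues reuse it. If the first attach fails with -EBUSY because
	 * a program is already attached, initialization is retried with
	 * a forced unload.
	 */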
	for (int i = 0; i < iface.queues; i++) {
		knot_xdp_load_bpf_t mode =
			(i == 0 ? KNOT_XDP_LOAD_BPF_ALWAYS : KNOT_XDP_LOAD_BPF_NEVER);
		ret = knot_xdp_init(new_if->xdp_sockets + i, iface.name, i,
		                    xdp_flags, iface.port, quic, mode, NULL);
		if (ret == -EBUSY && i == 0) {
			log_notice("XDP interface %s@%u is busy, retrying initialization",
			           iface.name, iface.port);
			ret = knot_xdp_init(new_if->xdp_sockets + i, iface.name, i,
			                    xdp_flags, iface.port, quic,
			                    KNOT_XDP_LOAD_BPF_ALWAYS_UNLOAD, NULL);
		}
		if (ret != KNOT_EOK) {
			log_warning("failed to initialize XDP interface %s@%u, queue %d (%s)",
			            iface.name, iface.port, i, knot_strerror(ret));
			server_deinit_iface(new_if, true);
			new_if = NULL;
			break;
		}
		new_if->fd_xdp[i] = knot_xdp_socket_fd(new_if->xdp_sockets[i]);
		new_if->fd_xdp_count++;
	}

	if (ret == KNOT_EOK) {
		char msg[128];
		(void)snprintf(msg, sizeof(msg), "initialized XDP interface %s", iface.name);
		if (udp || tcp) {
			char buf[32] = "";
			(void)snprintf(buf, sizeof(buf), ", %s%s%s port %u",
			               (udp ? "UDP" : ""), (udp && tcp ? "/" : ""),
			               (tcp ? "TCP" : ""), iface.port);
			strlcat(msg, buf, sizeof(msg));
		}
		if (quic) {
			char buf[32] = "";
			(void)snprintf(buf, sizeof(buf), ", QUIC port %u", quic);
			strlcat(msg, buf, sizeof(msg));
		}

		knot_xdp_mode_t mode = knot_eth_xdp_mode(if_nametoindex(iface.name));
		log_info("%s, queues %d, %s mode%s", msg, iface.queues,
		         (mode == KNOT_XDP_MODE_FULL ? "native" : "emulated"),
		         route_check ? ", route check" : "");
	}

	return new_if;
#endif
}

/*!
 * \brief Create and initialize new interface.
 *
 * Both TCP and UDP sockets will be created for the interface.
 *
 * \param addr              Socket address.
 * \param udp_thread_count  Number of created UDP workers.
 * \param tcp_thread_count  Number of created TCP workers.
 * \param tcp_reuseport     Indication if reuseport on TCP is enabled.
 * \param socket_affinity   Indication if CBPF should be attached.
 *
 * \retval Pointer to a new initialized interface.
 * \retval NULL if error.
 */
static iface_t *server_init_iface(struct sockaddr_storage *addr,
                                  int udp_thread_count, int tcp_thread_count,
                                  bool tcp_reuseport, bool socket_affinity)
{
	iface_t *new_if = calloc(1, sizeof(*new_if));
	if (new_if == NULL) {
		log_error("failed to initialize interface");
		return NULL;
	}
	memcpy(&new_if->addr, addr, sizeof(*addr));

	/* Convert to string address format. */
	char addr_str[SOCKADDR_STRLEN] = { 0 };
	sockaddr_tostr(addr_str, sizeof(addr_str), addr);

	int udp_socket_count = 1;
	int udp_bind_flags = 0;
	int tcp_socket_count = 1;
	int tcp_bind_flags = 0;

#ifdef ENABLE_REUSEPORT
	udp_socket_count = udp_thread_count;
	udp_bind_flags |= NET_BIND_MULTIPLE;

	if (tcp_reuseport) {
		tcp_socket_count = tcp_thread_count;
		tcp_bind_flags |= NET_BIND_MULTIPLE;
	}
#endif

	new_if->fd_udp = malloc(udp_socket_count * sizeof(int));
	new_if->fd_tcp = malloc(tcp_socket_count * sizeof(int));
	if (new_if->fd_udp == NULL || new_if->fd_tcp == NULL) {
		log_error("failed to initialize interface");
		server_deinit_iface(new_if, true);
		return NULL;
	}

	const mode_t unix_mode = S_IWUSR | S_IWGRP | S_IWOTH;

	bool warn_bind = true;
	bool warn_cbpf = true;
	bool warn_bufsize = true;
	bool warn_pktinfo = true;
	bool warn_flag_misc = true;

	/* Create bound UDP sockets. */
	for (int i = 0; i < udp_socket_count; i++) {
		int sock = net_bound_socket(SOCK_DGRAM, addr, udp_bind_flags, unix_mode);
		if (sock == KNOT_EADDRNOTAVAIL) {
			udp_bind_flags |= NET_BIND_NONLOCAL;
			sock = net_bound_socket(SOCK_DGRAM, addr, udp_bind_flags, unix_mode);
			if (sock >= 0 && warn_bind) {
				log_warning("address %s UDP bound, but required nonlocal bind",
				            addr_str);
				warn_bind = false;
			}
		}

		if (sock < 0) {
			log_error("cannot bind address %s UDP (%s)", addr_str,
			          knot_strerror(sock));
			server_deinit_iface(new_if, true);
			return NULL;
		}

		if ((udp_bind_flags & NET_BIND_MULTIPLE) && socket_affinity) {
			if (!server_attach_reuseport_bpf(sock, udp_socket_count) &&
			    warn_cbpf) {
				log_warning("cannot ensure optimal CPU locality for UDP");
				warn_cbpf = false;
			}
		}

		if (!enlarge_net_buffers(sock, UDP_MIN_RCVSIZE, UDP_MIN_SNDSIZE) &&
		    warn_bufsize) {
			log_warning("failed to set network buffer sizes for UDP");
			warn_bufsize = false;
		}

		if (sockaddr_is_any(addr) && !enable_pktinfo(sock, addr->ss_family) &&
		    warn_pktinfo) {
			log_warning("failed to enable received packet information retrieval");
			warn_pktinfo = false;
		}

		int ret = disable_pmtudisc(sock, addr->ss_family);
		if (ret != KNOT_EOK && warn_flag_misc) {
			log_warning("failed to disable Path MTU discovery for IPv4/UDP (%s)",
			            knot_strerror(ret));
			warn_flag_misc = false;
		}

		new_if->fd_udp[new_if->fd_udp_count] = sock;
		new_if->fd_udp_count += 1;
	}

	warn_bind = true;
	warn_cbpf = true;
	warn_bufsize = true;
	warn_flag_misc = true;

	/* Create bound TCP sockets. */
	for (int i = 0; i < tcp_socket_count; i++) {
		int sock = net_bound_socket(SOCK_STREAM, addr, tcp_bind_flags, unix_mode);
		if (sock == KNOT_EADDRNOTAVAIL) {
			tcp_bind_flags |= NET_BIND_NONLOCAL;
			sock = net_bound_socket(SOCK_STREAM, addr, tcp_bind_flags, unix_mode);
			if (sock >= 0 && warn_bind) {
				log_warning("address %s TCP bound, but required nonlocal bind",
				            addr_str);
				warn_bind = false;
			}
		}

		if (sock < 0) {
			log_error("cannot bind address %s TCP (%s)", addr_str,
			          knot_strerror(sock));
			server_deinit_iface(new_if, true);
			return NULL;
		}

		if (!enlarge_net_buffers(sock, TCP_MIN_RCVSIZE, TCP_MIN_SNDSIZE) &&
		    warn_bufsize) {
			log_warning("failed to set network buffer sizes for TCP");
			warn_bufsize = false;
		}

		new_if->fd_tcp[new_if->fd_tcp_count] = sock;
		new_if->fd_tcp_count += 1;

		/* Listen for incoming connections. */
		int ret = listen(sock, TCP_BACKLOG_SIZE);
		if (ret < 0) {
			log_error("failed to listen on TCP interface %s", addr_str);
			server_deinit_iface(new_if, true);
			return NULL;
		}

		if ((tcp_bind_flags & NET_BIND_MULTIPLE) && socket_affinity) {
			if (!server_attach_reuseport_bpf(sock, tcp_socket_count) &&
			    warn_cbpf) {
				log_warning("cannot ensure optimal CPU locality for TCP");
				warn_cbpf = false;
			}
		}

		/* Try to enable TCP Fast Open. */
		ret = net_bound_tfo(sock, TCP_BACKLOG_SIZE);
		if (ret != KNOT_EOK && ret != KNOT_ENOTSUP && warn_flag_misc) {
			log_warning("failed to enable TCP Fast Open on %s (%s)",
			            addr_str, knot_strerror(ret));
			warn_flag_misc = false;
		}
	}

	return new_if;
}

/*!
 * \brief Log enabled socket features, e.g.
 *        "using UDP/TCP reuseport, socket affinity, incoming/outgoing TCP Fast Open".
 */
static void log_sock_conf(conf_t *conf)
{
	char buf[128] = "";
#if defined(ENABLE_REUSEPORT)
	strlcat(buf, "UDP", sizeof(buf));
	if (conf->cache.srv_tcp_reuseport) {
		strlcat(buf, "/TCP", sizeof(buf));
	}
	strlcat(buf, " reuseport", sizeof(buf));
	if (conf->cache.srv_socket_affinity) {
		strlcat(buf, ", socket affinity", sizeof(buf));
	}
#endif
#if defined(TCP_FASTOPEN)
	if (buf[0] != '\0') {
		strlcat(buf, ", ", sizeof(buf));
	}
	strlcat(buf, "incoming", sizeof(buf));
	if (conf->cache.srv_tcp_fastopen) {
		strlcat(buf, "/outgoing", sizeof(buf));
	}
	strlcat(buf, " TCP Fast Open", sizeof(buf));
#endif
	if (buf[0] != '\0') {
		log_info("using %s", buf);
	}
}

/*! \brief Initialize bound sockets according to configuration. */
static int configure_sockets(conf_t *conf, server_t *s)
{
	if (s->state & ServerRunning) {
		return KNOT_EOK;
	}

	conf_val_t listen_val = conf_get(conf, C_SRV, C_LISTEN);
	conf_val_t lisxdp_val = conf_get(conf, C_XDP, C_LISTEN);
	conf_val_t rundir_val = conf_get(conf, C_SRV, C_RUNDIR);

	if (listen_val.code == KNOT_EOK) {
		log_sock_conf(conf);
	} else if (lisxdp_val.code != KNOT_EOK) {
		log_warning("no network interface configured");
		return KNOT_EOK;
	}

#ifdef ENABLE_XDP
	if (lisxdp_val.code == KNOT_EOK && !linux_at_least(5, 11)) {
		struct rlimit min_limit = { RLIM_INFINITY, RLIM_INFINITY };
		struct rlimit cur_limit = { 0 };
		if (getrlimit(RLIMIT_MEMLOCK, &cur_limit) != 0 ||
		    cur_limit.rlim_cur != min_limit.rlim_cur ||
		    cur_limit.rlim_max != min_limit.rlim_max) {
			int ret = setrlimit(RLIMIT_MEMLOCK, &min_limit);
			if (ret != 0) {
				log_error("failed to increase RLIMIT_MEMLOCK (%s)",
				          knot_strerror(errno));
				return KNOT_ESYSTEM;
			}
		}
	}
#endif

	size_t real_nifs = 0;
	size_t nifs = conf_val_count(&listen_val) + conf_val_count(&lisxdp_val);
	iface_t *newlist = calloc(nifs, sizeof(*newlist));
	if (newlist == NULL) {
		log_error("failed to allocate memory for network sockets");
		return KNOT_ENOMEM;
	}

	/* Normal UDP and TCP sockets. */
	unsigned size_udp = s->handlers[IO_UDP].handler.unit->size;
	unsigned size_tcp = s->handlers[IO_TCP].handler.unit->size;
	bool tcp_reuseport = conf->cache.srv_tcp_reuseport;
	bool socket_affinity = conf->cache.srv_socket_affinity;
	char *rundir = conf_abs_path(&rundir_val, NULL);
	while (listen_val.code == KNOT_EOK) {
		struct sockaddr_storage addr = conf_addr(&listen_val, rundir);
		char addr_str[SOCKADDR_STRLEN] = { 0 };
		sockaddr_tostr(addr_str, sizeof(addr_str), &addr);
		log_info("binding to interface %s", addr_str);

		iface_t *new_if = server_init_iface(&addr, size_udp, size_tcp,
		                                    tcp_reuseport, socket_affinity);
		if (new_if == NULL) {
			server_deinit_iface_list(newlist, nifs);
			free(rundir);
			return KNOT_ERROR;
		}
		memcpy(&newlist[real_nifs++], new_if, sizeof(*newlist));
		free(new_if);

		conf_val_next(&listen_val);
	}
	free(rundir);

	/* XDP sockets. */
	bool xdp_udp = conf->cache.xdp_udp;
	bool xdp_tcp = conf->cache.xdp_tcp;
	uint16_t xdp_quic = conf->cache.xdp_quic;
	bool route_check = conf->cache.xdp_route_check;
	unsigned thread_id = s->handlers[IO_UDP].handler.unit->size +
	                     s->handlers[IO_TCP].handler.unit->size;
	while (lisxdp_val.code == KNOT_EOK) {
		struct sockaddr_storage addr = conf_addr(&lisxdp_val, NULL);
		char addr_str[SOCKADDR_STRLEN] = { 0 };
		sockaddr_tostr(addr_str, sizeof(addr_str), &addr);
		log_info("binding to XDP interface %s", addr_str);

		iface_t *new_if = server_init_xdp_iface(&addr, route_check, xdp_udp,
		                                        xdp_tcp, xdp_quic, &thread_id);
		if (new_if == NULL) {
			server_deinit_iface_list(newlist, nifs);
			return KNOT_ERROR;
		}
		memcpy(&newlist[real_nifs++], new_if, sizeof(*newlist));
		free(new_if);

		conf_val_next(&lisxdp_val);
	}
	assert(real_nifs <= nifs);
	nifs = real_nifs;

#if defined ENABLE_XDP && ENABLE_QUIC
	if (xdp_quic > 0) {
		char *tls_cert = conf_tls(conf, C_CERT_FILE);
		char *tls_key = conf_tls(conf, C_KEY_FILE);
		if (tls_cert == NULL) {
			log_notice("QUIC, no server certificate configured, using one-time one");
		}
		s->quic_creds = knot_xquic_init_creds(true, tls_cert, tls_key);
		free(tls_cert);
		free(tls_key);
		if (s->quic_creds == NULL) {
			log_error("QUIC, failed to initialize server credentials");
			server_deinit_iface_list(newlist, nifs);
			return KNOT_ERROR;
		}
	}
#endif // ENABLE_XDP && ENABLE_QUIC

	/* Publish new list. */
	s->ifaces = newlist;
	s->n_ifaces = nifs;

	/* Assign thread identifiers unique per all handlers. */
	unsigned thread_count = 0;
	for (unsigned proto = IO_UDP; proto <= IO_XDP; ++proto) {
		dt_unit_t *tu = s->handlers[proto].handler.unit;
		for (unsigned i = 0; tu != NULL && i < tu->size; ++i) {
			s->handlers[proto].handler.thread_id[i] = thread_count++;
		}
	}

	return KNOT_EOK;
}

int server_init(server_t *server, int bg_workers)
{
	if (server == NULL) {
		return KNOT_EINVAL;
	}

	/* Clear the structure. */
	memset(server, 0, sizeof(server_t));

	/* Initialize event scheduler. */
	if (evsched_init(&server->sched, server) != KNOT_EOK) {
		return KNOT_ENOMEM;
	}

	server->workers = worker_pool_create(bg_workers);
	if (server->workers == NULL) {
		evsched_deinit(&server->sched);
		return KNOT_ENOMEM;
	}

	int ret = catalog_update_init(&server->catalog_upd);
	if (ret != KNOT_EOK) {
		worker_pool_destroy(server->workers);
		evsched_deinit(&server->sched);
		return ret;
	}

	zone_backups_init(&server->backup_ctxs);

	char *catalog_dir = conf_db(conf(), C_CATALOG_DB);
	conf_val_t catalog_size = conf_db_param(conf(), C_CATALOG_DB_MAX_SIZE);
	catalog_init(&server->catalog, catalog_dir, conf_int(&catalog_size));
	free(catalog_dir);
	conf()->catalog = &server->catalog;

	char *journal_dir = conf_db(conf(), C_JOURNAL_DB);
	conf_val_t journal_size = conf_db_param(conf(), C_JOURNAL_DB_MAX_SIZE);
	conf_val_t journal_mode = conf_db_param(conf(), C_JOURNAL_DB_MODE);
	knot_lmdb_init(&server->journaldb, journal_dir, conf_int(&journal_size),
	               journal_env_flags(conf_opt(&journal_mode), false), NULL);
	free(journal_dir);

	kasp_db_ensure_init(&server->kaspdb, conf());

	char *timer_dir = conf_db(conf(), C_TIMER_DB);
	conf_val_t timer_size = conf_db_param(conf(), C_TIMER_DB_MAX_SIZE);
	knot_lmdb_init(&server->timerdb, timer_dir, conf_int(&timer_size), 0, NULL);
	free(timer_dir);

	return KNOT_EOK;
}

void server_deinit(server_t *server)
{
	if (server == NULL) {
		return;
	}

	zone_backups_deinit(&server->backup_ctxs);

	/* Save zone timers. */
	if (server->zone_db != NULL) {
		log_info("updating persistent timer DB");
		int ret = zone_timers_write_all(&server->timerdb, server->zone_db);
		if (ret != KNOT_EOK) {
			log_warning("failed to update persistent timer DB (%s)",
			            knot_strerror(ret));
		}
	}

	/* Free remaining interfaces. */
	server_deinit_iface_list(server->ifaces, server->n_ifaces);

	/* Free threads and event handlers. */
	worker_pool_destroy(server->workers);

	/* Free zone database. */
	knot_zonedb_deep_free(&server->zone_db, true);

	/* Free remaining events. */
	evsched_deinit(&server->sched);

	/* Free catalog zone context. */
	catalog_update_clear(&server->catalog_upd);
	catalog_update_deinit(&server->catalog_upd);
	catalog_deinit(&server->catalog);

	/* Close persistent timers DB. */
	knot_lmdb_deinit(&server->timerdb);

	/* Close kasp_db. */
	knot_lmdb_deinit(&server->kaspdb);

	/* Close journal database if open. */
	knot_lmdb_deinit(&server->journaldb);

	/* Close and deinit connection pool. */
	conn_pool_deinit(global_conn_pool);
	global_conn_pool = NULL;
	knot_unreachables_deinit(&global_unreachables);

#if defined ENABLE_XDP && ENABLE_QUIC
	knot_xquic_free_creds(server->quic_creds);
#endif // ENABLE_XDP && ENABLE_QUIC
}

static int server_init_handler(server_t *server, int index, int thread_count,
                               runnable_t runnable, runnable_t destructor)
{
	/* Initialize */
	iohandler_t *h = &server->handlers[index].handler;
	memset(h, 0, sizeof(iohandler_t));
	h->server = server;
	h->unit = dt_create(thread_count, runnable, destructor, h);
	if (h->unit == NULL) {
		return KNOT_ENOMEM;
	}

	h->thread_state = calloc(thread_count, sizeof(unsigned));
	if (h->thread_state == NULL) {
		dt_delete(&h->unit);
		return KNOT_ENOMEM;
	}

	h->thread_id = calloc(thread_count, sizeof(unsigned));
	if (h->thread_id == NULL) {
		free(h->thread_state);
		dt_delete(&h->unit);
		return KNOT_ENOMEM;
	}

	return KNOT_EOK;
}

static void server_free_handler(iohandler_t *h)
{
	if (h == NULL || h->server == NULL) {
		return;
	}

	/* Wait for threads to finish. */
	if (h->unit) {
		dt_stop(h->unit);
		dt_join(h->unit);
	}

	/* Destroy worker context. */
	dt_delete(&h->unit);
	free(h->thread_state);
	free(h->thread_id);
}

static void worker_wait_cb(worker_pool_t *pool)
{
	systemd_zone_load_timeout_notify();

	static uint64_t last_ns = 0;
	struct timespec now = time_now();
	uint64_t now_ns = 1000000000 * now.tv_sec + now.tv_nsec;
	/* Too frequent worker_pool_status() calls with many zones are expensive. */
	if (now_ns - last_ns > 1000000000) {
		int running, queued;
		worker_pool_status(pool, true, &running, &queued);
		systemd_tasks_status_notify(running + queued);
		last_ns = now_ns;
	}
}

int server_start(server_t *server, bool async)
{
	if (server == NULL) {
		return KNOT_EINVAL;
	}

	/* Start workers. */
	worker_pool_start(server->workers);

	/* Wait for enqueued events if not asynchronous. */
	if (!async) {
		worker_pool_wait_cb(server->workers, worker_wait_cb);
		systemd_tasks_status_notify(0);
	}

	/* Start evsched handler. */
	evsched_start(&server->sched);

	/* Start I/O handlers. */
	server->state |= ServerRunning;
	for (int proto = IO_UDP; proto <= IO_XDP; ++proto) {
		if (server->handlers[proto].size > 0) {
			int ret = dt_start(server->handlers[proto].handler.unit);
			if (ret != KNOT_EOK) {
				return ret;
			}
		}
	}

	return KNOT_EOK;
}

void server_wait(server_t *server)
{
	if (server == NULL) {
		return;
	}

	evsched_join(&server->sched);
	worker_pool_join(server->workers);

	for (int proto = IO_UDP; proto <= IO_XDP; ++proto) {
		if (server->handlers[proto].size > 0) {
			server_free_handler(&server->handlers[proto].handler);
		}
	}
}

static int reload_conf(conf_t *new_conf)
{
	yp_schema_purge_dynamic(new_conf->schema);

	/* Re-load common modules. */
	int ret = conf_mod_load_common(new_conf);
	if (ret != KNOT_EOK) {
		return ret;
	}

	/* Re-import config file if specified. */
	const char *filename = conf()->filename;
	if (filename != NULL) {
		log_info("reloading configuration file '%s'", filename);

		/* Import the configuration file. */
		ret = conf_import(new_conf, filename, true, false);
		if (ret != KNOT_EOK) {
			log_error("failed to load configuration file (%s)",
			          knot_strerror(ret));
			return ret;
		}
	} else {
		log_info("reloading configuration database '%s'",
		         knot_db_lmdb_get_path(new_conf->db));

		/* Re-load extra modules. */
		for (conf_iter_t iter = conf_iter(new_conf, C_MODULE);
		     iter.code == KNOT_EOK; conf_iter_next(new_conf, &iter)) {
			conf_val_t id = conf_iter_id(new_conf, &iter);
			conf_val_t file = conf_id_get(new_conf, C_MODULE, C_FILE, &id);
			ret = conf_mod_load_extra(new_conf, conf_str(&id), conf_str(&file),
			                          MOD_EXPLICIT);
			if (ret != KNOT_EOK) {
				conf_iter_finish(new_conf, &iter);
				return ret;
			}
		}
	}

	conf_mod_load_purge(new_conf, false);

	// Migrate from old schema.
	ret = conf_migrate(new_conf);
	if (ret != KNOT_EOK) {
		log_error("failed to migrate configuration (%s)", knot_strerror(ret));
	}

	return KNOT_EOK;
}

/*! \brief Check if parameter listen(-xdp) has been changed since knotd started. */
static bool listen_changed(conf_t *conf, server_t *server)
{
	assert(server->ifaces);

	conf_val_t listen_val = conf_get(conf, C_SRV, C_LISTEN);
	conf_val_t lisxdp_val = conf_get(conf, C_XDP, C_LISTEN);
	size_t new_count = conf_val_count(&listen_val) + conf_val_count(&lisxdp_val);
	size_t old_count = server->n_ifaces;
	if (new_count != old_count) {
		return true;
	}

	conf_val_t rundir_val = conf_get(conf, C_SRV, C_RUNDIR);
	char *rundir = conf_abs_path(&rundir_val, NULL);
	size_t matches = 0;

	/* Find matching interfaces. */
	while (listen_val.code == KNOT_EOK) {
		struct sockaddr_storage addr = conf_addr(&listen_val, rundir);
		bool found = false;
		for (size_t i = 0; i < server->n_ifaces; i++) {
			if (sockaddr_cmp(&addr, &server->ifaces[i].addr, false) == 0) {
				matches++;
				found = true;
				break;
			}
		}
		if (!found) {
			break;
		}
		conf_val_next(&listen_val);
	}
	free(rundir);

	while (lisxdp_val.code == KNOT_EOK) {
		struct sockaddr_storage addr = conf_addr(&lisxdp_val, NULL);
		bool found = false;
		for (size_t i = 0; i < server->n_ifaces; i++) {
			if (sockaddr_cmp(&addr, &server->ifaces[i].addr, false) == 0) {
				matches++;
				found = true;
				break;
			}
		}
		if (!found) {
			break;
		}
		conf_val_next(&lisxdp_val);
	}

	return matches != old_count;
}
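
/*
 * For example, lowering server.udp-workers in the configuration and
 * issuing a reload only logs "changes of udp-workers require restart
 * to take effect" via the checks below; the running worker threads
 * stay unchanged until knotd is restarted.
 */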
/*! \brief Log warnings if config change requires a restart. */
static void warn_server_reconfigure(conf_t *conf, server_t *server)
{
	const char *msg = "changes of %s require restart to take effect";

	static bool warn_tcp_reuseport = true;
	static bool warn_socket_affinity = true;
	static bool warn_udp = true;
	static bool warn_tcp = true;
	static bool warn_bg = true;
	static bool warn_listen = true;
	static bool warn_xdp_udp = true;
	static bool warn_xdp_tcp = true;
	static bool warn_xdp_quic = true;
	static bool warn_route_check = true;
	static bool warn_rmt_pool_limit = true;

	if (warn_tcp_reuseport && conf->cache.srv_tcp_reuseport !=
	                          conf_get_bool(conf, C_SRV, C_TCP_REUSEPORT)) {
		log_warning(msg, &C_TCP_REUSEPORT[1]);
		warn_tcp_reuseport = false;
	}

	if (warn_socket_affinity && conf->cache.srv_socket_affinity !=
	                            conf_get_bool(conf, C_SRV, C_SOCKET_AFFINITY)) {
		log_warning(msg, &C_SOCKET_AFFINITY[1]);
		warn_socket_affinity = false;
	}

	if (warn_udp && server->handlers[IO_UDP].size != conf_udp_threads(conf)) {
		log_warning(msg, &C_UDP_WORKERS[1]);
		warn_udp = false;
	}

	if (warn_tcp && server->handlers[IO_TCP].size != conf_tcp_threads(conf)) {
		log_warning(msg, &C_TCP_WORKERS[1]);
		warn_tcp = false;
	}

	if (warn_bg && conf->cache.srv_bg_threads != conf_bg_threads(conf)) {
		log_warning(msg, &C_BG_WORKERS[1]);
		warn_bg = false;
	}

	if (warn_listen && server->ifaces != NULL && listen_changed(conf, server)) {
		log_warning(msg, "listen(-xdp)");
		warn_listen = false;
	}

	if (warn_xdp_udp && conf->cache.xdp_udp != conf_get_bool(conf, C_XDP, C_UDP)) {
		log_warning(msg, &C_UDP[1]);
		warn_xdp_udp = false;
	}

	if (warn_xdp_tcp && conf->cache.xdp_tcp != conf_get_bool(conf, C_XDP, C_TCP)) {
		log_warning(msg, &C_TCP[1]);
		warn_xdp_tcp = false;
	}

	if (warn_xdp_quic &&
	    (bool)conf->cache.xdp_quic != conf_get_bool(conf, C_XDP, C_QUIC)) {
		log_warning(msg, &C_QUIC[1]);
		warn_xdp_quic = false;
	}

	if (warn_xdp_quic && conf->cache.xdp_quic > 0 &&
	    conf->cache.xdp_quic != conf_get_int(conf, C_XDP, C_QUIC_PORT)) {
		log_warning(msg, &C_QUIC_PORT[1]);
		warn_xdp_quic = false;
	}

	if (warn_route_check &&
	    conf->cache.xdp_route_check != conf_get_bool(conf, C_XDP, C_ROUTE_CHECK)) {
		log_warning(msg, &C_ROUTE_CHECK[1]);
		warn_route_check = false;
	}

	if (warn_rmt_pool_limit && global_conn_pool != NULL &&
	    global_conn_pool->capacity != conf_get_int(conf, C_SRV, C_RMT_POOL_LIMIT)) {
		log_warning(msg, &C_RMT_POOL_LIMIT[1]);
		warn_rmt_pool_limit = false;
	}
}

int server_reload(server_t *server, reload_t mode)
{
	if (server == NULL) {
		return KNOT_EINVAL;
	}

	systemd_reloading_notify();

	/* Check for no edit mode. */
	if (conf()->io.txn != NULL) {
		log_warning("reload aborted due to active configuration transaction");
		systemd_ready_notify();
		return KNOT_TXN_EEXISTS;
	}

	conf_t *new_conf = NULL;
	int ret = conf_clone(&new_conf);
	if (ret != KNOT_EOK) {
		log_error("failed to initialize configuration (%s)",
		          knot_strerror(ret));
		systemd_ready_notify();
		return ret;
	}

	yp_flag_t flags = conf()->io.flags;
	bool full = !(flags & CONF_IO_FACTIVE);
	bool reuse_modules = !full && !(flags & CONF_IO_FRLD_MOD);

	/* Reload configuration and modules if full reload or a module change. */
	if (full || !reuse_modules) {
		ret = reload_conf(new_conf);
		if (ret != KNOT_EOK) {
			conf_free(new_conf);
			systemd_ready_notify();
			return ret;
		}

		conf_activate_modules(new_conf, server, NULL, new_conf->query_modules,
		                      &new_conf->query_plan);
	}

	conf_update_flag_t upd_flags = CONF_UPD_FNOFREE;
	if (!full) {
		upd_flags |= CONF_UPD_FCONFIO;
	}
	if (reuse_modules) {
		upd_flags |= CONF_UPD_FMODULES;
	}

	/* Update to the new config. */
	conf_t *old_conf = conf_update(new_conf, upd_flags);

	/* Reload each component if full reload or a specific one if required. */
	if (full || (flags & CONF_IO_FRLD_LOG)) {
		log_reconfigure(conf());
	}
	if (full || (flags & CONF_IO_FRLD_SRV)) {
		(void)server_reconfigure(conf(), server);
		warn_server_reconfigure(conf(), server);
		stats_reconfigure(conf(), server);
	}
	if (full || (flags & (CONF_IO_FRLD_ZONES | CONF_IO_FRLD_ZONE))) {
		server_update_zones(conf(), server, mode);
	}

	/* Free old config needed for module unload in zone reload. */
	conf_free(old_conf);

	if (full) {
		log_info("configuration reloaded");
	} else {
		// Reset confio reload context.
		conf()->io.flags = YP_FNONE;
		if (conf()->io.zones != NULL) {
			trie_clear(conf()->io.zones);
		}
	}

	systemd_ready_notify();

	return KNOT_EOK;
}

void server_stop(server_t *server)
{
	log_info("stopping server");
	systemd_stopping_notify();

	/* Stop scheduler. */
	evsched_stop(&server->sched);

	/* Interrupt background workers. */
	worker_pool_stop(server->workers);

	/* Clear 'running' flag. */
	server->state &= ~ServerRunning;
}

static int set_handler(server_t *server, int index, unsigned size, runnable_t run)
{
	/* Initialize I/O handlers. */
	int ret = server_init_handler(server, index, size, run, NULL);
	if (ret != KNOT_EOK) {
		return ret;
	}

	server->handlers[index].size = size;

	return KNOT_EOK;
}

static int configure_threads(conf_t *conf, server_t *server)
{
	int ret = set_handler(server, IO_UDP, conf->cache.srv_udp_threads, udp_master);
	if (ret != KNOT_EOK) {
		return ret;
	}

	if (conf->cache.srv_xdp_threads > 0) {
		ret = set_handler(server, IO_XDP, conf->cache.srv_xdp_threads, udp_master);
		if (ret != KNOT_EOK) {
			return ret;
		}
	}

	return set_handler(server, IO_TCP, conf->cache.srv_tcp_threads, tcp_master);
}

static int reconfigure_journal_db(conf_t *conf, server_t *server)
{
	char *journal_dir = conf_db(conf, C_JOURNAL_DB);
	conf_val_t journal_size = conf_db_param(conf, C_JOURNAL_DB_MAX_SIZE);
	conf_val_t journal_mode = conf_db_param(conf, C_JOURNAL_DB_MODE);
	int ret = knot_lmdb_reinit(&server->journaldb, journal_dir,
	                           conf_int(&journal_size),
	                           journal_env_flags(conf_opt(&journal_mode), false));
	if (ret != KNOT_EOK) {
		log_warning("ignored reconfiguration of journal DB (%s)",
		            knot_strerror(ret));
	}
	free(journal_dir);

	return KNOT_EOK; // not "ret"
}

static int reconfigure_kasp_db(conf_t *conf, server_t *server)
{
	char *kasp_dir = conf_db(conf, C_KASP_DB);
	conf_val_t kasp_size = conf_db_param(conf, C_KASP_DB_MAX_SIZE);
	int ret = knot_lmdb_reinit(&server->kaspdb, kasp_dir, conf_int(&kasp_size), 0);
	if (ret != KNOT_EOK) {
		log_warning("ignored reconfiguration of KASP DB (%s)",
		            knot_strerror(ret));
	}
	free(kasp_dir);

	return KNOT_EOK; // not "ret"
}

static int reconfigure_timer_db(conf_t *conf, server_t *server)
{
	char *timer_dir = conf_db(conf, C_TIMER_DB);
	conf_val_t timer_size = conf_db_param(conf, C_TIMER_DB_MAX_SIZE);
	int ret = knot_lmdb_reconfigure(&server->timerdb, timer_dir,
	                                conf_int(&timer_size), 0);
	free(timer_dir);

	return ret;
}

static int reconfigure_remote_pool(conf_t *conf)
{
	conf_val_t val = conf_get(conf, C_SRV, C_RMT_POOL_LIMIT);
	size_t limit = conf_int(&val);
	val = conf_get(conf, C_SRV, C_RMT_POOL_TIMEOUT);
	knot_timediff_t timeout = conf_int(&val);
	if (global_conn_pool == NULL && limit > 0) {
		conn_pool_t *new_pool = conn_pool_init(limit, timeout);
		if (new_pool == NULL) {
			return KNOT_ENOMEM;
		}
		global_conn_pool = new_pool;
	} else {
		(void)conn_pool_timeout(global_conn_pool, timeout);
	}

	val = conf_get(conf, C_SRV, C_RMT_RETRY_DELAY);
	int delay_ms = conf_int(&val);
	if (global_unreachables == NULL && delay_ms > 0) {
		global_unreachables = knot_unreachables_init(delay_ms);
	} else {
		(void)knot_unreachables_ttl(global_unreachables, delay_ms);
	}

	return KNOT_EOK;
}

int server_reconfigure(conf_t *conf, server_t *server)
{
	if (conf == NULL || server == NULL) {
		return KNOT_EINVAL;
	}

	int ret;

	/* First reconfiguration. */
	if (!(server->state & ServerRunning)) {
		log_info("Knot DNS %s starting", PACKAGE_VERSION);

		size_t mapsize = conf->mapsize / (1024 * 1024);
		if (conf->filename != NULL) {
			log_info("loaded configuration file '%s', mapsize %zu MiB",
			         conf->filename, mapsize);
		} else {
			log_info("loaded configuration database '%s', mapsize %zu MiB",
			         knot_db_lmdb_get_path(conf->db), mapsize);
		}

		/* Configure server threads. */
		if ((ret = configure_threads(conf, server)) != KNOT_EOK) {
			log_error("failed to configure server threads (%s)",
			          knot_strerror(ret));
			return ret;
		}

		/* Configure sockets. */
		if ((ret = configure_sockets(conf, server)) != KNOT_EOK) {
			return ret;
		}

		if (conf_lmdb_readers(conf) > CONF_MAX_DB_READERS) {
			log_warning("config, exceeded number of database readers");
		}
	}

	/* Reconfigure journal DB. */
	if ((ret = reconfigure_journal_db(conf, server)) != KNOT_EOK) {
		log_error("failed to reconfigure journal DB (%s)",
		          knot_strerror(ret));
	}

	/* Reconfigure KASP DB. */
	if ((ret = reconfigure_kasp_db(conf, server)) != KNOT_EOK) {
		log_error("failed to reconfigure KASP DB (%s)",
		          knot_strerror(ret));
	}

	/* Reconfigure Timer DB. */
	if ((ret = reconfigure_timer_db(conf, server)) != KNOT_EOK) {
		log_error("failed to reconfigure Timer DB (%s)",
		          knot_strerror(ret));
	}

	/* Reconfigure connection pool. */
	if ((ret = reconfigure_remote_pool(conf)) != KNOT_EOK) {
		log_error("failed to reconfigure remote pool (%s)",
		          knot_strerror(ret));
	}

	return KNOT_EOK;
}

void server_update_zones(conf_t *conf, server_t *server, reload_t mode)
{
	if (conf == NULL || server == NULL) {
		return;
	}

	/* Prevent emitting of new zone events. */
	if (server->zone_db) {
		knot_zonedb_foreach(server->zone_db, zone_events_freeze);
	}

	/* Suspend adding events to worker pool queue, wait for queued events. */
	evsched_pause(&server->sched);
	worker_pool_wait(server->workers);

	/* Reload zone database and free old zones. */
	zonedb_reload(conf, server, mode);

	/* Trim extra heap. */
	mem_trim();

	/* Resume processing events on new zones. */
	evsched_resume(&server->sched);
	if (server->zone_db) {
		knot_zonedb_foreach(server->zone_db, zone_events_start);
	}
}
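
/*
 * Typical lifecycle of this module, as an illustrative sketch only (the
 * real knotd main additionally wires in signal handling and the control
 * socket; "mode" below stands for a caller-chosen reload_t value):
 *
 *	server_t server;
 *	server_init(&server, conf_bg_threads(conf()));	// workers, DBs
 *	server_reconfigure(conf(), &server);		// threads + sockets
 *	server_update_zones(conf(), &server, mode);	// load zone database
 *	server_start(&server, false);			// start I/O handlers
 *	server_wait(&server);				// runs until stopped
 *	server_deinit(&server);				// flush timers, free all
 *
 * server_stop() is intended to be called asynchronously (e.g. from a
 * signal handler); it clears ServerRunning and stops the scheduler and
 * background workers so that server_wait() can return.
 */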