diff options
Diffstat (limited to 'src/knot/server')
-rw-r--r-- | src/knot/server/dthreads.c | 767 | ||||
-rw-r--r-- | src/knot/server/dthreads.h | 295 | ||||
-rw-r--r-- | src/knot/server/proxyv2.c | 69 | ||||
-rw-r--r-- | src/knot/server/proxyv2.h | 23 | ||||
-rw-r--r-- | src/knot/server/server.c | 1335 | ||||
-rw-r--r-- | src/knot/server/server.h | 203 | ||||
-rw-r--r-- | src/knot/server/tcp-handler.c | 380 | ||||
-rw-r--r-- | src/knot/server/tcp-handler.h | 43 | ||||
-rw-r--r-- | src/knot/server/udp-handler.c | 575 | ||||
-rw-r--r-- | src/knot/server/udp-handler.h | 43 | ||||
-rw-r--r-- | src/knot/server/xdp-handler.c | 506 | ||||
-rw-r--r-- | src/knot/server/xdp-handler.h | 67 |
12 files changed, 4306 insertions, 0 deletions
diff --git a/src/knot/server/dthreads.c b/src/knot/server/dthreads.c new file mode 100644 index 0000000..74203ac --- /dev/null +++ b/src/knot/server/dthreads.c @@ -0,0 +1,767 @@ +/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include <signal.h> +#include <stdlib.h> +#include <string.h> +#include <stdio.h> +#include <unistd.h> +#include <errno.h> +#include <urcu.h> + +#ifdef HAVE_PTHREAD_NP_H +#include <pthread_np.h> +#endif /* HAVE_PTHREAD_NP_H */ + +#include "knot/server/dthreads.h" +#include "libknot/libknot.h" + +/* BSD cpu set compatibility. */ +#if defined(HAVE_CPUSET_BSD) +typedef cpuset_t cpu_set_t; +#endif + +/*! \brief Lock thread state for R/W. */ +static inline void lock_thread_rw(dthread_t *thread) +{ + pthread_mutex_lock(&thread->_mx); +} +/*! \brief Unlock thread state for R/W. */ +static inline void unlock_thread_rw(dthread_t *thread) +{ + pthread_mutex_unlock(&thread->_mx); +} + +/*! \brief Signalize thread state change. */ +static inline void unit_signalize_change(dt_unit_t *unit) +{ + pthread_mutex_lock(&unit->_report_mx); + pthread_cond_signal(&unit->_report); + pthread_mutex_unlock(&unit->_report_mx); +} + +/*! + * \brief Update thread state with notification. + * \param thread Given thread. + * \param state New state for thread. + * \retval 0 on success. + * \retval <0 on error (EINVAL, ENOTSUP). 
+ */ +static inline int dt_update_thread(dthread_t *thread, int state) +{ + if (thread == 0) { + return KNOT_EINVAL; + } + + // Cancel with lone thread + dt_unit_t *unit = thread->unit; + if (unit == 0) { + return KNOT_ENOTSUP; + } + + // Cancel current runnable if running + pthread_mutex_lock(&unit->_notify_mx); + lock_thread_rw(thread); + if (thread->state & (ThreadIdle | ThreadActive)) { + + // Update state + thread->state = state; + unlock_thread_rw(thread); + + // Notify thread + pthread_cond_broadcast(&unit->_notify); + pthread_mutex_unlock(&unit->_notify_mx); + } else { + /* Unable to update thread, it is already dead. */ + unlock_thread_rw(thread); + pthread_mutex_unlock(&unit->_notify_mx); + return KNOT_EINVAL; + } + + return KNOT_EOK; +} + +/*! + * \brief Thread entrypoint function. + * + * When a thread is created and started, it immediately enters this function. + * Depending on thread state, it either enters runnable or + * blocks until it is awakened. + * + * This function also handles "ThreadIdle" state to quickly suspend and resume + * threads and mitigate thread creation costs. 
Also, thread runnable may + * be changed to alter the thread behavior on runtime + */ +static void *thread_ep(void *data) +{ + dthread_t *thread = (dthread_t *)data; + if (thread == 0) { + return 0; + } + + // Check if is a member of unit + dt_unit_t *unit = thread->unit; + if (unit == 0) { + return 0; + } + + // Unblock SIGALRM for synchronization + sigset_t mask; + (void)sigemptyset(&mask); + sigaddset(&mask, SIGALRM); + pthread_sigmask(SIG_UNBLOCK, &mask, NULL); + + rcu_register_thread(); + + // Run loop + for (;;) { + + // Check thread state + lock_thread_rw(thread); + if (thread->state == ThreadDead) { + unlock_thread_rw(thread); + break; + } + + // Update data + thread->data = thread->_adata; + runnable_t _run = thread->run; + + // Start runnable if thread is marked Active + if ((thread->state == ThreadActive) && (thread->run != 0)) { + unlock_thread_rw(thread); + _run(thread); + } else { + unlock_thread_rw(thread); + } + + // If the runnable was cancelled, start new iteration + lock_thread_rw(thread); + if (thread->state & ThreadCancelled) { + thread->state &= ~ThreadCancelled; + unlock_thread_rw(thread); + continue; + } + unlock_thread_rw(thread); + + // Runnable finished without interruption, mark as Idle + pthread_mutex_lock(&unit->_notify_mx); + lock_thread_rw(thread); + if (thread->state & ThreadActive) { + thread->state &= ~ThreadActive; + thread->state |= ThreadIdle; + } + + // Go to sleep if idle + if (thread->state & ThreadIdle) { + unlock_thread_rw(thread); + + // Signalize state change + unit_signalize_change(unit); + + // Wait for notification from unit + pthread_cond_wait(&unit->_notify, &unit->_notify_mx); + pthread_mutex_unlock(&unit->_notify_mx); + } else { + unlock_thread_rw(thread); + pthread_mutex_unlock(&unit->_notify_mx); + } + } + + // Thread destructor + if (thread->destruct) { + thread->destruct(thread); + } + + // Report thread state change + unit_signalize_change(unit); + lock_thread_rw(thread); + thread->state |= ThreadJoinable; + 
unlock_thread_rw(thread); + rcu_unregister_thread(); + + // Return + return 0; +} + +/*! + * \brief Create single thread. + * \retval New thread instance on success. + * \retval NULL on error. + */ +static dthread_t *dt_create_thread(dt_unit_t *unit) +{ + // Alloc thread + dthread_t *thread = malloc(sizeof(dthread_t)); + if (thread == 0) { + return 0; + } + + memset(thread, 0, sizeof(dthread_t)); + + // Blank thread state + thread->state = ThreadJoined; + pthread_mutex_init(&thread->_mx, 0); + + // Set membership in unit + thread->unit = unit; + + // Initialize attribute + pthread_attr_t *attr = &thread->_attr; + pthread_attr_init(attr); + //pthread_attr_setinheritsched(attr, PTHREAD_INHERIT_SCHED); + //pthread_attr_setschedpolicy(attr, SCHED_OTHER); + pthread_attr_setstacksize(attr, 1024*1024); + return thread; +} + +/*! \brief Delete single thread. */ +static void dt_delete_thread(dthread_t **thread) +{ + if (!thread || !*thread) { + return; + } + + dthread_t* thr = *thread; + thr->unit = 0; + *thread = 0; + + // Delete attribute + pthread_attr_destroy(&(thr)->_attr); + + // Delete mutex + pthread_mutex_destroy(&(thr)->_mx); + + // Free memory + free(thr); +} + +static dt_unit_t *dt_create_unit(int count) +{ + if (count <= 0) { + return 0; + } + + dt_unit_t *unit = malloc(sizeof(dt_unit_t)); + if (unit == 0) { + return 0; + } + + // Initialize conditions + if (pthread_cond_init(&unit->_notify, 0) != 0) { + free(unit); + return 0; + } + if (pthread_cond_init(&unit->_report, 0) != 0) { + pthread_cond_destroy(&unit->_notify); + free(unit); + return 0; + } + + // Initialize mutexes + if (pthread_mutex_init(&unit->_notify_mx, 0) != 0) { + pthread_cond_destroy(&unit->_notify); + pthread_cond_destroy(&unit->_report); + free(unit); + return 0; + } + if (pthread_mutex_init(&unit->_report_mx, 0) != 0) { + pthread_cond_destroy(&unit->_notify); + pthread_cond_destroy(&unit->_report); + pthread_mutex_destroy(&unit->_notify_mx); + free(unit); + return 0; + } + if 
(pthread_mutex_init(&unit->_mx, 0) != 0) { + pthread_cond_destroy(&unit->_notify); + pthread_cond_destroy(&unit->_report); + pthread_mutex_destroy(&unit->_notify_mx); + pthread_mutex_destroy(&unit->_report_mx); + free(unit); + return 0; + } + + // Save unit size + unit->size = count; + + // Alloc threads + unit->threads = calloc(count, sizeof(dthread_t *)); + if (unit->threads == 0) { + pthread_cond_destroy(&unit->_notify); + pthread_cond_destroy(&unit->_report); + pthread_mutex_destroy(&unit->_notify_mx); + pthread_mutex_destroy(&unit->_report_mx); + pthread_mutex_destroy(&unit->_mx); + free(unit); + return 0; + } + + // Initialize threads + int init_success = 1; + for (int i = 0; i < count; ++i) { + unit->threads[i] = dt_create_thread(unit); + if (unit->threads[i] == 0) { + init_success = 0; + break; + } + } + + // Check thread initialization + if (!init_success) { + + // Delete created threads + for (int i = 0; i < count; ++i) { + dt_delete_thread(&unit->threads[i]); + } + + // Free rest of the unit + pthread_cond_destroy(&unit->_notify); + pthread_cond_destroy(&unit->_report); + pthread_mutex_destroy(&unit->_notify_mx); + pthread_mutex_destroy(&unit->_report_mx); + pthread_mutex_destroy(&unit->_mx); + free(unit->threads); + free(unit); + return 0; + } + + return unit; +} + +dt_unit_t *dt_create(int count, runnable_t runnable, runnable_t destructor, void *data) +{ + if (count <= 0) { + return 0; + } + + // Create unit + dt_unit_t *unit = dt_create_unit(count); + if (unit == 0) { + return 0; + } + + // Set threads common purpose + pthread_mutex_lock(&unit->_notify_mx); + dt_unit_lock(unit); + + for (int i = 0; i < count; ++i) { + dthread_t *thread = unit->threads[i]; + lock_thread_rw(thread); + thread->run = runnable; + thread->destruct = destructor; + thread->_adata = data; + unlock_thread_rw(thread); + } + + dt_unit_unlock(unit); + pthread_mutex_unlock(&unit->_notify_mx); + + return unit; +} + +void dt_delete(dt_unit_t **unit) +{ + /* + * All threads must be 
stopped or idle at this point, + * or else the behavior is undefined. + * Sorry. + */ + + if (unit == 0) { + return; + } + if (*unit == 0) { + return; + } + + // Compact and reclaim idle threads + dt_unit_t *d_unit = *unit; + dt_compact(d_unit); + + // Delete threads + for (int i = 0; i < d_unit->size; ++i) { + dt_delete_thread(&d_unit->threads[i]); + } + + // Deinit mutexes + pthread_mutex_destroy(&d_unit->_notify_mx); + pthread_mutex_destroy(&d_unit->_report_mx); + pthread_mutex_destroy(&d_unit->_mx); + + // Deinit conditions + pthread_cond_destroy(&d_unit->_notify); + pthread_cond_destroy(&d_unit->_report); + + // Free memory + free(d_unit->threads); + free(d_unit); + *unit = 0; +} + +static int dt_start_id(dthread_t *thread) +{ + if (thread == 0) { + return KNOT_EINVAL; + } + + lock_thread_rw(thread); + + // Update state + int prev_state = thread->state; + thread->state |= ThreadActive; + thread->state &= ~ThreadIdle; + thread->state &= ~ThreadDead; + thread->state &= ~ThreadJoined; + thread->state &= ~ThreadJoinable; + + // Do not re-create running threads + if (prev_state != ThreadJoined) { + unlock_thread_rw(thread); + return 0; + } + + // Start thread + sigset_t mask_all, mask_old; + sigfillset(&mask_all); + sigdelset(&mask_all, SIGPROF); + pthread_sigmask(SIG_SETMASK, &mask_all, &mask_old); + int res = pthread_create(&thread->_thr, /* pthread_t */ + &thread->_attr, /* pthread_attr_t */ + thread_ep, /* routine: thread_ep */ + thread); /* passed object: dthread_t */ + pthread_sigmask(SIG_SETMASK, &mask_old, NULL); + + // Unlock thread + unlock_thread_rw(thread); + return res; +} + +int dt_start(dt_unit_t *unit) +{ + if (unit == 0) { + return KNOT_EINVAL; + } + + // Lock unit + pthread_mutex_lock(&unit->_notify_mx); + dt_unit_lock(unit); + for (int i = 0; i < unit->size; ++i) { + + dthread_t *thread = unit->threads[i]; + int res = dt_start_id(thread); + if (res != 0) { + dt_unit_unlock(unit); + pthread_mutex_unlock(&unit->_notify_mx); + return res; + } + } + 
+ // Unlock unit + dt_unit_unlock(unit); + pthread_cond_broadcast(&unit->_notify); + pthread_mutex_unlock(&unit->_notify_mx); + return KNOT_EOK; +} + +int dt_signalize(dthread_t *thread, int signum) +{ + if (thread == 0) { + return KNOT_EINVAL; + } + + int ret = pthread_kill(thread->_thr, signum); + + /* Not thread id found or invalid signum. */ + if (ret == EINVAL || ret == ESRCH) { + return KNOT_EINVAL; + } + + /* Generic error. */ + if (ret < 0) { + return KNOT_ERROR; + } + + return KNOT_EOK; +} + +int dt_join(dt_unit_t *unit) +{ + if (unit == 0) { + return KNOT_EINVAL; + } + + for (;;) { + + // Lock unit + pthread_mutex_lock(&unit->_report_mx); + dt_unit_lock(unit); + + // Browse threads + int active_threads = 0; + for (int i = 0; i < unit->size; ++i) { + + // Count active or cancelled but pending threads + dthread_t *thread = unit->threads[i]; + lock_thread_rw(thread); + if (thread->state & (ThreadActive|ThreadCancelled)) { + ++active_threads; + } + + // Reclaim dead threads, but only fast + if (thread->state & ThreadJoinable) { + unlock_thread_rw(thread); + pthread_join(thread->_thr, 0); + lock_thread_rw(thread); + thread->state = ThreadJoined; + unlock_thread_rw(thread); + } else { + unlock_thread_rw(thread); + } + } + + // Unlock unit + dt_unit_unlock(unit); + + // Check result + if (active_threads == 0) { + pthread_mutex_unlock(&unit->_report_mx); + break; + } + + // Wait for a thread to finish + pthread_cond_wait(&unit->_report, &unit->_report_mx); + pthread_mutex_unlock(&unit->_report_mx); + } + + return KNOT_EOK; +} + +int dt_stop(dt_unit_t *unit) +{ + if (unit == 0) { + return KNOT_EINVAL; + } + + // Lock unit + pthread_mutex_lock(&unit->_notify_mx); + dt_unit_lock(unit); + + // Signalize all threads to stop + for (int i = 0; i < unit->size; ++i) { + + // Lock thread + dthread_t *thread = unit->threads[i]; + lock_thread_rw(thread); + if (thread->state & (ThreadIdle | ThreadActive)) { + thread->state = ThreadDead | ThreadCancelled; + 
dt_signalize(thread, SIGALRM); + } + unlock_thread_rw(thread); + } + + // Unlock unit + dt_unit_unlock(unit); + + // Broadcast notification + pthread_cond_broadcast(&unit->_notify); + pthread_mutex_unlock(&unit->_notify_mx); + + return KNOT_EOK; +} + +int dt_setaffinity(dthread_t *thread, unsigned* cpu_id, size_t cpu_count) +{ + if (thread == NULL) { + return KNOT_EINVAL; + } + +#ifdef HAVE_PTHREAD_SETAFFINITY_NP + int ret = -1; + +/* Linux, FreeBSD interface. */ +#if defined(HAVE_CPUSET_LINUX) || defined(HAVE_CPUSET_BSD) + cpu_set_t set; + CPU_ZERO(&set); + for (unsigned i = 0; i < cpu_count; ++i) { + CPU_SET(cpu_id[i], &set); + } + ret = pthread_setaffinity_np(thread->_thr, sizeof(cpu_set_t), &set); +/* NetBSD interface. */ +#elif defined(HAVE_CPUSET_NETBSD) + cpuset_t *set = cpuset_create(); + if (set == NULL) { + return KNOT_ENOMEM; + } + cpuset_zero(set); + for (unsigned i = 0; i < cpu_count; ++i) { + cpuset_set(cpu_id[i], set); + } + ret = pthread_setaffinity_np(thread->_thr, cpuset_size(set), set); + cpuset_destroy(set); +#endif /* interface */ + + if (ret < 0) { + return KNOT_ERROR; + } + +#else /* HAVE_PTHREAD_SETAFFINITY_NP */ + return KNOT_ENOTSUP; +#endif + + return KNOT_EOK; +} + +int dt_activate(dthread_t *thread) +{ + return dt_update_thread(thread, ThreadActive); +} + +int dt_cancel(dthread_t *thread) +{ + return dt_update_thread(thread, ThreadIdle | ThreadCancelled); +} + +int dt_compact(dt_unit_t *unit) +{ + if (unit == 0) { + return KNOT_EINVAL; + } + + // Lock unit + pthread_mutex_lock(&unit->_notify_mx); + dt_unit_lock(unit); + + // Reclaim all Idle threads + for (int i = 0; i < unit->size; ++i) { + + // Locked state update + dthread_t *thread = unit->threads[i]; + lock_thread_rw(thread); + if (thread->state & (ThreadIdle)) { + thread->state = ThreadDead | ThreadCancelled; + dt_signalize(thread, SIGALRM); + } + unlock_thread_rw(thread); + } + + // Notify all threads + pthread_cond_broadcast(&unit->_notify); + 
pthread_mutex_unlock(&unit->_notify_mx); + + // Join all threads + for (int i = 0; i < unit->size; ++i) { + + // Reclaim all dead threads + dthread_t *thread = unit->threads[i]; + lock_thread_rw(thread); + if (thread->state & (ThreadDead)) { + unlock_thread_rw(thread); + pthread_join(thread->_thr, 0); + lock_thread_rw(thread); + thread->state = ThreadJoined; + unlock_thread_rw(thread); + } else { + unlock_thread_rw(thread); + } + } + + // Unlock unit + dt_unit_unlock(unit); + + return KNOT_EOK; +} + +int dt_online_cpus(void) +{ + int ret = -1; +/* Linux, FreeBSD, NetBSD, OpenBSD, macOS/OS X 10.4+, Solaris */ +#ifdef _SC_NPROCESSORS_ONLN + ret = (int) sysconf(_SC_NPROCESSORS_ONLN); +#else +/* OS X < 10.4 and some other OS's (if not handled by sysconf() above) */ +/* hw.ncpu won't work on FreeBSD, OpenBSD, NetBSD, DragonFlyBSD, and recent macOS/OS X. */ +#if HAVE_SYSCTLBYNAME + size_t rlen = sizeof(int); + if (sysctlbyname("hw.ncpu", &ret, &rlen, NULL, 0) < 0) { + ret = -1; + } +#endif +#endif + return ret; +} + +int dt_optimal_size(void) +{ + int ret = dt_online_cpus(); + if (ret > 1) { + return ret; + } + + return DEFAULT_THR_COUNT; +} + +int dt_is_cancelled(dthread_t *thread) +{ + if (thread == 0) { + return 0; + } + + return thread->state & ThreadCancelled; /* No need to be locked. 
*/ +} + +unsigned dt_get_id(dthread_t *thread) +{ + if (thread == NULL || thread->unit == NULL) { + return 0; + } + + dt_unit_t *unit = thread->unit; + for(int tid = 0; tid < unit->size; ++tid) { + if (thread == unit->threads[tid]) { + return tid; + } + } + + return 0; +} + +int dt_unit_lock(dt_unit_t *unit) +{ + if (unit == 0) { + return KNOT_EINVAL; + } + + int ret = pthread_mutex_lock(&unit->_mx); + if (ret < 0) { + return knot_map_errno(); + } + + return KNOT_EOK; +} + +int dt_unit_unlock(dt_unit_t *unit) +{ + if (unit == 0) { + return KNOT_EINVAL; + } + + int ret = pthread_mutex_unlock(&unit->_mx); + if (ret < 0) { + return knot_map_errno(); + } + + return KNOT_EOK; +} diff --git a/src/knot/server/dthreads.h b/src/knot/server/dthreads.h new file mode 100644 index 0000000..0c243a1 --- /dev/null +++ b/src/knot/server/dthreads.h @@ -0,0 +1,295 @@ +/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +/*! + * \brief Threading API. + * + * Dynamic threads provide: + * - coherent and incoherent threading capabilities + * - thread repurposing + * - thread prioritization + * - on-the-fly changing of threading unit size + * + * Coherent threading unit is when all threads execute + * the same runnable function. + * + * Incoherent function is when at least one thread executes + * a different runnable than the others. 
+ */ + +#pragma once + +#include <pthread.h> + +#define DEFAULT_THR_COUNT 2 /*!< Default thread count. */ + +/* Forward decls */ +struct dthread; +struct dt_unit; + +/*! + * \brief Thread state enumeration. + */ +typedef enum { + ThreadJoined = 1 << 0, /*!< Thread is finished and joined. */ + ThreadJoinable = 1 << 1, /*!< Thread is waiting to be reclaimed. */ + ThreadCancelled = 1 << 2, /*!< Thread is cancelled, finishing task. */ + ThreadDead = 1 << 3, /*!< Thread is finished, exiting. */ + ThreadIdle = 1 << 4, /*!< Thread is idle, waiting for purpose. */ + ThreadActive = 1 << 5 /*!< Thread is active, working on a task. */ +} dt_state_t; + +/*! + * \brief Thread runnable prototype. + * + * Runnable is basically a pointer to function which is called on active + * thread runtime. + * + * \note When implementing a runnable, keep in mind to check thread state as + * it may change, and implement a cooperative cancellation point. + * + * Implement this by checking dt_is_cancelled() and return + * as soon as possible. + */ +typedef int (*runnable_t)(struct dthread *); + +/*! + * \brief Single thread descriptor public API. + */ +typedef struct dthread { + volatile unsigned state; /*!< Bitfield of dt_flag flags. */ + runnable_t run; /*!< Runnable function or 0. */ + runnable_t destruct; /*!< Destructor function or 0. */ + void *data; /*!< Currently active data */ + struct dt_unit *unit; /*!< Reference to assigned unit. */ + void *_adata; /*!< Thread-specific data. */ + pthread_t _thr; /*!< Thread */ + pthread_attr_t _attr; /*!< Thread attributes */ + pthread_mutex_t _mx; /*!< Thread state change lock. */ +} dthread_t; + +/*! + * \brief Thread unit descriptor API. + * + * Thread unit consists of 1..N threads. + * Unit is coherent if all threads execute + * the same runnable. 
+ */ +typedef struct dt_unit { + int size; /*!< Unit width (number of threads) */ + struct dthread **threads; /*!< Array of threads */ + pthread_cond_t _notify; /*!< Notify thread */ + pthread_mutex_t _notify_mx; /*!< Condition mutex */ + pthread_cond_t _report; /*!< Report thread state */ + pthread_mutex_t _report_mx; /*!< Condition mutex */ + pthread_mutex_t _mx; /*!< Unit lock */ +} dt_unit_t; + +/*! + * \brief Create a set of coherent threads. + * + * Coherent means, that the threads will share a common runnable and the data. + * + * \param count Requested thread count. + * \param runnable Runnable function for all threads. + * \param destructor Destructor for all threads. + * \param data Any data passed onto threads. + * + * \retval New instance if successful + * \retval NULL on error + */ +dt_unit_t *dt_create(int count, runnable_t runnable, runnable_t destructor, void *data); + +/*! + * \brief Free unit. + * + * \warning Behavior is undefined if threads are still active, make sure + * to call dt_join() first. + * + * \param unit Unit to be deleted. + */ +void dt_delete(dt_unit_t **unit); + +/*! + * \brief Start all threads in selected unit. + * + * \param unit Unit to be started. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters (unit is null). + */ +int dt_start(dt_unit_t *unit); + +/*! + * \brief Send given signal to thread. + * + * \note This is useful to interrupt some blocking I/O as well, for example + * with SIGALRM, which is handled by default. + * \note Signal handler may be overridden in runnable. + * + * \param thread Target thread instance. + * \param signum Signal code. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters. + * \retval KNOT_ERROR unspecified error. + */ +int dt_signalize(dthread_t *thread, int signum); + +/*! + * \brief Wait for all thread in unit to finish. + * + * \param unit Unit to be joined. + * + * \retval KNOT_EOK on success. 
+ * \retval KNOT_EINVAL on invalid parameters. + */ +int dt_join(dt_unit_t *unit); + +/*! + * \brief Stop all threads in unit. + * + * Thread is interrupted at the nearest runnable cancellation point. + * + * \param unit Unit to be stopped. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters. + */ +int dt_stop(dt_unit_t *unit); + +/*! + * \brief Set thread affinity to masked CPU's. + * + * \param thread Target thread instance. + * \param cpu_id Array of CPU IDs to set affinity to. + * \param cpu_count Number of CPUs in the array, set to 0 for no CPU. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters. + */ +int dt_setaffinity(dthread_t *thread, unsigned* cpu_id, size_t cpu_count); + +/*! + * \brief Wake up thread from idle state. + * + * Thread is awoken from idle state and reenters runnable. + * This function only affects idle threads. + * + * \note Unit needs to be started with dt_start() first, as the function + * doesn't affect dead threads. + * + * \param thread Target thread instance. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters. + * \retval KNOT_ENOTSUP operation not supported. + */ +int dt_activate(dthread_t *thread); + +/*! + * \brief Put thread to idle state, cancels current runnable function. + * + * Thread is flagged with Cancel flag and returns from runnable at the nearest + * cancellation point, which requires complying runnable function. + * + * \note Thread isn't disposed, but put to idle state until it's requested + * again or collected by dt_compact(). + * + * \param thread Target thread instance. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters. + */ +int dt_cancel(dthread_t *thread); + +/*! + * \brief Collect and dispose idle threads. + * + * \param unit Target unit instance. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters. + */ +int dt_compact(dt_unit_t *unit); + +/*! 
+ * \brief Return number of online processors. + * + * \retval Number of online CPU's if success. + * \retval <0 on failure. + */ +int dt_online_cpus(void); + +/*! + * \brief Return optimal number of threads for instance. + * + * It is estimated as NUM_CPUs + CONSTANT. + * Fallback is DEFAULT_THR_COUNT (\see common.h). + * + * \return Number of threads. + */ +int dt_optimal_size(void); + +/*! + * \brief Return true if thread is cancelled. + * + * Synchronously check for ThreadCancelled flag. + * + * \param thread Target thread instance. + * + * \retval 1 if cancelled. + * \retval 0 if not cancelled. + */ +int dt_is_cancelled(dthread_t *thread); + +/*! + * \brief Return thread index in threading unit. + * + * \note Returns 0 when thread doesn't have a unit. + * + * \param thread Target thread instance. + * + * \return Thread index. + */ +unsigned dt_get_id(dthread_t *thread); + +/*! + * \brief Lock unit to prevent parallel operations which could alter unit + * at the same time. + * + * \param unit Target unit instance. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters. + * \retval KNOT_EAGAIN lack of resources to lock unit, try again. + * \retval KNOT_ERROR unspecified error. + */ +int dt_unit_lock(dt_unit_t *unit); + +/*! + * \brief Unlock unit. + * + * \see dt_unit_lock() + * + * \param unit Target unit instance. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters. + * \retval KNOT_EAGAIN lack of resources to unlock unit, try again. + * \retval KNOT_ERROR unspecified error. + */ +int dt_unit_unlock(dt_unit_t *unit); diff --git a/src/knot/server/proxyv2.c b/src/knot/server/proxyv2.c new file mode 100644 index 0000000..ff92263 --- /dev/null +++ b/src/knot/server/proxyv2.c @@ -0,0 +1,69 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. 
<knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/server/proxyv2.h" + +#include "contrib/proxyv2/proxyv2.h" +#include "knot/conf/conf.h" + +int proxyv2_header_strip(knot_pkt_t **query, + const struct sockaddr_storage *remote, + struct sockaddr_storage *new_remote) +{ + conf_t *pconf = conf(); + if (!pconf->cache.srv_proxy_enabled) { + return KNOT_EDENIED; + } + + uint8_t *pkt = (*query)->wire; + size_t pkt_len = (*query)->max_size; + + int offset = proxyv2_header_offset(pkt, pkt_len); + if (offset <= 0) { + return KNOT_EMALF; + } + + /* + * Check if the query was sent from an IP address authorized to send + * proxied DNS traffic. + */ + conf_val_t whitelist_val = conf_get(pconf, C_SRV, C_PROXY_ALLOWLIST); + if (!conf_addr_range_match(&whitelist_val, remote)) { + return KNOT_EDENIED; + } + + /* + * Store the provided remote address. + */ + int ret = proxyv2_addr_store(pkt, pkt_len, new_remote); + if (ret != KNOT_EOK) { + return ret; + } + + /* + * Re-parse the query message using the data in the + * packet following the PROXY v2 payload. And replace the original + * query with the decapsulated one. 
+ */ + knot_pkt_t *q = knot_pkt_new(pkt + offset, pkt_len - offset, &(*query)->mm); + if (q == NULL) { + return KNOT_ENOMEM; + } + knot_pkt_free(*query); + *query = q; + + return knot_pkt_parse(q, 0); +} diff --git a/src/knot/server/proxyv2.h b/src/knot/server/proxyv2.h new file mode 100644 index 0000000..5cb1251 --- /dev/null +++ b/src/knot/server/proxyv2.h @@ -0,0 +1,23 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "libknot/packet/pkt.h" + +int proxyv2_header_strip(knot_pkt_t **query, + const struct sockaddr_storage *remote, + struct sockaddr_storage *new_remote); diff --git a/src/knot/server/server.c b/src/knot/server/server.c new file mode 100644 index 0000000..684526d --- /dev/null +++ b/src/knot/server/server.c @@ -0,0 +1,1335 @@ +/* Copyright (C) 2023 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#define __APPLE_USE_RFC_3542 + +#include <assert.h> +#include <sys/types.h> // OpenBSD +#include <netinet/tcp.h> // TCP_FASTOPEN +#include <sys/resource.h> + +#include "libknot/libknot.h" +#include "libknot/yparser/ypschema.h" +#include "libknot/xdp.h" +#if defined ENABLE_XDP && ENABLE_QUIC +#include "libknot/xdp/quic.h" +#endif // ENABLE_XDP && ENABLE_QUIC +#include "knot/common/log.h" +#include "knot/common/stats.h" +#include "knot/common/systemd.h" +#include "knot/common/unreachable.h" +#include "knot/conf/confio.h" +#include "knot/conf/migration.h" +#include "knot/conf/module.h" +#include "knot/dnssec/kasp/kasp_db.h" +#include "knot/journal/journal_basic.h" +#include "knot/server/server.h" +#include "knot/server/udp-handler.h" +#include "knot/server/tcp-handler.h" +#include "knot/zone/timers.h" +#include "knot/zone/zonedb-load.h" +#include "knot/worker/pool.h" +#include "contrib/conn_pool.h" +#include "contrib/net.h" +#include "contrib/openbsd/strlcat.h" +#include "contrib/os.h" +#include "contrib/sockaddr.h" +#include "contrib/trim.h" + +#ifdef ENABLE_XDP +#include <net/if.h> +#endif + +#ifdef SO_ATTACH_REUSEPORT_CBPF +#include <linux/filter.h> +#endif + +/*! \brief Minimal send/receive buffer sizes. */ +enum { + UDP_MIN_RCVSIZE = 4096, + UDP_MIN_SNDSIZE = 4096, + TCP_MIN_RCVSIZE = 4096, + TCP_MIN_SNDSIZE = sizeof(uint16_t) + UINT16_MAX +}; + +/*! \brief Unbind interface and clear the structure. */ +static void server_deinit_iface(iface_t *iface, bool dealloc) +{ + assert(iface); + + /* Free UDP handler. 
 */
	if (iface->fd_udp != NULL) {
		for (int i = 0; i < iface->fd_udp_count; i++) {
			/* Only fds actually opened are closed; -1 marks unused slots. */
			if (iface->fd_udp[i] > -1) {
				close(iface->fd_udp[i]);
			}
		}
		free(iface->fd_udp);
	}

	/* Free XDP sockets; fd_xdp_count is nonzero only when XDP init succeeded. */
	for (int i = 0; i < iface->fd_xdp_count; i++) {
#ifdef ENABLE_XDP
		knot_xdp_deinit(iface->xdp_sockets[i]);
#else
		/* Unreachable: without ENABLE_XDP no XDP socket is ever created. */
		assert(0);
#endif
	}
	free(iface->fd_xdp);
	free(iface->xdp_sockets);

	/* Free TCP handler. */
	if (iface->fd_tcp != NULL) {
		for (int i = 0; i < iface->fd_tcp_count; i++) {
			if (iface->fd_tcp[i] > -1) {
				close(iface->fd_tcp[i]);
			}
		}
		free(iface->fd_tcp);
	}

	/* The iface_t itself is freed only when heap-allocated by the caller
	 * (interfaces stored inline in an array pass dealloc == false). */
	if (dealloc) {
		free(iface);
	}
}

/*! \brief Deinit server interface list (each element stored inline, then the array). */
static void server_deinit_iface_list(iface_t *ifaces, size_t n)
{
	if (ifaces != NULL) {
		for (size_t i = 0; i < n; i++) {
			server_deinit_iface(ifaces + i, false);
		}
		free(ifaces);
	}
}

/*!
 * \brief Attach SO_REUSEPORT socket filter for perfect CPU locality.
 *
 * Installs a classic-BPF program that maps each packet to the socket whose
 * index equals (CPU id modulo group size), so a reuseport group member is
 * always served by the same CPU. No-op success on systems without
 * SO_ATTACH_REUSEPORT_CBPF.
 *
 * \param sock Socket where to attach the CBPF filter to.
 * \param sock_count Number of sockets.
 *
 * \return True on success (or when the option is unsupported at build time).
 */
static bool server_attach_reuseport_bpf(const int sock, const int sock_count)
{
#ifdef SO_ATTACH_REUSEPORT_CBPF
	struct sock_filter code[] = {
		/* A = raw_smp_processor_id(). */
		{ BPF_LD | BPF_W | BPF_ABS, 0, 0, SKF_AD_OFF + SKF_AD_CPU },
		/* Adjust the CPUID to socket group size. */
		{ BPF_ALU | BPF_MOD | BPF_K, 0, 0, sock_count },
		/* Return A. */
		{ BPF_RET | BPF_A, 0, 0, 0 },
	};

	struct sock_fprog prog = { 0 };
	prog.len = sizeof(code) / sizeof(*code);
	prog.filter = code;

	return setsockopt(sock, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF, &prog, sizeof(prog)) == 0;
#else
	return true;
#endif
}

/*! \brief Set lower bound for socket option.
*/ +static bool setsockopt_min(int sock, int option, int min) +{ + int value = 0; + socklen_t len = sizeof(value); + + if (getsockopt(sock, SOL_SOCKET, option, &value, &len) != 0) { + return false; + } + + assert(len == sizeof(value)); + if (value >= min) { + return true; + } + + return setsockopt(sock, SOL_SOCKET, option, &min, sizeof(min)) == 0; +} + +/*! + * \brief Enlarge send/receive buffers. + */ +static bool enlarge_net_buffers(int sock, int min_recvsize, int min_sndsize) +{ + return setsockopt_min(sock, SO_RCVBUF, min_recvsize) && + setsockopt_min(sock, SO_SNDBUF, min_sndsize); +} + +/*! + * \brief Enable source packet information retrieval. + */ +static bool enable_pktinfo(int sock, int family) +{ + int level = 0; + int option = 0; + + switch (family) { + case AF_INET: + level = IPPROTO_IP; +#if defined(IP_PKTINFO) + option = IP_PKTINFO; /* Linux */ +#elif defined(IP_RECVDSTADDR) + option = IP_RECVDSTADDR; /* BSD */ +#else + return false; +#endif + break; + case AF_INET6: + level = IPPROTO_IPV6; + option = IPV6_RECVPKTINFO; + break; + default: + return false; + } + + const int on = 1; + return setsockopt(sock, level, option, &on, sizeof(on)) == 0; +} + +/*! + * Linux 3.15 has IP_PMTUDISC_OMIT which makes sockets + * ignore PMTU information and send packets with DF=0. + * Fragmentation is allowed if and only if the packet + * size exceeds the outgoing interface MTU or the packet + * encounters smaller MTU link in network. + * This mitigates DNS fragmentation attacks by preventing + * forged PMTU information. + * FreeBSD already has same semantics without setting + * the option. 
 */
static int disable_pmtudisc(int sock, int family)
{
#if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT)
	/* Only IPv4 needs this; IPv6 and non-Linux platforms keep defaults. */
	if (family == AF_INET) {
		int action_omit = IP_PMTUDISC_OMIT;
		if (setsockopt(sock, IPPROTO_IP, IP_MTU_DISCOVER, &action_omit,
		               sizeof(action_omit)) != 0) {
			return knot_map_errno();
		}
	}
#endif
	return KNOT_EOK;
}

/*!
 * \brief Create and initialize an XDP interface with one socket per NIC queue.
 *
 * \param addr             Configured XDP listen address (mapped to a NIC/port).
 * \param route_check      Enable route checking in the XDP filter.
 * \param udp              Filter UDP traffic.
 * \param tcp              Filter TCP traffic.
 * \param quic             QUIC port to filter (0 disables QUIC).
 * \param thread_id_start  In/out: first global thread id for this interface;
 *                         advanced by the number of queues consumed.
 *
 * \return New interface, or NULL on failure (everything cleaned up).
 */
static iface_t *server_init_xdp_iface(struct sockaddr_storage *addr, bool route_check,
                                      bool udp, bool tcp, uint16_t quic, unsigned *thread_id_start)
{
#ifndef ENABLE_XDP
	/* Callers only reach this with XDP configured, which requires ENABLE_XDP. */
	assert(0);
	return NULL;
#else
	conf_xdp_iface_t iface;
	int ret = conf_xdp_iface(addr, &iface);
	if (ret != KNOT_EOK) {
		log_error("failed to initialize XDP interface (%s)",
		          knot_strerror(ret));
		return NULL;
	}

	iface_t *new_if = calloc(1, sizeof(*new_if));
	if (new_if == NULL) {
		log_error("failed to initialize XDP interface");
		return NULL;
	}
	memcpy(&new_if->addr, addr, sizeof(*addr));

	new_if->fd_xdp = calloc(iface.queues, sizeof(int));
	new_if->xdp_sockets = calloc(iface.queues, sizeof(*new_if->xdp_sockets));
	if (new_if->fd_xdp == NULL || new_if->xdp_sockets == NULL) {
		log_error("failed to initialize XDP interface");
		server_deinit_iface(new_if, true);
		return NULL;
	}
	/* Reserve a contiguous range of global thread ids, one per queue. */
	new_if->xdp_first_thread_id = *thread_id_start;
	*thread_id_start += iface.queues;

	knot_xdp_filter_flag_t xdp_flags = udp ? KNOT_XDP_FILTER_UDP : 0;
	if (tcp) {
		xdp_flags |= KNOT_XDP_FILTER_TCP;
	}
	if (quic > 0) {
		xdp_flags |= KNOT_XDP_FILTER_QUIC;
	}
	if (route_check) {
		xdp_flags |= KNOT_XDP_FILTER_ROUTE;
	}

	for (int i = 0; i < iface.queues; i++) {
		/* Only the first queue loads the BPF program; the rest reuse it. */
		knot_xdp_load_bpf_t mode =
			(i == 0 ? KNOT_XDP_LOAD_BPF_ALWAYS : KNOT_XDP_LOAD_BPF_NEVER);
		ret = knot_xdp_init(new_if->xdp_sockets + i, iface.name, i,
		                    xdp_flags, iface.port, quic, mode, NULL);
		/* A stale program from a previous run can report EBUSY; retry
		 * once with forced unload-and-reload on the first queue. */
		if (ret == -EBUSY && i == 0) {
			log_notice("XDP interface %s@%u is busy, retrying initialization",
			           iface.name, iface.port);
			ret = knot_xdp_init(new_if->xdp_sockets + i, iface.name, i,
			                    xdp_flags, iface.port, quic,
			                    KNOT_XDP_LOAD_BPF_ALWAYS_UNLOAD, NULL);
		}
		if (ret != KNOT_EOK) {
			log_warning("failed to initialize XDP interface %s@%u, queue %d (%s)",
			            iface.name, iface.port, i, knot_strerror(ret));
			/* fd_xdp_count only covers queues initialized so far,
			 * so partial teardown is safe. */
			server_deinit_iface(new_if, true);
			new_if = NULL;
			break;
		}
		new_if->fd_xdp[i] = knot_xdp_socket_fd(new_if->xdp_sockets[i]);
		new_if->fd_xdp_count++;
	}

	if (ret == KNOT_EOK) {
		/* Summarize the enabled protocols/ports in one log line. */
		char msg[128];
		(void)snprintf(msg, sizeof(msg), "initialized XDP interface %s", iface.name);
		if (udp || tcp) {
			char buf[32] = "";
			(void)snprintf(buf, sizeof(buf), ", %s%s%s port %u",
			               (udp ? "UDP" : ""),
			               (udp && tcp ? "/" : ""),
			               (tcp ? "TCP" : ""),
			               iface.port);
			strlcat(msg, buf, sizeof(msg));
		}
		if (quic) {
			char buf[32] = "";
			(void)snprintf(buf, sizeof(buf), ", QUIC port %u", quic);
			strlcat(msg, buf, sizeof(msg));
		}

		knot_xdp_mode_t mode = knot_eth_xdp_mode(if_nametoindex(iface.name));
		log_info("%s, queues %d, %s mode%s", msg, iface.queues,
		         (mode == KNOT_XDP_MODE_FULL ? "native" : "emulated"),
		         route_check ? ", route check" : "");
	}

	return new_if;
#endif
}

/*!
 * \brief Create and initialize new interface.
 *
 * Both TCP and UDP sockets will be created for the interface.
 *
 * \param addr              Socket address.
 * \param udp_thread_count  Number of created UDP workers.
 * \param tcp_thread_count  Number of created TCP workers.
 * \param tcp_reuseport     Indication if reuseport on TCP is enabled.
 * \param socket_affinity   Indication if CBPF should be attached.
 *
 * \retval Pointer to a new initialized interface.
 * \retval NULL if error.
+ */ +static iface_t *server_init_iface(struct sockaddr_storage *addr, + int udp_thread_count, int tcp_thread_count, + bool tcp_reuseport, bool socket_affinity) +{ + iface_t *new_if = calloc(1, sizeof(*new_if)); + if (new_if == NULL) { + log_error("failed to initialize interface"); + return NULL; + } + memcpy(&new_if->addr, addr, sizeof(*addr)); + + /* Convert to string address format. */ + char addr_str[SOCKADDR_STRLEN] = { 0 }; + sockaddr_tostr(addr_str, sizeof(addr_str), addr); + + int udp_socket_count = 1; + int udp_bind_flags = 0; + int tcp_socket_count = 1; + int tcp_bind_flags = 0; + +#ifdef ENABLE_REUSEPORT + udp_socket_count = udp_thread_count; + udp_bind_flags |= NET_BIND_MULTIPLE; + + if (tcp_reuseport) { + tcp_socket_count = tcp_thread_count; + tcp_bind_flags |= NET_BIND_MULTIPLE; + } +#endif + + new_if->fd_udp = malloc(udp_socket_count * sizeof(int)); + new_if->fd_tcp = malloc(tcp_socket_count * sizeof(int)); + if (new_if->fd_udp == NULL || new_if->fd_tcp == NULL) { + log_error("failed to initialize interface"); + server_deinit_iface(new_if, true); + return NULL; + } + + const mode_t unix_mode = S_IWUSR | S_IWGRP | S_IWOTH; + + bool warn_bind = true; + bool warn_cbpf = true; + bool warn_bufsize = true; + bool warn_pktinfo = true; + bool warn_flag_misc = true; + + /* Create bound UDP sockets. 
*/ + for (int i = 0; i < udp_socket_count; i++) { + int sock = net_bound_socket(SOCK_DGRAM, addr, udp_bind_flags, unix_mode); + if (sock == KNOT_EADDRNOTAVAIL) { + udp_bind_flags |= NET_BIND_NONLOCAL; + sock = net_bound_socket(SOCK_DGRAM, addr, udp_bind_flags, unix_mode); + if (sock >= 0 && warn_bind) { + log_warning("address %s UDP bound, but required nonlocal bind", addr_str); + warn_bind = false; + } + } + + if (sock < 0) { + log_error("cannot bind address %s UDP (%s)", addr_str, + knot_strerror(sock)); + server_deinit_iface(new_if, true); + return NULL; + } + + if ((udp_bind_flags & NET_BIND_MULTIPLE) && socket_affinity) { + if (!server_attach_reuseport_bpf(sock, udp_socket_count) && + warn_cbpf) { + log_warning("cannot ensure optimal CPU locality for UDP"); + warn_cbpf = false; + } + } + + if (!enlarge_net_buffers(sock, UDP_MIN_RCVSIZE, UDP_MIN_SNDSIZE) && + warn_bufsize) { + log_warning("failed to set network buffer sizes for UDP"); + warn_bufsize = false; + } + + if (sockaddr_is_any(addr) && !enable_pktinfo(sock, addr->ss_family) && + warn_pktinfo) { + log_warning("failed to enable received packet information retrieval"); + warn_pktinfo = false; + } + + int ret = disable_pmtudisc(sock, addr->ss_family); + if (ret != KNOT_EOK && warn_flag_misc) { + log_warning("failed to disable Path MTU discovery for IPv4/UDP (%s)", + knot_strerror(ret)); + warn_flag_misc = false; + } + + new_if->fd_udp[new_if->fd_udp_count] = sock; + new_if->fd_udp_count += 1; + } + + warn_bind = true; + warn_cbpf = true; + warn_bufsize = true; + warn_flag_misc = true; + + /* Create bound TCP sockets. 
*/ + for (int i = 0; i < tcp_socket_count; i++) { + int sock = net_bound_socket(SOCK_STREAM, addr, tcp_bind_flags, unix_mode); + if (sock == KNOT_EADDRNOTAVAIL) { + tcp_bind_flags |= NET_BIND_NONLOCAL; + sock = net_bound_socket(SOCK_STREAM, addr, tcp_bind_flags, unix_mode); + if (sock >= 0 && warn_bind) { + log_warning("address %s TCP bound, but required nonlocal bind", addr_str); + warn_bind = false; + } + } + + if (sock < 0) { + log_error("cannot bind address %s TCP (%s)", addr_str, + knot_strerror(sock)); + server_deinit_iface(new_if, true); + return NULL; + } + + if (!enlarge_net_buffers(sock, TCP_MIN_RCVSIZE, TCP_MIN_SNDSIZE) && + warn_bufsize) { + log_warning("failed to set network buffer sizes for TCP"); + warn_bufsize = false; + } + + new_if->fd_tcp[new_if->fd_tcp_count] = sock; + new_if->fd_tcp_count += 1; + + /* Listen for incoming connections. */ + int ret = listen(sock, TCP_BACKLOG_SIZE); + if (ret < 0) { + log_error("failed to listen on TCP interface %s", addr_str); + server_deinit_iface(new_if, true); + return NULL; + } + + if ((tcp_bind_flags & NET_BIND_MULTIPLE) && socket_affinity) { + if (!server_attach_reuseport_bpf(sock, tcp_socket_count) && + warn_cbpf) { + log_warning("cannot ensure optimal CPU locality for TCP"); + warn_cbpf = false; + } + } + + /* Try to enable TCP Fast Open. 
*/ + ret = net_bound_tfo(sock, TCP_BACKLOG_SIZE); + if (ret != KNOT_EOK && ret != KNOT_ENOTSUP && warn_flag_misc) { + log_warning("failed to enable TCP Fast Open on %s (%s)", + addr_str, knot_strerror(ret)); + warn_flag_misc = false; + } + } + + return new_if; +} + +static void log_sock_conf(conf_t *conf) +{ + char buf[128] = ""; +#if defined(ENABLE_REUSEPORT) + strlcat(buf, "UDP", sizeof(buf)); + if (conf->cache.srv_tcp_reuseport) { + strlcat(buf, "/TCP", sizeof(buf)); + } + strlcat(buf, " reuseport", sizeof(buf)); + if (conf->cache.srv_socket_affinity) { + strlcat(buf, ", socket affinity", sizeof(buf)); + } +#endif +#if defined(TCP_FASTOPEN) + if (buf[0] != '\0') { + strlcat(buf, ", ", sizeof(buf)); + } + strlcat(buf, "incoming", sizeof(buf)); + if (conf->cache.srv_tcp_fastopen) { + strlcat(buf, "/outgoing", sizeof(buf)); + } + strlcat(buf, " TCP Fast Open", sizeof(buf)); +#endif + if (buf[0] != '\0') { + log_info("using %s", buf); + } +} + +/*! \brief Initialize bound sockets according to configuration. 
*/ +static int configure_sockets(conf_t *conf, server_t *s) +{ + if (s->state & ServerRunning) { + return KNOT_EOK; + } + + conf_val_t listen_val = conf_get(conf, C_SRV, C_LISTEN); + conf_val_t lisxdp_val = conf_get(conf, C_XDP, C_LISTEN); + conf_val_t rundir_val = conf_get(conf, C_SRV, C_RUNDIR); + + if (listen_val.code == KNOT_EOK) { + log_sock_conf(conf); + } else if (lisxdp_val.code != KNOT_EOK) { + log_warning("no network interface configured"); + return KNOT_EOK; + } + +#ifdef ENABLE_XDP + if (lisxdp_val.code == KNOT_EOK && !linux_at_least(5, 11)) { + struct rlimit min_limit = { RLIM_INFINITY, RLIM_INFINITY }; + struct rlimit cur_limit = { 0 }; + if (getrlimit(RLIMIT_MEMLOCK, &cur_limit) != 0 || + cur_limit.rlim_cur != min_limit.rlim_cur || + cur_limit.rlim_max != min_limit.rlim_max) { + int ret = setrlimit(RLIMIT_MEMLOCK, &min_limit); + if (ret != 0) { + log_error("failed to increase RLIMIT_MEMLOCK (%s)", + knot_strerror(errno)); + return KNOT_ESYSTEM; + } + } + } +#endif + + size_t real_nifs = 0; + size_t nifs = conf_val_count(&listen_val) + conf_val_count(&lisxdp_val); + iface_t *newlist = calloc(nifs, sizeof(*newlist)); + if (newlist == NULL) { + log_error("failed to allocate memory for network sockets"); + return KNOT_ENOMEM; + } + + /* Normal UDP and TCP sockets. 
*/ + unsigned size_udp = s->handlers[IO_UDP].handler.unit->size; + unsigned size_tcp = s->handlers[IO_TCP].handler.unit->size; + bool tcp_reuseport = conf->cache.srv_tcp_reuseport; + bool socket_affinity = conf->cache.srv_socket_affinity; + char *rundir = conf_abs_path(&rundir_val, NULL); + while (listen_val.code == KNOT_EOK) { + struct sockaddr_storage addr = conf_addr(&listen_val, rundir); + char addr_str[SOCKADDR_STRLEN] = { 0 }; + sockaddr_tostr(addr_str, sizeof(addr_str), &addr); + log_info("binding to interface %s", addr_str); + + iface_t *new_if = server_init_iface(&addr, size_udp, size_tcp, + tcp_reuseport, socket_affinity); + if (new_if == NULL) { + server_deinit_iface_list(newlist, nifs); + free(rundir); + return KNOT_ERROR; + } + memcpy(&newlist[real_nifs++], new_if, sizeof(*newlist)); + free(new_if); + + conf_val_next(&listen_val); + } + free(rundir); + + /* XDP sockets. */ + bool xdp_udp = conf->cache.xdp_udp; + bool xdp_tcp = conf->cache.xdp_tcp; + uint16_t xdp_quic = conf->cache.xdp_quic; + bool route_check = conf->cache.xdp_route_check; + unsigned thread_id = s->handlers[IO_UDP].handler.unit->size + + s->handlers[IO_TCP].handler.unit->size; + while (lisxdp_val.code == KNOT_EOK) { + struct sockaddr_storage addr = conf_addr(&lisxdp_val, NULL); + char addr_str[SOCKADDR_STRLEN] = { 0 }; + sockaddr_tostr(addr_str, sizeof(addr_str), &addr); + log_info("binding to XDP interface %s", addr_str); + + iface_t *new_if = server_init_xdp_iface(&addr, route_check, xdp_udp, + xdp_tcp, xdp_quic, &thread_id); + if (new_if == NULL) { + server_deinit_iface_list(newlist, nifs); + return KNOT_ERROR; + } + memcpy(&newlist[real_nifs++], new_if, sizeof(*newlist)); + free(new_if); + + conf_val_next(&lisxdp_val); + } + assert(real_nifs <= nifs); + nifs = real_nifs; + +#if defined ENABLE_XDP && ENABLE_QUIC + if (xdp_quic > 0) { + char *tls_cert = conf_tls(conf, C_CERT_FILE); + char *tls_key = conf_tls(conf, C_KEY_FILE); + if (tls_cert == NULL) { + log_notice("QUIC, no server 
certificate configured, using one-time one"); + } + s->quic_creds = knot_xquic_init_creds(true, tls_cert, tls_key); + free(tls_cert); + free(tls_key); + if (s->quic_creds == NULL) { + log_error("QUIC, failed to initialize server credentials"); + server_deinit_iface_list(newlist, nifs); + return KNOT_ERROR; + } + } +#endif // ENABLE_XDP && ENABLE_QUIC + + /* Publish new list. */ + s->ifaces = newlist; + s->n_ifaces = nifs; + + /* Assign thread identifiers unique per all handlers. */ + unsigned thread_count = 0; + for (unsigned proto = IO_UDP; proto <= IO_XDP; ++proto) { + dt_unit_t *tu = s->handlers[proto].handler.unit; + for (unsigned i = 0; tu != NULL && i < tu->size; ++i) { + s->handlers[proto].handler.thread_id[i] = thread_count++; + } + } + + return KNOT_EOK; +} + +int server_init(server_t *server, int bg_workers) +{ + if (server == NULL) { + return KNOT_EINVAL; + } + + /* Clear the structure. */ + memset(server, 0, sizeof(server_t)); + + /* Initialize event scheduler. */ + if (evsched_init(&server->sched, server) != KNOT_EOK) { + return KNOT_ENOMEM; + } + + server->workers = worker_pool_create(bg_workers); + if (server->workers == NULL) { + evsched_deinit(&server->sched); + return KNOT_ENOMEM; + } + + int ret = catalog_update_init(&server->catalog_upd); + if (ret != KNOT_EOK) { + worker_pool_destroy(server->workers); + evsched_deinit(&server->sched); + return ret; + } + + zone_backups_init(&server->backup_ctxs); + + char *catalog_dir = conf_db(conf(), C_CATALOG_DB); + conf_val_t catalog_size = conf_db_param(conf(), C_CATALOG_DB_MAX_SIZE); + catalog_init(&server->catalog, catalog_dir, conf_int(&catalog_size)); + free(catalog_dir); + conf()->catalog = &server->catalog; + + char *journal_dir = conf_db(conf(), C_JOURNAL_DB); + conf_val_t journal_size = conf_db_param(conf(), C_JOURNAL_DB_MAX_SIZE); + conf_val_t journal_mode = conf_db_param(conf(), C_JOURNAL_DB_MODE); + knot_lmdb_init(&server->journaldb, journal_dir, conf_int(&journal_size), 
journal_env_flags(conf_opt(&journal_mode), false), NULL); + free(journal_dir); + + kasp_db_ensure_init(&server->kaspdb, conf()); + + char *timer_dir = conf_db(conf(), C_TIMER_DB); + conf_val_t timer_size = conf_db_param(conf(), C_TIMER_DB_MAX_SIZE); + knot_lmdb_init(&server->timerdb, timer_dir, conf_int(&timer_size), 0, NULL); + free(timer_dir); + + return KNOT_EOK; +} + +void server_deinit(server_t *server) +{ + if (server == NULL) { + return; + } + + zone_backups_deinit(&server->backup_ctxs); + + /* Save zone timers. */ + if (server->zone_db != NULL) { + log_info("updating persistent timer DB"); + int ret = zone_timers_write_all(&server->timerdb, server->zone_db); + if (ret != KNOT_EOK) { + log_warning("failed to update persistent timer DB (%s)", + knot_strerror(ret)); + } + } + + /* Free remaining interfaces. */ + server_deinit_iface_list(server->ifaces, server->n_ifaces); + + /* Free threads and event handlers. */ + worker_pool_destroy(server->workers); + + /* Free zone database. */ + knot_zonedb_deep_free(&server->zone_db, true); + + /* Free remaining events. */ + evsched_deinit(&server->sched); + + /* Free catalog zone context. */ + catalog_update_clear(&server->catalog_upd); + catalog_update_deinit(&server->catalog_upd); + catalog_deinit(&server->catalog); + + /* Close persistent timers DB. */ + knot_lmdb_deinit(&server->timerdb); + + /* Close kasp_db. */ + knot_lmdb_deinit(&server->kaspdb); + + /* Close journal database if open. */ + knot_lmdb_deinit(&server->journaldb); + + /* Close and deinit connection pool. 
*/ + conn_pool_deinit(global_conn_pool); + global_conn_pool = NULL; + knot_unreachables_deinit(&global_unreachables); + +#if defined ENABLE_XDP && ENABLE_QUIC + knot_xquic_free_creds(server->quic_creds); +#endif // ENABLE_XDP && ENABLE_QUIC +} + +static int server_init_handler(server_t *server, int index, int thread_count, + runnable_t runnable, runnable_t destructor) +{ + /* Initialize */ + iohandler_t *h = &server->handlers[index].handler; + memset(h, 0, sizeof(iohandler_t)); + h->server = server; + h->unit = dt_create(thread_count, runnable, destructor, h); + if (h->unit == NULL) { + return KNOT_ENOMEM; + } + + h->thread_state = calloc(thread_count, sizeof(unsigned)); + if (h->thread_state == NULL) { + dt_delete(&h->unit); + return KNOT_ENOMEM; + } + + h->thread_id = calloc(thread_count, sizeof(unsigned)); + if (h->thread_id == NULL) { + free(h->thread_state); + dt_delete(&h->unit); + return KNOT_ENOMEM; + } + + return KNOT_EOK; +} + +static void server_free_handler(iohandler_t *h) +{ + if (h == NULL || h->server == NULL) { + return; + } + + /* Wait for threads to finish */ + if (h->unit) { + dt_stop(h->unit); + dt_join(h->unit); + } + + /* Destroy worker context. */ + dt_delete(&h->unit); + free(h->thread_state); + free(h->thread_id); +} + +static void worker_wait_cb(worker_pool_t *pool) +{ + systemd_zone_load_timeout_notify(); + + static uint64_t last_ns = 0; + struct timespec now = time_now(); + uint64_t now_ns = 1000000000 * now.tv_sec + now.tv_nsec; + /* Too frequent worker_pool_status() call with many zones is expensive. */ + if (now_ns - last_ns > 1000000000) { + int running, queued; + worker_pool_status(pool, true, &running, &queued); + systemd_tasks_status_notify(running + queued); + last_ns = now_ns; + } +} + +int server_start(server_t *server, bool async) +{ + if (server == NULL) { + return KNOT_EINVAL; + } + + /* Start workers. */ + worker_pool_start(server->workers); + + /* Wait for enqueued events if not asynchronous. 
*/ + if (!async) { + worker_pool_wait_cb(server->workers, worker_wait_cb); + systemd_tasks_status_notify(0); + } + + /* Start evsched handler. */ + evsched_start(&server->sched); + + /* Start I/O handlers. */ + server->state |= ServerRunning; + for (int proto = IO_UDP; proto <= IO_XDP; ++proto) { + if (server->handlers[proto].size > 0) { + int ret = dt_start(server->handlers[proto].handler.unit); + if (ret != KNOT_EOK) { + return ret; + } + } + } + + return KNOT_EOK; +} + +void server_wait(server_t *server) +{ + if (server == NULL) { + return; + } + + evsched_join(&server->sched); + worker_pool_join(server->workers); + + for (int proto = IO_UDP; proto <= IO_XDP; ++proto) { + if (server->handlers[proto].size > 0) { + server_free_handler(&server->handlers[proto].handler); + } + } +} + +static int reload_conf(conf_t *new_conf) +{ + yp_schema_purge_dynamic(new_conf->schema); + + /* Re-load common modules. */ + int ret = conf_mod_load_common(new_conf); + if (ret != KNOT_EOK) { + return ret; + } + + /* Re-import config file if specified. */ + const char *filename = conf()->filename; + if (filename != NULL) { + log_info("reloading configuration file '%s'", filename); + + /* Import the configuration file. */ + ret = conf_import(new_conf, filename, true, false); + if (ret != KNOT_EOK) { + log_error("failed to load configuration file (%s)", + knot_strerror(ret)); + return ret; + } + } else { + log_info("reloading configuration database '%s'", + knot_db_lmdb_get_path(new_conf->db)); + + /* Re-load extra modules. 
*/ + for (conf_iter_t iter = conf_iter(new_conf, C_MODULE); + iter.code == KNOT_EOK; conf_iter_next(new_conf, &iter)) { + conf_val_t id = conf_iter_id(new_conf, &iter); + conf_val_t file = conf_id_get(new_conf, C_MODULE, C_FILE, &id); + ret = conf_mod_load_extra(new_conf, conf_str(&id), conf_str(&file), + MOD_EXPLICIT); + if (ret != KNOT_EOK) { + conf_iter_finish(new_conf, &iter); + return ret; + } + } + } + + conf_mod_load_purge(new_conf, false); + + // Migrate from old schema. + ret = conf_migrate(new_conf); + if (ret != KNOT_EOK) { + log_error("failed to migrate configuration (%s)", knot_strerror(ret)); + } + + return KNOT_EOK; +} + +/*! \brief Check if parameter listen(-xdp) has been changed since knotd started. */ +static bool listen_changed(conf_t *conf, server_t *server) +{ + assert(server->ifaces); + + conf_val_t listen_val = conf_get(conf, C_SRV, C_LISTEN); + conf_val_t lisxdp_val = conf_get(conf, C_XDP, C_LISTEN); + size_t new_count = conf_val_count(&listen_val) + conf_val_count(&lisxdp_val); + size_t old_count = server->n_ifaces; + if (new_count != old_count) { + return true; + } + + conf_val_t rundir_val = conf_get(conf, C_SRV, C_RUNDIR); + char *rundir = conf_abs_path(&rundir_val, NULL); + size_t matches = 0; + + /* Find matching interfaces. 
*/ + while (listen_val.code == KNOT_EOK) { + struct sockaddr_storage addr = conf_addr(&listen_val, rundir); + bool found = false; + for (size_t i = 0; i < server->n_ifaces; i++) { + if (sockaddr_cmp(&addr, &server->ifaces[i].addr, false) == 0) { + matches++; + found = true; + break; + } + } + if (!found) { + break; + } + conf_val_next(&listen_val); + } + free(rundir); + + while (lisxdp_val.code == KNOT_EOK) { + struct sockaddr_storage addr = conf_addr(&lisxdp_val, NULL); + bool found = false; + for (size_t i = 0; i < server->n_ifaces; i++) { + if (sockaddr_cmp(&addr, &server->ifaces[i].addr, false) == 0) { + matches++; + found = true; + break; + } + } + if (!found) { + break; + } + conf_val_next(&lisxdp_val); + } + + return matches != old_count; +} + +/*! \brief Log warnings if config change requires a restart. */ +static void warn_server_reconfigure(conf_t *conf, server_t *server) +{ + const char *msg = "changes of %s require restart to take effect"; + + static bool warn_tcp_reuseport = true; + static bool warn_socket_affinity = true; + static bool warn_udp = true; + static bool warn_tcp = true; + static bool warn_bg = true; + static bool warn_listen = true; + static bool warn_xdp_udp = true; + static bool warn_xdp_tcp = true; + static bool warn_xdp_quic = true; + static bool warn_route_check = true; + static bool warn_rmt_pool_limit = true; + + if (warn_tcp_reuseport && conf->cache.srv_tcp_reuseport != conf_get_bool(conf, C_SRV, C_TCP_REUSEPORT)) { + log_warning(msg, &C_TCP_REUSEPORT[1]); + warn_tcp_reuseport = false; + } + + if (warn_socket_affinity && conf->cache.srv_socket_affinity != conf_get_bool(conf, C_SRV, C_SOCKET_AFFINITY)) { + log_warning(msg, &C_SOCKET_AFFINITY[1]); + warn_socket_affinity = false; + } + + if (warn_udp && server->handlers[IO_UDP].size != conf_udp_threads(conf)) { + log_warning(msg, &C_UDP_WORKERS[1]); + warn_udp = false; + } + + if (warn_tcp && server->handlers[IO_TCP].size != conf_tcp_threads(conf)) { + log_warning(msg, 
&C_TCP_WORKERS[1]); + warn_tcp = false; + } + + if (warn_bg && conf->cache.srv_bg_threads != conf_bg_threads(conf)) { + log_warning(msg, &C_BG_WORKERS[1]); + warn_bg = false; + } + + if (warn_listen && server->ifaces != NULL && listen_changed(conf, server)) { + log_warning(msg, "listen(-xdp)"); + warn_listen = false; + } + + if (warn_xdp_udp && conf->cache.xdp_udp != conf_get_bool(conf, C_XDP, C_UDP)) { + log_warning(msg, &C_UDP[1]); + warn_xdp_udp = false; + } + + if (warn_xdp_tcp && conf->cache.xdp_tcp != conf_get_bool(conf, C_XDP, C_TCP)) { + log_warning(msg, &C_TCP[1]); + warn_xdp_tcp = false; + } + + if (warn_xdp_quic && (bool)conf->cache.xdp_quic != conf_get_bool(conf, C_XDP, C_QUIC)) { + log_warning(msg, &C_QUIC[1]); + warn_xdp_quic = false; + } + + if (warn_xdp_quic && conf->cache.xdp_quic > 0 && + conf->cache.xdp_quic != conf_get_int(conf, C_XDP, C_QUIC_PORT)) { + log_warning(msg, &C_QUIC_PORT[1]); + warn_xdp_quic = false; + } + + if (warn_route_check && conf->cache.xdp_route_check != conf_get_bool(conf, C_XDP, C_ROUTE_CHECK)) { + log_warning(msg, &C_ROUTE_CHECK[1]); + warn_route_check = false; + } + + if (warn_rmt_pool_limit && global_conn_pool != NULL && + global_conn_pool->capacity != conf_get_int(conf, C_SRV, C_RMT_POOL_LIMIT)) { + log_warning(msg, &C_RMT_POOL_LIMIT[1]); + warn_rmt_pool_limit = false; + } +} + +int server_reload(server_t *server, reload_t mode) +{ + if (server == NULL) { + return KNOT_EINVAL; + } + + systemd_reloading_notify(); + + /* Check for no edit mode. 
*/ + if (conf()->io.txn != NULL) { + log_warning("reload aborted due to active configuration transaction"); + systemd_ready_notify(); + return KNOT_TXN_EEXISTS; + } + + conf_t *new_conf = NULL; + int ret = conf_clone(&new_conf); + if (ret != KNOT_EOK) { + log_error("failed to initialize configuration (%s)", + knot_strerror(ret)); + systemd_ready_notify(); + return ret; + } + + yp_flag_t flags = conf()->io.flags; + bool full = !(flags & CONF_IO_FACTIVE); + bool reuse_modules = !full && !(flags & CONF_IO_FRLD_MOD); + + /* Reload configuration and modules if full reload or a module change. */ + if (full || !reuse_modules) { + ret = reload_conf(new_conf); + if (ret != KNOT_EOK) { + conf_free(new_conf); + systemd_ready_notify(); + return ret; + } + + conf_activate_modules(new_conf, server, NULL, new_conf->query_modules, + &new_conf->query_plan); + } + + conf_update_flag_t upd_flags = CONF_UPD_FNOFREE; + if (!full) { + upd_flags |= CONF_UPD_FCONFIO; + } + if (reuse_modules) { + upd_flags |= CONF_UPD_FMODULES; + } + + /* Update to the new config. */ + conf_t *old_conf = conf_update(new_conf, upd_flags); + + /* Reload each component if full reload or a specific one if required. */ + if (full || (flags & CONF_IO_FRLD_LOG)) { + log_reconfigure(conf()); + } + if (full || (flags & CONF_IO_FRLD_SRV)) { + (void)server_reconfigure(conf(), server); + warn_server_reconfigure(conf(), server); + stats_reconfigure(conf(), server); + } + if (full || (flags & (CONF_IO_FRLD_ZONES | CONF_IO_FRLD_ZONE))) { + server_update_zones(conf(), server, mode); + } + + /* Free old config needed for module unload in zone reload. */ + conf_free(old_conf); + + if (full) { + log_info("configuration reloaded"); + } else { + // Reset confio reload context. 
+ conf()->io.flags = YP_FNONE; + if (conf()->io.zones != NULL) { + trie_clear(conf()->io.zones); + } + } + + systemd_ready_notify(); + + return KNOT_EOK; +} + +void server_stop(server_t *server) +{ + log_info("stopping server"); + systemd_stopping_notify(); + + /* Stop scheduler. */ + evsched_stop(&server->sched); + /* Interrupt background workers. */ + worker_pool_stop(server->workers); + + /* Clear 'running' flag. */ + server->state &= ~ServerRunning; +} + +static int set_handler(server_t *server, int index, unsigned size, runnable_t run) +{ + /* Initialize I/O handlers. */ + int ret = server_init_handler(server, index, size, run, NULL); + if (ret != KNOT_EOK) { + return ret; + } + + server->handlers[index].size = size; + + return KNOT_EOK; +} + +static int configure_threads(conf_t *conf, server_t *server) +{ + int ret = set_handler(server, IO_UDP, conf->cache.srv_udp_threads, udp_master); + if (ret != KNOT_EOK) { + return ret; + } + + if (conf->cache.srv_xdp_threads > 0) { + ret = set_handler(server, IO_XDP, conf->cache.srv_xdp_threads, udp_master); + if (ret != KNOT_EOK) { + return ret; + } + } + + return set_handler(server, IO_TCP, conf->cache.srv_tcp_threads, tcp_master); +} + +static int reconfigure_journal_db(conf_t *conf, server_t *server) +{ + char *journal_dir = conf_db(conf, C_JOURNAL_DB); + conf_val_t journal_size = conf_db_param(conf, C_JOURNAL_DB_MAX_SIZE); + conf_val_t journal_mode = conf_db_param(conf, C_JOURNAL_DB_MODE); + int ret = knot_lmdb_reinit(&server->journaldb, journal_dir, conf_int(&journal_size), + journal_env_flags(conf_opt(&journal_mode), false)); + if (ret != KNOT_EOK) { + log_warning("ignored reconfiguration of journal DB (%s)", knot_strerror(ret)); + } + free(journal_dir); + + return KNOT_EOK; // not "ret" +} + +static int reconfigure_kasp_db(conf_t *conf, server_t *server) +{ + char *kasp_dir = conf_db(conf, C_KASP_DB); + conf_val_t kasp_size = conf_db_param(conf, C_KASP_DB_MAX_SIZE); + int ret = knot_lmdb_reinit(&server->kaspdb, 
kasp_dir, conf_int(&kasp_size), 0); + if (ret != KNOT_EOK) { + log_warning("ignored reconfiguration of KASP DB (%s)", knot_strerror(ret)); + } + free(kasp_dir); + + return KNOT_EOK; // not "ret" +} + +static int reconfigure_timer_db(conf_t *conf, server_t *server) +{ + char *timer_dir = conf_db(conf, C_TIMER_DB); + conf_val_t timer_size = conf_db_param(conf, C_TIMER_DB_MAX_SIZE); + int ret = knot_lmdb_reconfigure(&server->timerdb, timer_dir, conf_int(&timer_size), 0); + free(timer_dir); + return ret; +} + +static int reconfigure_remote_pool(conf_t *conf) +{ + conf_val_t val = conf_get(conf, C_SRV, C_RMT_POOL_LIMIT); + size_t limit = conf_int(&val); + val = conf_get(conf, C_SRV, C_RMT_POOL_TIMEOUT); + knot_timediff_t timeout = conf_int(&val); + if (global_conn_pool == NULL && limit > 0) { + conn_pool_t *new_pool = conn_pool_init(limit, timeout); + if (new_pool == NULL) { + return KNOT_ENOMEM; + } + global_conn_pool = new_pool; + } else { + (void)conn_pool_timeout(global_conn_pool, timeout); + } + + val = conf_get(conf, C_SRV, C_RMT_RETRY_DELAY); + int delay_ms = conf_int(&val); + if (global_unreachables == NULL && delay_ms > 0) { + global_unreachables = knot_unreachables_init(delay_ms); + } else { + (void)knot_unreachables_ttl(global_unreachables, delay_ms); + } + + return KNOT_EOK; +} + +int server_reconfigure(conf_t *conf, server_t *server) +{ + if (conf == NULL || server == NULL) { + return KNOT_EINVAL; + } + + int ret; + + /* First reconfiguration. */ + if (!(server->state & ServerRunning)) { + log_info("Knot DNS %s starting", PACKAGE_VERSION); + + size_t mapsize = conf->mapsize / (1024 * 1024); + if (conf->filename != NULL) { + log_info("loaded configuration file '%s', mapsize %zu MiB", + conf->filename, mapsize); + } else { + log_info("loaded configuration database '%s', mapsize %zu MiB", + knot_db_lmdb_get_path(conf->db), mapsize); + } + + /* Configure server threads. 
*/ + if ((ret = configure_threads(conf, server)) != KNOT_EOK) { + log_error("failed to configure server threads (%s)", + knot_strerror(ret)); + return ret; + } + + /* Configure sockets. */ + if ((ret = configure_sockets(conf, server)) != KNOT_EOK) { + return ret; + } + + if (conf_lmdb_readers(conf) > CONF_MAX_DB_READERS) { + log_warning("config, exceeded number of database readers"); + } + } + + /* Reconfigure journal DB. */ + if ((ret = reconfigure_journal_db(conf, server)) != KNOT_EOK) { + log_error("failed to reconfigure journal DB (%s)", + knot_strerror(ret)); + } + + /* Reconfigure KASP DB. */ + if ((ret = reconfigure_kasp_db(conf, server)) != KNOT_EOK) { + log_error("failed to reconfigure KASP DB (%s)", + knot_strerror(ret)); + } + + /* Reconfigure Timer DB. */ + if ((ret = reconfigure_timer_db(conf, server)) != KNOT_EOK) { + log_error("failed to reconfigure Timer DB (%s)", + knot_strerror(ret)); + } + + /* Reconfigure connection pool. */ + if ((ret = reconfigure_remote_pool(conf)) != KNOT_EOK) { + log_error("failed to reconfigure remote pool (%s)", + knot_strerror(ret)); + } + + return KNOT_EOK; +} + +void server_update_zones(conf_t *conf, server_t *server, reload_t mode) +{ + if (conf == NULL || server == NULL) { + return; + } + + /* Prevent emitting of new zone events. */ + if (server->zone_db) { + knot_zonedb_foreach(server->zone_db, zone_events_freeze); + } + + /* Suspend adding events to worker pool queue, wait for queued events. */ + evsched_pause(&server->sched); + worker_pool_wait(server->workers); + + /* Reload zone database and free old zones. */ + zonedb_reload(conf, server, mode); + + /* Trim extra heap. */ + mem_trim(); + + /* Resume processing events on new zones. 
*/ + evsched_resume(&server->sched); + if (server->zone_db) { + knot_zonedb_foreach(server->zone_db, zone_events_start); + } +} diff --git a/src/knot/server/server.h b/src/knot/server/server.h new file mode 100644 index 0000000..5adafdb --- /dev/null +++ b/src/knot/server/server.h @@ -0,0 +1,203 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <stdatomic.h> + +#include "knot/conf/conf.h" +#include "knot/catalog/catalog_update.h" +#include "knot/common/evsched.h" +#include "knot/common/fdset.h" +#include "knot/journal/knot_lmdb.h" +#include "knot/server/dthreads.h" +#include "knot/worker/pool.h" +#include "knot/zone/backup.h" +#include "knot/zone/zonedb.h" + +struct server; +struct knot_xdp_socket; +struct knot_quic_creds; + +/*! + * \brief I/O handler structure. + */ +typedef struct { + struct server *server; /*!< Reference to server. */ + dt_unit_t *unit; /*!< Threading unit. */ + unsigned *thread_state; /*!< Thread states. */ + unsigned *thread_id; /*!< Thread identifiers per all handlers. */ +} iohandler_t; + +/*! + * \brief Server state flags. + */ +typedef enum { + ServerIdle = 0 << 0, /*!< Server is idle. */ + ServerRunning = 1 << 0, /*!< Server is running. */ +} server_state_t; + +/*! + * \brief Server reload kinds. 
+ */ +typedef enum { + RELOAD_NONE = 0, + RELOAD_FULL = 1 << 0, /*!< Reload the server and all zones. */ + RELOAD_COMMIT = 1 << 1, /*!< Process changes from dynamic configuration. */ + RELOAD_ZONES = 1 << 2, /*!< Reload all zones. */ + RELOAD_CATALOG = 1 << 3, /*!< Process catalog zone changes. */ +} reload_t; + +/*! + * \brief Server interface structure. + */ +typedef struct { + int *fd_udp; + unsigned fd_udp_count; + int *fd_tcp; + unsigned fd_tcp_count; + int *fd_xdp; + unsigned fd_xdp_count; + unsigned xdp_first_thread_id; + struct knot_xdp_socket **xdp_sockets; + struct sockaddr_storage addr; +} iface_t; + +/*! + * \brief Handler indexes. + */ +enum { + IO_UDP = 0, + IO_TCP = 1, + IO_XDP = 2, +}; + +/*! + * \brief Main server structure. + * + * Keeps references to all important structures needed for operation. + */ +typedef struct server { + /*! \brief Server state tracking. */ + volatile unsigned state; + + knot_zonedb_t *zone_db; + knot_lmdb_db_t timerdb; + knot_lmdb_db_t journaldb; + knot_lmdb_db_t kaspdb; + catalog_t catalog; + + /*! \brief I/O handlers. */ + struct { + unsigned size; + iohandler_t handler; + } handlers[3]; + + /*! \brief Background jobs. */ + worker_pool_t *workers; + + /*! \brief Event scheduler. */ + evsched_t sched; + + /*! \brief List of interfaces. */ + iface_t *ifaces; + size_t n_ifaces; + + /*! \brief Pending changes to catalog member zones, update indication. */ + catalog_update_t catalog_upd; + atomic_bool catalog_upd_signal; + + /*! \brief Context of pending zones' backup. */ + zone_backup_ctxs_t backup_ctxs; + + /*! \brief Crendentials context for QUIC. */ + struct knot_quic_creds *quic_creds; +} server_t; + +/*! + * \brief Initializes the server structure. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters. + */ +int server_init(server_t *server, int bg_workers); + +/*! + * \brief Properly destroys the server structure. + * + * \param server Server structure to be used for operation. 
+ */ +void server_deinit(server_t *server); + +/*! + * \brief Starts the server. + * + * \param server Server structure to be used for operation. + * \param async Don't wait for zones to load if true. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL on invalid parameters. + * + */ +int server_start(server_t *server, bool async); + +/*! + * \brief Waits for the server to finish. + * + * \param server Server structure to be used for operation. + * + */ +void server_wait(server_t *server); + +/*! + * \brief Reload server configuration. + * + * \param server Server instance. + * \param mode Reload mode. + * + * \return Error code, KNOT_EOK if success. + */ +int server_reload(server_t *server, reload_t mode); + +/*! + * \brief Requests server to stop. + * + * \param server Server structure to be used for operation. + */ +void server_stop(server_t *server); + +/*! + * \brief Server reconfiguration routine. + * + * Routine for dynamic server reconfiguration. + * + * \param conf Configuration. + * \param server Server instance. + * + * \return Error code, KNOT_EOK if success. + */ +int server_reconfigure(conf_t *conf, server_t *server); + +/*! + * \brief Reconfigure zone database. + * + * Routine for dynamic server zones reconfiguration. + * + * \param conf Configuration. + * \param server Server instance. + * \param mode Reload mode. + */ +void server_update_zones(conf_t *conf, server_t *server, reload_t mode); diff --git a/src/knot/server/tcp-handler.c b/src/knot/server/tcp-handler.c new file mode 100644 index 0000000..433ca9b --- /dev/null +++ b/src/knot/server/tcp-handler.c @@ -0,0 +1,380 @@ +/* Copyright (C) 2023 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include <unistd.h> +#include <fcntl.h> +#include <errno.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/tcp.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <urcu.h> +#ifdef HAVE_SYS_UIO_H // struct iovec (OpenBSD) +#include <sys/uio.h> +#endif // HAVE_SYS_UIO_H + +#include "knot/server/server.h" +#include "knot/server/tcp-handler.h" +#include "knot/common/log.h" +#include "knot/common/fdset.h" +#include "knot/nameserver/process_query.h" +#include "knot/query/layer.h" +#include "contrib/macros.h" +#include "contrib/mempattern.h" +#include "contrib/net.h" +#include "contrib/openbsd/strlcpy.h" +#include "contrib/sockaddr.h" +#include "contrib/time.h" +#include "contrib/ucw/mempool.h" + +/*! \brief TCP context data. */ +typedef struct tcp_context { + knot_layer_t layer; /*!< Query processing layer. */ + server_t *server; /*!< Name server structure. */ + struct iovec iov[2]; /*!< TX/RX buffers. */ + unsigned client_threshold; /*!< Index of first TCP client. */ + struct timespec last_poll_time; /*!< Time of the last socket poll. */ + bool is_throttled; /*!< TCP connections throttling switch. */ + fdset_t set; /*!< Set of server/client sockets. */ + unsigned thread_id; /*!< Thread identifier. */ + unsigned max_worker_fds; /*!< Max TCP clients per worker configuration + no. of ifaces. */ + int idle_timeout; /*!< [s] TCP idle timeout configuration. */ + int io_timeout; /*!< [ms] TCP send/recv timeout configuration. */ +} tcp_context_t; + +#define TCP_SWEEP_INTERVAL 2 /*!< [secs] granularity of connection sweeping. 
 */
+
+/*! \brief Reset the sweep timer to now + TCP_SWEEP_INTERVAL. */
+static void update_sweep_timer(struct timespec *timer)
+{
+	*timer = time_now();
+	timer->tv_sec += TCP_SWEEP_INTERVAL;
+}
+
+/*! \brief Refresh this worker's limits/timeouts from the shared config (under RCU). */
+static void update_tcp_conf(tcp_context_t *tcp)
+{
+	rcu_read_lock();
+	conf_t *pconf = conf();
+	/* Listening sockets plus this worker's fair share of the global
+	 * client limit (at least one client per worker). */
+	tcp->max_worker_fds = tcp->client_threshold + \
+	                      MAX(pconf->cache.srv_tcp_max_clients / pconf->cache.srv_tcp_threads, 1);
+	tcp->idle_timeout = pconf->cache.srv_tcp_idle_timeout;
+	tcp->io_timeout = pconf->cache.srv_tcp_io_timeout;
+	rcu_read_unlock();
+}
+
+/*! \brief Sweep TCP connection: log the peer (best effort) and request close. */
+static fdset_sweep_state_t tcp_sweep(fdset_t *set, int fd, _unused_ void *data)
+{
+	assert(set && fd >= 0);
+
+	/* Best-effort, name and shame. */
+	struct sockaddr_storage ss = { 0 };
+	socklen_t len = sizeof(struct sockaddr_storage);
+	if (getpeername(fd, (struct sockaddr *)&ss, &len) == 0) {
+		char addr_str[SOCKADDR_STRLEN];
+		sockaddr_tostr(addr_str, sizeof(addr_str), &ss);
+		log_notice("TCP, terminated inactive client, address %s", addr_str);
+	}
+
+	/* Always close the expired connection, even if the peer is unknown. */
+	return FDSET_SWEEP;
+}
+
+/*! \brief True while the query-processing layer still wants to produce output. */
+static bool tcp_active_state(int state)
+{
+	return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL);
+}
+
+/*! \brief True if the produced response should actually be sent to the client. */
+static bool tcp_send_state(int state)
+{
+	return (state != KNOT_STATE_FAIL && state != KNOT_STATE_NOOP);
+}
+
+/*! \brief Log a TCP I/O failure; only timeouts are reported. */
+static void tcp_log_error(struct sockaddr_storage *ss, const char *operation, int ret)
+{
+	/* Don't log ECONN as it usually means client closed the connection. */
+	if (ret == KNOT_ETIMEOUT) {
+		char addr_str[SOCKADDR_STRLEN];
+		sockaddr_tostr(addr_str, sizeof(addr_str), ss);
+		log_debug("TCP, failed to %s due to IO timeout, closing connection, address %s",
+		          operation, addr_str);
+	}
+}
+
+/*!
+ * \brief Register this worker's TCP listening sockets into the fdset.
+ *
+ * \return Number of fds in the set after registration, 0 on failure or
+ *         when there is nothing to listen on.
+ */
+static unsigned tcp_set_ifaces(const iface_t *ifaces, size_t n_ifaces,
+                               fdset_t *fds, int thread_id)
+{
+	if (n_ifaces == 0) {
+		return 0;
+	}
+
+	for (const iface_t *i = ifaces; i != ifaces + n_ifaces; i++) {
+		if (i->fd_tcp_count == 0) { // Ignore XDP interface.
+ assert(i->fd_xdp_count > 0); + continue; + } + + int tcp_id = 0; +#ifdef ENABLE_REUSEPORT + if (conf()->cache.srv_tcp_reuseport) { + /* Note: thread_ids start with UDP threads, TCP threads follow. */ + assert((i->fd_udp_count <= thread_id) && + (thread_id < i->fd_tcp_count + i->fd_udp_count)); + + tcp_id = thread_id - i->fd_udp_count; + } +#endif + int ret = fdset_add(fds, i->fd_tcp[tcp_id], FDSET_POLLIN, NULL); + if (ret < 0) { + return 0; + } + } + + return fdset_get_length(fds); +} + +static int tcp_handle(tcp_context_t *tcp, int fd, struct iovec *rx, struct iovec *tx) +{ + /* Get peer name. */ + struct sockaddr_storage ss; + socklen_t addrlen = sizeof(struct sockaddr_storage); + if (getpeername(fd, (struct sockaddr *)&ss, &addrlen) != 0) { + return KNOT_EADDRNOTAVAIL; + } + + /* Create query processing parameter. */ + knotd_qdata_params_t params = { + .proto = KNOTD_QUERY_PROTO_TCP, + .remote = &ss, + .socket = fd, + .server = tcp->server, + .thread_id = tcp->thread_id + }; + + rx->iov_len = KNOT_WIRE_MAX_PKTSIZE; + tx->iov_len = KNOT_WIRE_MAX_PKTSIZE; + + /* Receive data. */ + int recv = net_dns_tcp_recv(fd, rx->iov_base, rx->iov_len, tcp->io_timeout); + if (recv > 0) { + rx->iov_len = recv; + } else { + tcp_log_error(&ss, "receive", recv); + return KNOT_EOF; + } + + /* Initialize processing layer. */ + knot_layer_begin(&tcp->layer, ¶ms); + + /* Create packets. */ + knot_pkt_t *ans = knot_pkt_new(tx->iov_base, tx->iov_len, tcp->layer.mm); + knot_pkt_t *query = knot_pkt_new(rx->iov_base, rx->iov_len, tcp->layer.mm); + + /* Input packet. */ + int ret = knot_pkt_parse(query, 0); + if (ret != KNOT_EOK && query->parsed > 0) { // parsing failed (e.g. 2x OPT) + query->parsed--; // artificially decreasing "parsed" leads to FORMERR + } + knot_layer_consume(&tcp->layer, query); + + /* Resolve until NOOP or finished. */ + while (tcp_active_state(tcp->layer.state)) { + knot_layer_produce(&tcp->layer, ans); + /* Send, if response generation passed and wasn't ignored. 
 */
+		if (ans->size > 0 && tcp_send_state(tcp->layer.state)) {
+			int sent = net_dns_tcp_send(fd, ans->wire, ans->size,
+			                            tcp->io_timeout, NULL);
+			/* Short/failed write: give up on this connection. */
+			if (sent != ans->size) {
+				tcp_log_error(&ss, "send", sent);
+				ret = KNOT_EOF;
+				break;
+			}
+		}
+	}
+
+	/* Reset after processing. */
+	knot_layer_finish(&tcp->layer);
+
+	/* Flush per-query memory (including query and answer packets). */
+	mp_flush(tcp->layer.mm->ctx);
+
+	return ret;
+}
+
+/*! \brief Accept a pending client on the listening socket at fdset index i. */
+static void tcp_event_accept(tcp_context_t *tcp, unsigned i)
+{
+	/* Accept client. */
+	int fd = fdset_get_fd(&tcp->set, i);
+	int client = net_accept(fd, NULL);
+	if (client >= 0) {
+		/* Assign to fdset. */
+		int idx = fdset_add(&tcp->set, client, FDSET_POLLIN, NULL);
+		if (idx < 0) {
+			/* Set is full / out of memory: drop the new client. */
+			close(client);
+			return;
+		}
+
+		/* Update watchdog timer. */
+		(void)fdset_set_watchdog(&tcp->set, idx, tcp->idle_timeout);
+	}
+}
+
+/*!
+ * \brief Serve one request on an established connection at fdset index i.
+ *
+ * \return KNOT_EOK to keep the connection open, any other value tells the
+ *         caller to close it.
+ */
+static int tcp_event_serve(tcp_context_t *tcp, unsigned i)
+{
+	int ret = tcp_handle(tcp, fdset_get_fd(&tcp->set, i),
+	                     &tcp->iov[0], &tcp->iov[1]);
+	if (ret == KNOT_EOK) {
+		/* Update socket activity timer. */
+		(void)fdset_set_watchdog(&tcp->set, i, tcp->idle_timeout);
+	}
+
+	return ret;
+}
+
+/*! \brief One poll round: apply throttling, poll, then accept/serve/close. */
+static void tcp_wait_for_events(tcp_context_t *tcp)
+{
+	fdset_t *set = &tcp->set;
+
+	/* Check if throttled with many open TCP connections. */
+	assert(fdset_get_length(set) <= tcp->max_worker_fds);
+	tcp->is_throttled = fdset_get_length(set) == tcp->max_worker_fds;
+
+	/* If throttled, temporarily ignore new TCP connections.
+	 * (Listening sockets occupy indices [0, client_threshold).) */
+	unsigned offset = tcp->is_throttled ? tcp->client_threshold : 0;
+
+	/* Wait for events. */
+	fdset_it_t it;
+	(void)fdset_poll(set, &it, offset, TCP_SWEEP_INTERVAL * 1000);
+
+	/* Mark the time of last poll call. */
+	tcp->last_poll_time = time_now();
+
+	/* Process events.
*/ + for (; !fdset_it_is_done(&it); fdset_it_next(&it)) { + bool should_close = false; + unsigned int idx = fdset_it_get_idx(&it); + if (fdset_it_is_error(&it)) { + should_close = (idx >= tcp->client_threshold); + } else if (fdset_it_is_pollin(&it)) { + /* Master sockets - new connection to accept. */ + if (idx < tcp->client_threshold) { + /* Don't accept more clients than configured. */ + if (fdset_get_length(set) < tcp->max_worker_fds) { + tcp_event_accept(tcp, idx); + } + /* Client sockets - already accepted connection or + closed connection :-( */ + } else if (tcp_event_serve(tcp, idx) != KNOT_EOK) { + should_close = true; + } + } + + /* Evaluate. */ + if (should_close) { + fdset_it_remove(&it); + } + } + fdset_it_commit(&it); +} + +int tcp_master(dthread_t *thread) +{ + if (thread == NULL || thread->data == NULL) { + return KNOT_EINVAL; + } + + iohandler_t *handler = (iohandler_t *)thread->data; + int thread_id = handler->thread_id[dt_get_id(thread)]; + +#ifdef ENABLE_REUSEPORT + /* Set thread affinity to CPU core (overlaps with UDP/XDP). */ + if (conf()->cache.srv_tcp_reuseport) { + unsigned cpu = dt_online_cpus(); + if (cpu > 1) { + unsigned cpu_mask = (dt_get_id(thread) % cpu); + dt_setaffinity(thread, &cpu_mask, 1); + } + } +#endif + + int ret = KNOT_EOK; + + /* Create big enough memory cushion. */ + knot_mm_t mm; + mm_ctx_mempool(&mm, 16 * MM_DEFAULT_BLKSIZE); + + /* Create TCP answering context. */ + tcp_context_t tcp = { + .server = handler->server, + .is_throttled = false, + .thread_id = thread_id, + }; + knot_layer_init(&tcp.layer, &mm, process_query_layer()); + + /* Create iovec abstraction. */ + for (unsigned i = 0; i < 2; ++i) { + tcp.iov[i].iov_len = KNOT_WIRE_MAX_PKTSIZE; + tcp.iov[i].iov_base = malloc(tcp.iov[i].iov_len); + if (tcp.iov[i].iov_base == NULL) { + ret = KNOT_ENOMEM; + goto finish; + } + } + + /* Initialize sweep interval and TCP configuration. 
*/ + struct timespec next_sweep; + update_sweep_timer(&next_sweep); + update_tcp_conf(&tcp); + + /* Prepare initial buffer for listening and bound sockets. */ + if (fdset_init(&tcp.set, FDSET_RESIZE_STEP) != KNOT_EOK) { + goto finish; + } + + /* Set descriptors for the configured interfaces. */ + tcp.client_threshold = tcp_set_ifaces(handler->server->ifaces, + handler->server->n_ifaces, + &tcp.set, thread_id); + if (tcp.client_threshold == 0) { + goto finish; /* Terminate on zero interfaces. */ + } + + for (;;) { + /* Check for cancellation. */ + if (dt_is_cancelled(thread)) { + break; + } + + /* Serve client requests. */ + tcp_wait_for_events(&tcp); + + /* Sweep inactive clients and refresh TCP configuration. */ + if (tcp.last_poll_time.tv_sec >= next_sweep.tv_sec) { + fdset_sweep(&tcp.set, &tcp_sweep, NULL); + update_sweep_timer(&next_sweep); + update_tcp_conf(&tcp); + } + } + +finish: + free(tcp.iov[0].iov_base); + free(tcp.iov[1].iov_base); + mp_delete(mm.ctx); + fdset_clear(&tcp.set); + + return ret; +} diff --git a/src/knot/server/tcp-handler.h b/src/knot/server/tcp-handler.h new file mode 100644 index 0000000..b60ce8f --- /dev/null +++ b/src/knot/server/tcp-handler.h @@ -0,0 +1,43 @@ +/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +/*! + * \brief TCP sockets threading model. 
+ * + * The master socket distributes incoming connections among + * the worker threads ("buckets"). Each threads processes it's own + * set of sockets, and eliminates mutual exclusion problem by doing so. + */ + +#pragma once + +#include "knot/server/dthreads.h" + +#define TCP_BACKLOG_SIZE 10 /*!< TCP listen backlog size. */ + +/*! + * \brief TCP handler thread runnable. + * + * Listens to both bound TCP sockets for client connections and + * serves TCP clients. This runnable is designed to be used as coherent + * and implements cancellation point. + * + * \param thread Associated thread from DThreads unit. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL invalid parameters. + */ +int tcp_master(dthread_t *thread); diff --git a/src/knot/server/udp-handler.c b/src/knot/server/udp-handler.c new file mode 100644 index 0000000..1e309d6 --- /dev/null +++ b/src/knot/server/udp-handler.c @@ -0,0 +1,575 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ */ + +#define __APPLE_USE_RFC_3542 + +#include <assert.h> +#include <dlfcn.h> +#include <errno.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <sys/param.h> +#ifdef HAVE_SYS_UIO_H // struct iovec (OpenBSD) +#include <sys/uio.h> +#endif /* HAVE_SYS_UIO_H */ +#include <unistd.h> + +#include "contrib/macros.h" +#include "contrib/mempattern.h" +#include "contrib/sockaddr.h" +#include "contrib/ucw/mempool.h" +#include "knot/common/fdset.h" +#include "knot/nameserver/process_query.h" +#include "knot/query/layer.h" +#include "knot/server/proxyv2.h" +#include "knot/server/server.h" +#include "knot/server/udp-handler.h" +#include "knot/server/xdp-handler.h" + +/* Buffer identifiers. */ +enum { + RX = 0, + TX = 1, + NBUFS = 2 +}; + +/*! \brief UDP context data. */ +typedef struct { + knot_layer_t layer; /*!< Query processing layer. */ + server_t *server; /*!< Name server structure. */ + unsigned thread_id; /*!< Thread identifier. */ +} udp_context_t; + +static bool udp_state_active(int state) +{ + return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL); +} + +static void udp_handle(udp_context_t *udp, int fd, struct sockaddr_storage *ss, + struct iovec *rx, struct iovec *tx, struct knot_xdp_msg *xdp_msg) +{ + /* Create query processing parameter. */ + knotd_qdata_params_t params = { + .proto = KNOTD_QUERY_PROTO_UDP, + .remote = ss, + .socket = fd, + .server = udp->server, + .xdp_msg = xdp_msg, + .thread_id = udp->thread_id + }; + struct sockaddr_storage proxied_remote; + + /* Start query processing. */ + knot_layer_begin(&udp->layer, ¶ms); + + /* Create packets. */ + knot_pkt_t *query = knot_pkt_new(rx->iov_base, rx->iov_len, udp->layer.mm); + knot_pkt_t *ans = knot_pkt_new(tx->iov_base, tx->iov_len, udp->layer.mm); + + /* Input packet. 
*/ + int ret = knot_pkt_parse(query, 0); + if (ret != KNOT_EOK && query->parsed > 0) { + ret = proxyv2_header_strip(&query, params.remote, &proxied_remote); + if (ret == KNOT_EOK) { + params.remote = &proxied_remote; + } else { + query->parsed--; // artificially decreasing "parsed" leads to FORMERR + } + } + knot_layer_consume(&udp->layer, query); + + /* Process answer. */ + while (udp_state_active(udp->layer.state)) { + knot_layer_produce(&udp->layer, ans); + } + + /* Send response only if finished successfully. */ + if (udp->layer.state == KNOT_STATE_DONE) { + tx->iov_len = ans->size; + } else { + tx->iov_len = 0; + } + + /* Reset after processing. */ + knot_layer_finish(&udp->layer); + + /* Flush per-query memory (including query and answer packets). */ + mp_flush(udp->layer.mm->ctx); +} + +typedef struct { + void* (*udp_init)(udp_context_t *, void *); + void (*udp_deinit)(void *); + int (*udp_recv)(int, void *); + void (*udp_handle)(udp_context_t *, void *); + void (*udp_send)(void *); + void (*udp_sweep)(void *); // Optional +} udp_api_t; + +/*! \brief Control message to fit IP_PKTINFO or IPv6_RECVPKTINFO. */ +typedef union { + struct cmsghdr cmsg; + uint8_t buf[CMSG_SPACE(sizeof(struct in6_pktinfo))]; +} cmsg_pktinfo_t; + +static void udp_pktinfo_handle(const struct msghdr *rx, struct msghdr *tx) +{ + tx->msg_controllen = rx->msg_controllen; + if (tx->msg_controllen > 0) { + tx->msg_control = rx->msg_control; + } else { + // BSD has problem with zero length and not-null pointer + tx->msg_control = NULL; + } + +#if defined(__linux__) || defined(__APPLE__) + struct cmsghdr *cmsg = CMSG_FIRSTHDR(tx); + if (cmsg == NULL) { + return; + } + + /* Unset the ifindex to not bypass the routing tables. 
 */
+	if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
+		struct in_pktinfo *info = (struct in_pktinfo *)CMSG_DATA(cmsg);
+		/* Reply from the same local address the query arrived on. */
+		info->ipi_spec_dst = info->ipi_addr;
+		info->ipi_ifindex = 0;
+	} else if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
+		struct in6_pktinfo *info = (struct in6_pktinfo *)CMSG_DATA(cmsg);
+		info->ipi6_ifindex = 0;
+	}
+#endif
+}
+
+/* UDP recvfrom() request struct: one datagram in flight per worker. */
+struct udp_recvfrom {
+	int fd;
+	struct sockaddr_storage addr;
+	struct msghdr msg[NBUFS];
+	struct iovec iov[NBUFS];
+	uint8_t buf[NBUFS][KNOT_WIRE_MAX_PKTSIZE];
+	cmsg_pktinfo_t pktinfo;
+};
+
+/*! \brief Allocate and wire up a single-datagram recvfrom() context. */
+static void *udp_recvfrom_init(_unused_ udp_context_t *ctx, _unused_ void *xdp_sock)
+{
+	struct udp_recvfrom *rq = malloc(sizeof(struct udp_recvfrom));
+	if (rq == NULL) {
+		return NULL;
+	}
+	memset(rq, 0, sizeof(struct udp_recvfrom));
+
+	for (unsigned i = 0; i < NBUFS; ++i) {
+		/* rq->buf + i is &rq->buf[i], i.e. the whole i-th buffer
+		 * (buf is an array of arrays), not an offset of i bytes. */
+		rq->iov[i].iov_base = rq->buf + i;
+		rq->iov[i].iov_len = KNOT_WIRE_MAX_PKTSIZE;
+		rq->msg[i].msg_name = &rq->addr;
+		rq->msg[i].msg_namelen = sizeof(rq->addr);
+		rq->msg[i].msg_iov = &rq->iov[i];
+		rq->msg[i].msg_iovlen = 1;
+		rq->msg[i].msg_control = &rq->pktinfo.cmsg;
+		rq->msg[i].msg_controllen = sizeof(rq->pktinfo);
+	}
+	return rq;
+}
+
+/*! \brief Release the recvfrom() context (NULL-safe via free()). */
+static void udp_recvfrom_deinit(void *d)
+{
+	struct udp_recvfrom *rq = d;
+	free(rq);
+}
+
+/*!
+ * \brief Non-blocking receive of one datagram.
+ *
+ * \return 1 if a message was read into the RX buffer, 0 otherwise
+ *         (errors and empty datagrams are silently ignored).
+ */
+static int udp_recvfrom_recv(int fd, void *d)
+{
+	/* Reset max lengths. */
+	struct udp_recvfrom *rq = (struct udp_recvfrom *)d;
+	rq->iov[RX].iov_len = KNOT_WIRE_MAX_PKTSIZE;
+	rq->msg[RX].msg_namelen = sizeof(struct sockaddr_storage);
+	rq->msg[RX].msg_controllen = sizeof(rq->pktinfo);
+
+	int ret = recvmsg(fd, &rq->msg[RX], MSG_DONTWAIT);
+	if (ret > 0) {
+		rq->fd = fd;
+		rq->iov[RX].iov_len = ret;
+		return 1;
+	}
+
+	return 0;
+}
+
+/*! \brief Run query processing on the received datagram and fill the TX buffer. */
+static void udp_recvfrom_handle(udp_context_t *ctx, void *d)
+{
+	struct udp_recvfrom *rq = d;
+
+	/* Prepare TX address.
*/ + rq->msg[TX].msg_namelen = rq->msg[RX].msg_namelen; + rq->iov[TX].iov_len = KNOT_WIRE_MAX_PKTSIZE; + + udp_pktinfo_handle(&rq->msg[RX], &rq->msg[TX]); + + /* Process received pkt. */ + udp_handle(ctx, rq->fd, &rq->addr, &rq->iov[RX], &rq->iov[TX], NULL); +} + +static void udp_recvfrom_send(void *d) +{ + struct udp_recvfrom *rq = d; + if (rq->iov[TX].iov_len > 0) { + (void)sendmsg(rq->fd, &rq->msg[TX], 0); + } +} + +_unused_ +static udp_api_t udp_recvfrom_api = { + udp_recvfrom_init, + udp_recvfrom_deinit, + udp_recvfrom_recv, + udp_recvfrom_handle, + udp_recvfrom_send, +}; + +#ifdef ENABLE_RECVMMSG +/* UDP recvmmsg() request struct. */ +struct udp_recvmmsg { + int fd; + struct sockaddr_storage addrs[RECVMMSG_BATCHLEN]; + char *iobuf[NBUFS]; + struct iovec *iov[NBUFS]; + struct mmsghdr *msgs[NBUFS]; + unsigned rcvd; + knot_mm_t mm; + cmsg_pktinfo_t pktinfo[RECVMMSG_BATCHLEN]; +}; + +static void *udp_recvmmsg_init(_unused_ udp_context_t *ctx, _unused_ void *xdp_sock) +{ + knot_mm_t mm; + mm_ctx_mempool(&mm, sizeof(struct udp_recvmmsg)); + + struct udp_recvmmsg *rq = mm_alloc(&mm, sizeof(struct udp_recvmmsg)); + memset(rq, 0, sizeof(*rq)); + memcpy(&rq->mm, &mm, sizeof(knot_mm_t)); + + /* Initialize buffers. 
*/ + for (unsigned i = 0; i < NBUFS; ++i) { + rq->iobuf[i] = mm_alloc(&mm, KNOT_WIRE_MAX_PKTSIZE * RECVMMSG_BATCHLEN); + rq->iov[i] = mm_alloc(&mm, sizeof(struct iovec) * RECVMMSG_BATCHLEN); + rq->msgs[i] = mm_alloc(&mm, sizeof(struct mmsghdr) * RECVMMSG_BATCHLEN); + memset(rq->msgs[i], 0, sizeof(struct mmsghdr) * RECVMMSG_BATCHLEN); + for (unsigned k = 0; k < RECVMMSG_BATCHLEN; ++k) { + rq->iov[i][k].iov_base = rq->iobuf[i] + k * KNOT_WIRE_MAX_PKTSIZE; + rq->iov[i][k].iov_len = KNOT_WIRE_MAX_PKTSIZE; + rq->msgs[i][k].msg_hdr.msg_iov = rq->iov[i] + k; + rq->msgs[i][k].msg_hdr.msg_iovlen = 1; + rq->msgs[i][k].msg_hdr.msg_name = rq->addrs + k; + rq->msgs[i][k].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + rq->msgs[i][k].msg_hdr.msg_control = &rq->pktinfo[k].cmsg; + rq->msgs[i][k].msg_hdr.msg_controllen = sizeof(cmsg_pktinfo_t); + } + } + + return rq; +} + +static void udp_recvmmsg_deinit(void *d) +{ + struct udp_recvmmsg *rq = d; + if (rq != NULL) { + mp_delete(rq->mm.ctx); + } +} + +static int udp_recvmmsg_recv(int fd, void *d) +{ + struct udp_recvmmsg *rq = d; + + int n = recvmmsg(fd, rq->msgs[RX], RECVMMSG_BATCHLEN, MSG_DONTWAIT, NULL); + if (n > 0) { + rq->fd = fd; + rq->rcvd = n; + } + return n; +} + +static void udp_recvmmsg_handle(udp_context_t *ctx, void *d) +{ + struct udp_recvmmsg *rq = d; + + /* Handle each received message. */ + unsigned j = 0; + for (unsigned i = 0; i < rq->rcvd; ++i) { + struct msghdr *rx = &rq->msgs[RX][i].msg_hdr; + struct msghdr *tx = &rq->msgs[TX][j].msg_hdr; + + /* Set received bytes. */ + rx->msg_iov->iov_len = rq->msgs[RX][i].msg_len; + /* Update mapping of address buffer. */ + tx->msg_name = rx->msg_name; + tx->msg_namelen = rx->msg_namelen; + + /* Update output message control buffer. 
*/ + udp_pktinfo_handle(rx, tx); + + udp_handle(ctx, rq->fd, rq->addrs + i, rx->msg_iov, tx->msg_iov, NULL); + + if (tx->msg_iov->iov_len > 0) { + rq->msgs[TX][j].msg_len = tx->msg_iov->iov_len; + j++; + } else { + /* Reset tainted output context. */ + tx->msg_iov->iov_len = KNOT_WIRE_MAX_PKTSIZE; + } + + /* Reset input context. */ + rx->msg_iov->iov_len = KNOT_WIRE_MAX_PKTSIZE; + rx->msg_namelen = sizeof(struct sockaddr_storage); + rx->msg_controllen = sizeof(cmsg_pktinfo_t); + } + rq->rcvd = j; +} + +static void udp_recvmmsg_send(void *d) +{ + struct udp_recvmmsg *rq = d; + + (void)sendmmsg(rq->fd, rq->msgs[TX], rq->rcvd, 0); + for (unsigned i = 0; i < rq->rcvd; ++i) { + struct msghdr *tx = &rq->msgs[TX][i].msg_hdr; + + /* Reset output context. */ + tx->msg_iov->iov_len = KNOT_WIRE_MAX_PKTSIZE; + } +} + +static udp_api_t udp_recvmmsg_api = { + udp_recvmmsg_init, + udp_recvmmsg_deinit, + udp_recvmmsg_recv, + udp_recvmmsg_handle, + udp_recvmmsg_send, +}; +#endif /* ENABLE_RECVMMSG */ + +#ifdef ENABLE_XDP + +static void *xdp_recvmmsg_init(udp_context_t *ctx, void *xdp_sock) +{ + return xdp_handle_init(ctx->server, xdp_sock); +} + +static void xdp_recvmmsg_deinit(void *d) +{ + if (d != NULL) { + xdp_handle_free(d); + } +} + +static int xdp_recvmmsg_recv(_unused_ int fd, void *d) +{ + return xdp_handle_recv(d); +} + +static void xdp_recvmmsg_handle(udp_context_t *ctx, void *d) +{ + xdp_handle_msgs(d, &ctx->layer, ctx->server, ctx->thread_id); +} + +static void xdp_recvmmsg_send(void *d) +{ + xdp_handle_send(d); +} + +static void xdp_recvmmsg_sweep(void *d) +{ + xdp_handle_reconfigure(d); + xdp_handle_sweep(d); +} + +static udp_api_t xdp_recvmmsg_api = { + xdp_recvmmsg_init, + xdp_recvmmsg_deinit, + xdp_recvmmsg_recv, + xdp_recvmmsg_handle, + xdp_recvmmsg_send, + xdp_recvmmsg_sweep, +}; +#endif /* ENABLE_XDP */ + +static bool is_xdp_thread(const server_t *server, int thread_id) +{ + return server->handlers[IO_XDP].size > 0 && + 
server->handlers[IO_XDP].handler.thread_id[0] <= thread_id; +} + +static int iface_udp_fd(const iface_t *iface, int thread_id, bool xdp_thread, + void **xdp_socket) +{ + if (xdp_thread) { +#ifdef ENABLE_XDP + if (thread_id < iface->xdp_first_thread_id || + thread_id >= iface->xdp_first_thread_id + iface->fd_xdp_count) { + return -1; // Different XDP interface. + } + size_t xdp_wrk_id = thread_id - iface->xdp_first_thread_id; + assert(xdp_wrk_id < iface->fd_xdp_count); + *xdp_socket = iface->xdp_sockets[xdp_wrk_id]; + return iface->fd_xdp[xdp_wrk_id]; +#else + assert(0); + return -1; +#endif + } else { // UDP thread. + if (iface->fd_udp_count == 0) { // No UDP interfaces. + assert(iface->fd_xdp_count > 0); + return -1; + } +#ifdef ENABLE_REUSEPORT + assert(thread_id < iface->fd_udp_count); + return iface->fd_udp[thread_id]; +#else + return iface->fd_udp[0]; +#endif + } +} + +static unsigned udp_set_ifaces(const server_t *server, size_t n_ifaces, fdset_t *fds, + int thread_id, void **xdp_socket) +{ + if (n_ifaces == 0) { + return 0; + } + + bool xdp_thread = is_xdp_thread(server, thread_id); + const iface_t *ifaces = server->ifaces; + + for (const iface_t *i = ifaces; i != ifaces + n_ifaces; i++) { + int fd = iface_udp_fd(i, thread_id, xdp_thread, xdp_socket); + if (fd < 0) { + continue; + } + int ret = fdset_add(fds, fd, FDSET_POLLIN, NULL); + if (ret < 0) { + return 0; + } + } + + assert(!xdp_thread || fdset_get_length(fds) == 1); + return fdset_get_length(fds); +} + +int udp_master(dthread_t *thread) +{ + if (thread == NULL || thread->data == NULL) { + return KNOT_EINVAL; + } + + iohandler_t *handler = (iohandler_t *)thread->data; + int thread_id = handler->thread_id[dt_get_id(thread)]; + + if (handler->server->n_ifaces == 0) { + return KNOT_EOK; + } + + /* Set thread affinity to CPU core (same for UDP and XDP). 
*/ + unsigned cpu = dt_online_cpus(); + if (cpu > 1) { + unsigned cpu_mask = (dt_get_id(thread) % cpu); + dt_setaffinity(thread, &cpu_mask, 1); + } + + /* Choose processing API. */ + udp_api_t *api = NULL; + if (is_xdp_thread(handler->server, thread_id)) { +#ifdef ENABLE_XDP + api = &xdp_recvmmsg_api; +#else + assert(0); +#endif + } else { +#ifdef ENABLE_RECVMMSG + api = &udp_recvmmsg_api; +#else + api = &udp_recvfrom_api; +#endif + } + void *api_ctx = NULL; + + /* Create big enough memory cushion. */ + knot_mm_t mm; + mm_ctx_mempool(&mm, 16 * MM_DEFAULT_BLKSIZE); + + /* Create UDP answering context. */ + udp_context_t udp = { + .server = handler->server, + .thread_id = thread_id, + }; + knot_layer_init(&udp.layer, &mm, process_query_layer()); + + /* Allocate descriptors for the configured interfaces. */ + void *xdp_socket = NULL; + size_t nifs = handler->server->n_ifaces; + fdset_t fds; + if (fdset_init(&fds, nifs) != KNOT_EOK) { + goto finish; + } + unsigned nfds = udp_set_ifaces(handler->server, nifs, &fds, + thread_id, &xdp_socket); + if (nfds == 0) { + goto finish; + } + + /* Initialize the networking API. */ + api_ctx = api->udp_init(&udp, xdp_socket); + if (api_ctx == NULL) { + goto finish; + } + + /* Loop until all data is read. */ + for (;;) { + /* Cancellation point. */ + if (dt_is_cancelled(thread)) { + break; + } + + /* Wait for events. */ + fdset_it_t it; + (void)fdset_poll(&fds, &it, 0, 1000); + + /* Process the events. */ + for (; !fdset_it_is_done(&it); fdset_it_next(&it)) { + if (!fdset_it_is_pollin(&it)) { + continue; + } + if (api->udp_recv(fdset_it_get_fd(&it), api_ctx) > 0) { + api->udp_handle(&udp, api_ctx); + api->udp_send(api_ctx); + } + } + + /* Regular maintenance (XDP-TCP only). 
*/ + if (api->udp_sweep != NULL) { + api->udp_sweep(api_ctx); + } + } + +finish: + api->udp_deinit(api_ctx); + mp_delete(mm.ctx); + fdset_clear(&fds); + + return KNOT_EOK; +} diff --git a/src/knot/server/udp-handler.h b/src/knot/server/udp-handler.h new file mode 100644 index 0000000..b09e43e --- /dev/null +++ b/src/knot/server/udp-handler.h @@ -0,0 +1,43 @@ +/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +/*! + * \brief UDP sockets threading model. + * + * The master socket locks one worker thread at a time + * and saves events in it's own backing store for asynchronous processing. + * The worker threads work asynchronously in thread pool. + */ + +#pragma once + +#include "knot/server/dthreads.h" + +#define RECVMMSG_BATCHLEN 10 /*!< Default recvmmsg() batch size. */ + +/*! + * \brief UDP handler thread runnable. + * + * Listen to DNS datagrams in a loop on a UDP socket and + * reply to them. This runnable is designed to be used as coherent + * and implements cancellation point. + * + * \param thread Associated thread from DThreads unit. + * + * \retval KNOT_EOK on success. + * \retval KNOT_EINVAL invalid parameters. 
+ */ +int udp_master(dthread_t *thread); diff --git a/src/knot/server/xdp-handler.c b/src/knot/server/xdp-handler.c new file mode 100644 index 0000000..3c9f6d6 --- /dev/null +++ b/src/knot/server/xdp-handler.c @@ -0,0 +1,506 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifdef ENABLE_XDP + +#include <assert.h> +#include <stdlib.h> +#include <urcu.h> + +#include "knot/server/xdp-handler.h" +#include "knot/common/log.h" +#include "knot/server/proxyv2.h" +#include "knot/server/server.h" +#include "contrib/sockaddr.h" +#include "contrib/time.h" +#include "contrib/ucw/mempool.h" +#include "libknot/endian.h" +#include "libknot/error.h" +#ifdef ENABLE_QUIC +#include "libknot/xdp/quic.h" +#endif // ENABLE_QUIC +#include "libknot/xdp/tcp.h" +#include "libknot/xdp/tcp_iobuf.h" + +#define QUIC_MAX_SEND_PER_RECV 4 +#define QUIC_IBUFS_PER_CONN 512 /* Heuristic value: this means that e.g. for 100k allowed + QUIC conns, we will limit total size of input buffers to 50 MiB. 
*/ + +typedef struct { + uint64_t last_log; + knot_sweep_stats_t stats; +} closed_log_ctx_t; + +typedef struct xdp_handle_ctx { + knot_xdp_socket_t *sock; + knot_xdp_msg_t msg_recv[XDP_BATCHLEN]; + knot_xdp_msg_t msg_send_udp[XDP_BATCHLEN]; + knot_tcp_relay_t relays[XDP_BATCHLEN]; + uint32_t msg_recv_count; + uint32_t msg_udp_count; + knot_tcp_table_t *tcp_table; + knot_tcp_table_t *syn_table; + +#ifdef ENABLE_QUIC + knot_xquic_conn_t *quic_relays[XDP_BATCHLEN]; + int quic_rets[XDP_BATCHLEN]; + knot_xquic_table_t *quic_table; + closed_log_ctx_t quic_closed; +#endif // ENABLE_QUIC + + bool tcp; + size_t tcp_max_conns; + size_t tcp_syn_conns; + size_t tcp_max_inbufs; + size_t tcp_max_obufs; + uint32_t tcp_idle_close; // In microseconds. + uint32_t tcp_idle_reset; // In microseconds. + uint32_t tcp_idle_resend; // In microseconds. + + uint16_t quic_port; // Network-byte order! + size_t quic_max_conns; + uint64_t quic_idle_close; // In nanoseconds. + size_t quic_max_inbufs; + size_t quic_max_obufs; + + closed_log_ctx_t tcp_closed; +} xdp_handle_ctx_t; + +static bool udp_state_active(int state) +{ + return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL); +} + +static bool tcp_active_state(int state) +{ + return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL); +} + +static bool tcp_send_state(int state) +{ + return (state != KNOT_STATE_FAIL && state != KNOT_STATE_NOOP); +} + +static void log_closed(closed_log_ctx_t *ctx, bool tcp) +{ + struct timespec now = time_now(); + uint64_t sec = now.tv_sec + now.tv_nsec / 1000000000; + if (sec - ctx->last_log <= 9 || (ctx->stats.total == 0)) { + return; + } + + const char *proto = tcp ? 
"TCP" : "QUIC"; + + uint32_t timedout = ctx->stats.counters[KNOT_SWEEP_CTR_TIMEOUT]; + uint32_t limit_conn = ctx->stats.counters[KNOT_SWEEP_CTR_LIMIT_CONN]; + uint32_t limit_ibuf = ctx->stats.counters[KNOT_SWEEP_CTR_LIMIT_IBUF]; + uint32_t limit_obuf = ctx->stats.counters[KNOT_SWEEP_CTR_LIMIT_OBUF]; + + if (tcp || ctx->stats.total != timedout) { + log_notice("%s, connection sweep, closed %u, count limit %u, inbuf limit %u, outbuf limit %u", + proto, timedout, limit_conn, limit_ibuf, limit_obuf); + } else { + log_debug("%s, timed out connections %u", proto, timedout); + } + + ctx->last_log = sec; + knot_sweep_stats_reset(&ctx->stats); +} + +void xdp_handle_reconfigure(xdp_handle_ctx_t *ctx) +{ + rcu_read_lock(); + conf_t *pconf = conf(); + ctx->tcp = pconf->cache.xdp_tcp; + ctx->quic_port = htobe16(pconf->cache.xdp_quic); + ctx->tcp_max_conns = pconf->cache.xdp_tcp_max_clients / pconf->cache.srv_xdp_threads; + ctx->tcp_syn_conns = 2 * ctx->tcp_max_conns; + ctx->tcp_max_inbufs = pconf->cache.xdp_tcp_inbuf_max_size / pconf->cache.srv_xdp_threads; + ctx->tcp_max_obufs = pconf->cache.xdp_tcp_outbuf_max_size / pconf->cache.srv_xdp_threads; + ctx->tcp_idle_close = pconf->cache.xdp_tcp_idle_close * 1000000; + ctx->tcp_idle_reset = pconf->cache.xdp_tcp_idle_reset * 1000000; + ctx->tcp_idle_resend= pconf->cache.xdp_tcp_idle_resend * 1000000; + ctx->quic_max_conns = pconf->cache.srv_quic_max_clients / pconf->cache.srv_xdp_threads; + ctx->quic_idle_close= pconf->cache.srv_quic_idle_close * 1000000000LU; + ctx->quic_max_inbufs= ctx->quic_max_conns * QUIC_IBUFS_PER_CONN; + ctx->quic_max_obufs = pconf->cache.srv_quic_obuf_max_size; + rcu_read_unlock(); +} + +void xdp_handle_free(xdp_handle_ctx_t *ctx) +{ + knot_tcp_table_free(ctx->tcp_table); + knot_tcp_table_free(ctx->syn_table); +#ifdef ENABLE_QUIC + knot_xquic_table_free(ctx->quic_table); +#endif // ENABLE_QUIC + free(ctx); +} + +#ifdef ENABLE_QUIC +static void quic_log_cb(const char *line) +{ + log_debug("QUIC: %s", line); +} 
+#endif // ENABLE_QUIC + +xdp_handle_ctx_t *xdp_handle_init(struct server *server, knot_xdp_socket_t *xdp_sock) +{ + xdp_handle_ctx_t *ctx = calloc(1, sizeof(*ctx)); + if (ctx == NULL) { + return NULL; + } + ctx->sock = xdp_sock; + + xdp_handle_reconfigure(ctx); + + if (ctx->tcp) { + // NOTE: the table size don't have to equal its max usage! + ctx->tcp_table = knot_tcp_table_new(ctx->tcp_max_conns, NULL); + if (ctx->tcp_table == NULL) { + xdp_handle_free(ctx); + return NULL; + } + ctx->syn_table = knot_tcp_table_new(ctx->tcp_syn_conns, ctx->tcp_table); + if (ctx->syn_table == NULL) { + xdp_handle_free(ctx); + return NULL; + } + } + + if (ctx->quic_port > 0) { +#ifdef ENABLE_QUIC + conf_t *pconf = conf(); + size_t udp_pl = MIN(pconf->cache.srv_udp_max_payload_ipv4, pconf->cache.srv_udp_max_payload_ipv6); + ctx->quic_table = knot_xquic_table_new(ctx->quic_max_conns, ctx->quic_max_inbufs, + ctx->quic_max_obufs, udp_pl, server->quic_creds); + if (ctx->quic_table == NULL) { + xdp_handle_free(ctx); + return NULL; + } + if (conf_get_bool(pconf, C_XDP, C_QUIC_LOG)) { + ctx->quic_table->log_cb = quic_log_cb; + } +#else + assert(0); // verified in configuration checks +#endif // ENABLE_QUIC + } + + return ctx; +} + +int xdp_handle_recv(xdp_handle_ctx_t *ctx) +{ + int ret = knot_xdp_recv(ctx->sock, ctx->msg_recv, XDP_BATCHLEN, + &ctx->msg_recv_count, NULL); + return ret == KNOT_EOK ? ctx->msg_recv_count : ret; +} + +static void handle_init(knotd_qdata_params_t *params, knot_layer_t *layer, + knotd_query_proto_t proto, const knot_xdp_msg_t *msg, + const struct iovec *payload, struct sockaddr_storage *proxied_remote) +{ + params->proto = proto; + params->remote = (struct sockaddr_storage *)&msg->ip_from; + params->xdp_msg = msg; + + knot_layer_begin(layer, params); + + knot_pkt_t *query = knot_pkt_new(payload->iov_base, payload->iov_len, layer->mm); + int ret = knot_pkt_parse(query, 0); + if (ret != KNOT_EOK && query->parsed > 0) { // parsing failed (e.g. 
2x OPT) + if (params->proto == KNOTD_QUERY_PROTO_UDP && + proxyv2_header_strip(&query, params->remote, proxied_remote) == KNOT_EOK) { + assert(proxied_remote); + params->remote = proxied_remote; + } else { + query->parsed--; // artificially decreasing "parsed" leads to FORMERR + } + } + knot_layer_consume(layer, query); +} + +static void handle_finish(knot_layer_t *layer) +{ + knot_layer_finish(layer); + + // Flush per-query memory (including query and answer packets). + mp_flush(layer->mm->ctx); +} + +static void handle_udp(xdp_handle_ctx_t *ctx, knot_layer_t *layer, + knotd_qdata_params_t *params) +{ + struct sockaddr_storage proxied_remote; + + ctx->msg_udp_count = 0; + + for (uint32_t i = 0; i < ctx->msg_recv_count; i++) { + knot_xdp_msg_t *msg_recv = &ctx->msg_recv[i]; + knot_xdp_msg_t *msg_send = &ctx->msg_send_udp[ctx->msg_udp_count]; + + // Skip TCP or QUIC or marked (zero length) message. + if ((msg_recv->flags & KNOT_XDP_MSG_TCP) || + msg_recv->ip_to.sin6_port == ctx->quic_port || + msg_recv->payload.iov_len == 0) { + continue; + } + + // Try to allocate a buffer for a reply. + if (knot_xdp_reply_alloc(ctx->sock, msg_recv, msg_send) != KNOT_EOK) { + log_notice("UDP, failed to send some packets"); + break; // Drop the rest of the messages. + } + ctx->msg_udp_count++; + + // Consume the query. + handle_init(params, layer, KNOTD_QUERY_PROTO_UDP, msg_recv, &msg_recv->payload, + &proxied_remote); + + // Process the reply. + knot_pkt_t *ans = knot_pkt_new(msg_send->payload.iov_base, + msg_send->payload.iov_len, layer->mm); + while (udp_state_active(layer->state)) { + knot_layer_produce(layer, ans); + } + if (layer->state == KNOT_STATE_DONE) { + msg_send->payload.iov_len = ans->size; + } else { + // If not success, don't send any reply. + msg_send->payload.iov_len = 0; + } + + // Reset the processing. 
+ handle_finish(layer); + } +} + +static void handle_tcp(xdp_handle_ctx_t *ctx, knot_layer_t *layer, + knotd_qdata_params_t *params) +{ + int ret = knot_tcp_recv(ctx->relays, ctx->msg_recv, ctx->msg_recv_count, + ctx->tcp_table, ctx->syn_table, XDP_TCP_IGNORE_NONE); + if (ret != KNOT_EOK) { + log_notice("TCP, failed to process some packets (%s)", knot_strerror(ret)); + return; + } else if (knot_tcp_relay_empty(&ctx->relays[0])) { // no TCP traffic + return; + } + + uint8_t ans_buf[KNOT_WIRE_MAX_PKTSIZE]; + + for (uint32_t i = 0; i < ctx->msg_recv_count; i++) { + knot_tcp_relay_t *rl = &ctx->relays[i]; + + // Process all complete DNS queries in one TCP stream. + for (size_t j = 0; j < rl->inbufs_count; j++) { + // Consume the query. + handle_init(params, layer, KNOTD_QUERY_PROTO_TCP, rl->msg, &rl->inbufs[j], NULL); + params->measured_rtt = rl->conn->establish_rtt; + + // Process the reply. + knot_pkt_t *ans = knot_pkt_new(ans_buf, sizeof(ans_buf), layer->mm); + while (tcp_active_state(layer->state)) { + knot_layer_produce(layer, ans); + if (!tcp_send_state(layer->state)) { + continue; + } + + (void)knot_tcp_reply_data(rl, ctx->tcp_table, false, + ans->wire, ans->size); + } + + handle_finish(layer); + } + } +} + +#ifdef ENABLE_QUIC +static void handle_quic_stream(knot_xquic_conn_t *conn, int64_t stream_id, struct iovec *inbuf, + knot_layer_t *layer, knotd_qdata_params_t *params, uint8_t *ans_buf, + size_t ans_buf_size, const knot_xdp_msg_t *xdp_msg) +{ + // Consume the query. + handle_init(params, layer, KNOTD_QUERY_PROTO_QUIC, xdp_msg, inbuf, NULL); + params->measured_rtt = knot_xquic_conn_rtt(conn); + + // Process the reply. 
+ knot_pkt_t *ans = knot_pkt_new(ans_buf, ans_buf_size, layer->mm); + while (tcp_active_state(layer->state)) { + knot_layer_produce(layer, ans); + if (!tcp_send_state(layer->state)) { + continue; + } + if (knot_xquic_stream_add_data(conn, stream_id, ans->wire, ans->size) == NULL) { + break; + } + } + + handle_finish(layer); +} +#endif // ENABLE_QUIC + +static void handle_quic(xdp_handle_ctx_t *ctx, knot_layer_t *layer, + knotd_qdata_params_t *params) +{ +#ifdef ENABLE_QUIC + if (ctx->quic_table == NULL) { + return; + } + + uint8_t ans_buf[KNOT_WIRE_MAX_PKTSIZE]; + + for (uint32_t i = 0; i < ctx->msg_recv_count; i++) { + knot_xdp_msg_t *msg_recv = &ctx->msg_recv[i]; + ctx->quic_relays[i] = NULL; + + if ((msg_recv->flags & KNOT_XDP_MSG_TCP) || + msg_recv->ip_to.sin6_port != ctx->quic_port || + msg_recv->payload.iov_len == 0) { + continue; + } + + ctx->quic_rets[i] = knot_xquic_handle(ctx->quic_table, msg_recv, + ctx->quic_idle_close, + &ctx->quic_relays[i]); + knot_xquic_conn_t *rl = ctx->quic_relays[i]; + + int64_t stream_id; + knot_xquic_stream_t *stream; + + while (rl != NULL && (stream = knot_xquic_stream_get_process(rl, &stream_id)) != NULL) { + assert(stream->inbuf_fin != NULL); + assert(stream->inbuf_fin->iov_len > 0); + handle_quic_stream(rl, stream_id, stream->inbuf_fin, layer, params, + ans_buf, sizeof(ans_buf), &ctx->msg_recv[i]); + free(stream->inbuf_fin); + stream->inbuf_fin = NULL; + } + } +#else + (void)(ctx); + (void)(layer); + (void)(params); +#endif // ENABLE_QUIC +} + +void xdp_handle_msgs(xdp_handle_ctx_t *ctx, knot_layer_t *layer, + server_t *server, unsigned thread_id) +{ + assert(ctx->msg_recv_count > 0); + + knotd_qdata_params_t params = { + .socket = knot_xdp_socket_fd(ctx->sock), + .server = server, + .thread_id = thread_id, + }; + + knot_xdp_send_prepare(ctx->sock); + + handle_udp(ctx, layer, ¶ms); + if (ctx->tcp) { + handle_tcp(ctx, layer, ¶ms); + } + handle_quic(ctx, layer, ¶ms); + + knot_xdp_recv_finish(ctx->sock, ctx->msg_recv, 
ctx->msg_recv_count); +} + +void xdp_handle_send(xdp_handle_ctx_t *ctx) +{ + uint32_t unused; + int ret = knot_xdp_send(ctx->sock, ctx->msg_send_udp, ctx->msg_udp_count, &unused); + if (ret != KNOT_EOK) { + log_notice("UDP, failed to send some packets"); + } + if (ctx->tcp) { + ret = knot_tcp_send(ctx->sock, ctx->relays, ctx->msg_recv_count, + XDP_BATCHLEN); + if (ret != KNOT_EOK) { + log_notice("TCP, failed to send some packets"); + } + } +#ifdef ENABLE_QUIC + for (uint32_t i = 0; i < ctx->msg_recv_count; i++) { + if (ctx->quic_relays[i] == NULL) { + continue; + } + + ret = knot_xquic_send(ctx->quic_table, ctx->quic_relays[i], ctx->sock, + &ctx->msg_recv[i], ctx->quic_rets[i], + QUIC_MAX_SEND_PER_RECV, false); + if (ret != KNOT_EOK) { + log_notice("QUIC, failed to send some packets"); + } + } + knot_xquic_cleanup(ctx->quic_relays, ctx->msg_recv_count); +#endif // ENABLE_QUIC + + (void)knot_xdp_send_finish(ctx->sock); + + if (ctx->tcp) { + knot_tcp_cleanup(ctx->tcp_table, ctx->relays, ctx->msg_recv_count); + } +} + +void xdp_handle_sweep(xdp_handle_ctx_t *ctx) +{ +#ifdef ENABLE_QUIC + if (ctx->quic_table != NULL) { + knot_xquic_table_sweep(ctx->quic_table, &ctx->quic_closed.stats); + log_closed(&ctx->quic_closed, false); + } +#endif // ENABLE_QUIC + + if (!ctx->tcp) { + return; + } + + int ret = KNOT_EOK; + uint32_t prev_total; + knot_tcp_relay_t sweep_relays[XDP_BATCHLEN]; + do { + knot_xdp_send_prepare(ctx->sock); + + prev_total = ctx->tcp_closed.stats.total; + + ret = knot_tcp_sweep(ctx->tcp_table, ctx->tcp_idle_close, ctx->tcp_idle_reset, + ctx->tcp_idle_resend, + ctx->tcp_max_conns, ctx->tcp_max_inbufs, ctx->tcp_max_obufs, + sweep_relays, XDP_BATCHLEN, &ctx->tcp_closed.stats); + if (ret == KNOT_EOK) { + ret = knot_tcp_send(ctx->sock, sweep_relays, XDP_BATCHLEN, XDP_BATCHLEN); + } + knot_tcp_cleanup(ctx->tcp_table, sweep_relays, XDP_BATCHLEN); + if (ret != KNOT_EOK) { + break; + } + + ret = knot_tcp_sweep(ctx->syn_table, UINT32_MAX, ctx->tcp_idle_reset, + 
UINT32_MAX, ctx->tcp_syn_conns, SIZE_MAX, SIZE_MAX, + sweep_relays, XDP_BATCHLEN, &ctx->tcp_closed.stats); + if (ret == KNOT_EOK) { + ret = knot_tcp_send(ctx->sock, sweep_relays, XDP_BATCHLEN, XDP_BATCHLEN); + } + knot_tcp_cleanup(ctx->syn_table, sweep_relays, XDP_BATCHLEN); + + (void)knot_xdp_send_finish(ctx->sock); + } while (ret == KNOT_EOK && prev_total < ctx->tcp_closed.stats.total); + + log_closed(&ctx->tcp_closed, true); +} + +#endif // ENABLE_XDP diff --git a/src/knot/server/xdp-handler.h b/src/knot/server/xdp-handler.h new file mode 100644 index 0000000..e6374ca --- /dev/null +++ b/src/knot/server/xdp-handler.h @@ -0,0 +1,67 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef ENABLE_XDP + +#include "knot/query/layer.h" +#include "libknot/xdp/xdp.h" + +#define XDP_BATCHLEN 32 /*!< XDP receive batch size. */ + +struct xdp_handle_ctx; +struct server; + +/*! + * \brief Initialize XDP packet handling context. + */ +struct xdp_handle_ctx *xdp_handle_init(struct server *server, knot_xdp_socket_t *sock); + +/*! + * \brief Deinitialize XDP packet handling context. + */ +void xdp_handle_free(struct xdp_handle_ctx *ctx); + +/*! + * \brief Receive packets thru XDP socket. + */ +int xdp_handle_recv(struct xdp_handle_ctx *ctx); + +/*! + * \brief Answer packets including DNS layers. 
+ * + * \warning In case of TCP, this also sends some packets, e.g. ACK. + */ +void xdp_handle_msgs(struct xdp_handle_ctx *ctx, knot_layer_t *layer, + struct server *server, unsigned thread_id); + +/*! + * \brief Send packets thru XDP socket. + */ +void xdp_handle_send(struct xdp_handle_ctx *ctx); + +/*! + * \brief Check for old TCP connections and close/reset them. + */ +void xdp_handle_sweep(struct xdp_handle_ctx *ctx); + +/*! + * \brief Update configuration parameters of running ctx. + */ +void xdp_handle_reconfigure(struct xdp_handle_ctx *ctx); + +#endif // ENABLE_XDP |