Diffstat (limited to 'src/knot/server')
-rw-r--r--  src/knot/server/dthreads.c      767
-rw-r--r--  src/knot/server/dthreads.h      295
-rw-r--r--  src/knot/server/proxyv2.c        69
-rw-r--r--  src/knot/server/proxyv2.h        23
-rw-r--r--  src/knot/server/server.c       1335
-rw-r--r--  src/knot/server/server.h        203
-rw-r--r--  src/knot/server/tcp-handler.c   380
-rw-r--r--  src/knot/server/tcp-handler.h    43
-rw-r--r--  src/knot/server/udp-handler.c   575
-rw-r--r--  src/knot/server/udp-handler.h    43
-rw-r--r--  src/knot/server/xdp-handler.c   506
-rw-r--r--  src/knot/server/xdp-handler.h    67
12 files changed, 4306 insertions, 0 deletions
diff --git a/src/knot/server/dthreads.c b/src/knot/server/dthreads.c
new file mode 100644
index 0000000..74203ac
--- /dev/null
+++ b/src/knot/server/dthreads.c
@@ -0,0 +1,767 @@
+/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <urcu.h>
+
+#ifdef HAVE_PTHREAD_NP_H
+#include <pthread_np.h>
+#endif /* HAVE_PTHREAD_NP_H */
+
+#include "knot/server/dthreads.h"
+#include "libknot/libknot.h"
+
+/* BSD cpu set compatibility. */
+#if defined(HAVE_CPUSET_BSD)
+typedef cpuset_t cpu_set_t;
+#endif
+
+/*! \brief Lock thread state for R/W. */
+static inline void lock_thread_rw(dthread_t *thread)
+{
+ pthread_mutex_lock(&thread->_mx);
+}
+/*! \brief Unlock thread state for R/W. */
+static inline void unlock_thread_rw(dthread_t *thread)
+{
+ pthread_mutex_unlock(&thread->_mx);
+}
+
+/*! \brief Signalize thread state change. */
+static inline void unit_signalize_change(dt_unit_t *unit)
+{
+ pthread_mutex_lock(&unit->_report_mx);
+ pthread_cond_signal(&unit->_report);
+ pthread_mutex_unlock(&unit->_report_mx);
+}
+
+/*!
+ * \brief Update thread state with notification.
+ * \param thread Given thread.
+ * \param state New state for thread.
+ * \retval 0 on success.
+ * \retval <0 on error (EINVAL, ENOTSUP).
+ */
+static inline int dt_update_thread(dthread_t *thread, int state)
+{
+ if (thread == 0) {
+ return KNOT_EINVAL;
+ }
+
+ // Thread must be a member of a unit
+ dt_unit_t *unit = thread->unit;
+ if (unit == 0) {
+ return KNOT_ENOTSUP;
+ }
+
+ // Update the state of a live (idle or active) thread
+ pthread_mutex_lock(&unit->_notify_mx);
+ lock_thread_rw(thread);
+ if (thread->state & (ThreadIdle | ThreadActive)) {
+
+ // Update state
+ thread->state = state;
+ unlock_thread_rw(thread);
+
+ // Notify thread
+ pthread_cond_broadcast(&unit->_notify);
+ pthread_mutex_unlock(&unit->_notify_mx);
+ } else {
+ /* Unable to update thread, it is already dead. */
+ unlock_thread_rw(thread);
+ pthread_mutex_unlock(&unit->_notify_mx);
+ return KNOT_EINVAL;
+ }
+
+ return KNOT_EOK;
+}
+
+/*!
+ * \brief Thread entrypoint function.
+ *
+ * When a thread is created and started, it immediately enters this function.
+ * Depending on the thread state, it either enters the runnable or
+ * blocks until it is awakened.
+ *
+ * This function also handles the "ThreadIdle" state to quickly suspend and
+ * resume threads and mitigate thread creation costs. Also, the thread's
+ * runnable may be changed at runtime to alter its behavior.
+ */
+static void *thread_ep(void *data)
+{
+ dthread_t *thread = (dthread_t *)data;
+ if (thread == 0) {
+ return 0;
+ }
+
+ // Check if it is a member of a unit
+ dt_unit_t *unit = thread->unit;
+ if (unit == 0) {
+ return 0;
+ }
+
+ // Unblock SIGALRM for synchronization
+ sigset_t mask;
+ (void)sigemptyset(&mask);
+ sigaddset(&mask, SIGALRM);
+ pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
+
+ rcu_register_thread();
+
+ // Run loop
+ for (;;) {
+
+ // Check thread state
+ lock_thread_rw(thread);
+ if (thread->state == ThreadDead) {
+ unlock_thread_rw(thread);
+ break;
+ }
+
+ // Update data
+ thread->data = thread->_adata;
+ runnable_t _run = thread->run;
+
+ // Start runnable if thread is marked Active
+ if ((thread->state == ThreadActive) && (thread->run != 0)) {
+ unlock_thread_rw(thread);
+ _run(thread);
+ } else {
+ unlock_thread_rw(thread);
+ }
+
+ // If the runnable was cancelled, start a new iteration
+ lock_thread_rw(thread);
+ if (thread->state & ThreadCancelled) {
+ thread->state &= ~ThreadCancelled;
+ unlock_thread_rw(thread);
+ continue;
+ }
+ unlock_thread_rw(thread);
+
+ // Runnable finished without interruption, mark as Idle
+ pthread_mutex_lock(&unit->_notify_mx);
+ lock_thread_rw(thread);
+ if (thread->state & ThreadActive) {
+ thread->state &= ~ThreadActive;
+ thread->state |= ThreadIdle;
+ }
+
+ // Go to sleep if idle
+ if (thread->state & ThreadIdle) {
+ unlock_thread_rw(thread);
+
+ // Signalize state change
+ unit_signalize_change(unit);
+
+ // Wait for notification from unit
+ pthread_cond_wait(&unit->_notify, &unit->_notify_mx);
+ pthread_mutex_unlock(&unit->_notify_mx);
+ } else {
+ unlock_thread_rw(thread);
+ pthread_mutex_unlock(&unit->_notify_mx);
+ }
+ }
+
+ // Thread destructor
+ if (thread->destruct) {
+ thread->destruct(thread);
+ }
+
+ // Report thread state change
+ unit_signalize_change(unit);
+ lock_thread_rw(thread);
+ thread->state |= ThreadJoinable;
+ unlock_thread_rw(thread);
+ rcu_unregister_thread();
+
+ // Return
+ return 0;
+}
+
+/*!
+ * \brief Create single thread.
+ * \retval New thread instance on success.
+ * \retval NULL on error.
+ */
+static dthread_t *dt_create_thread(dt_unit_t *unit)
+{
+ // Alloc thread
+ dthread_t *thread = malloc(sizeof(dthread_t));
+ if (thread == 0) {
+ return 0;
+ }
+
+ memset(thread, 0, sizeof(dthread_t));
+
+ // Blank thread state
+ thread->state = ThreadJoined;
+ pthread_mutex_init(&thread->_mx, 0);
+
+ // Set membership in unit
+ thread->unit = unit;
+
+ // Initialize attribute
+ pthread_attr_t *attr = &thread->_attr;
+ pthread_attr_init(attr);
+ //pthread_attr_setinheritsched(attr, PTHREAD_INHERIT_SCHED);
+ //pthread_attr_setschedpolicy(attr, SCHED_OTHER);
+ pthread_attr_setstacksize(attr, 1024*1024);
+ return thread;
+}
+
+/*! \brief Delete single thread. */
+static void dt_delete_thread(dthread_t **thread)
+{
+ if (!thread || !*thread) {
+ return;
+ }
+
+ dthread_t *thr = *thread;
+ thr->unit = 0;
+ *thread = 0;
+
+ // Delete attribute
+ pthread_attr_destroy(&thr->_attr);
+
+ // Delete mutex
+ pthread_mutex_destroy(&thr->_mx);
+
+ // Free memory
+ free(thr);
+}
+
+static dt_unit_t *dt_create_unit(int count)
+{
+ if (count <= 0) {
+ return 0;
+ }
+
+ dt_unit_t *unit = malloc(sizeof(dt_unit_t));
+ if (unit == 0) {
+ return 0;
+ }
+
+ // Initialize conditions
+ if (pthread_cond_init(&unit->_notify, 0) != 0) {
+ free(unit);
+ return 0;
+ }
+ if (pthread_cond_init(&unit->_report, 0) != 0) {
+ pthread_cond_destroy(&unit->_notify);
+ free(unit);
+ return 0;
+ }
+
+ // Initialize mutexes
+ if (pthread_mutex_init(&unit->_notify_mx, 0) != 0) {
+ pthread_cond_destroy(&unit->_notify);
+ pthread_cond_destroy(&unit->_report);
+ free(unit);
+ return 0;
+ }
+ if (pthread_mutex_init(&unit->_report_mx, 0) != 0) {
+ pthread_cond_destroy(&unit->_notify);
+ pthread_cond_destroy(&unit->_report);
+ pthread_mutex_destroy(&unit->_notify_mx);
+ free(unit);
+ return 0;
+ }
+ if (pthread_mutex_init(&unit->_mx, 0) != 0) {
+ pthread_cond_destroy(&unit->_notify);
+ pthread_cond_destroy(&unit->_report);
+ pthread_mutex_destroy(&unit->_notify_mx);
+ pthread_mutex_destroy(&unit->_report_mx);
+ free(unit);
+ return 0;
+ }
+
+ // Save unit size
+ unit->size = count;
+
+ // Alloc threads
+ unit->threads = calloc(count, sizeof(dthread_t *));
+ if (unit->threads == 0) {
+ pthread_cond_destroy(&unit->_notify);
+ pthread_cond_destroy(&unit->_report);
+ pthread_mutex_destroy(&unit->_notify_mx);
+ pthread_mutex_destroy(&unit->_report_mx);
+ pthread_mutex_destroy(&unit->_mx);
+ free(unit);
+ return 0;
+ }
+
+ // Initialize threads
+ int init_success = 1;
+ for (int i = 0; i < count; ++i) {
+ unit->threads[i] = dt_create_thread(unit);
+ if (unit->threads[i] == 0) {
+ init_success = 0;
+ break;
+ }
+ }
+
+ // Check thread initialization
+ if (!init_success) {
+
+ // Delete created threads
+ for (int i = 0; i < count; ++i) {
+ dt_delete_thread(&unit->threads[i]);
+ }
+
+ // Free rest of the unit
+ pthread_cond_destroy(&unit->_notify);
+ pthread_cond_destroy(&unit->_report);
+ pthread_mutex_destroy(&unit->_notify_mx);
+ pthread_mutex_destroy(&unit->_report_mx);
+ pthread_mutex_destroy(&unit->_mx);
+ free(unit->threads);
+ free(unit);
+ return 0;
+ }
+
+ return unit;
+}
+
+dt_unit_t *dt_create(int count, runnable_t runnable, runnable_t destructor, void *data)
+{
+ if (count <= 0) {
+ return 0;
+ }
+
+ // Create unit
+ dt_unit_t *unit = dt_create_unit(count);
+ if (unit == 0) {
+ return 0;
+ }
+
+ // Set threads common purpose
+ pthread_mutex_lock(&unit->_notify_mx);
+ dt_unit_lock(unit);
+
+ for (int i = 0; i < count; ++i) {
+ dthread_t *thread = unit->threads[i];
+ lock_thread_rw(thread);
+ thread->run = runnable;
+ thread->destruct = destructor;
+ thread->_adata = data;
+ unlock_thread_rw(thread);
+ }
+
+ dt_unit_unlock(unit);
+ pthread_mutex_unlock(&unit->_notify_mx);
+
+ return unit;
+}
+
+void dt_delete(dt_unit_t **unit)
+{
+ /*
+ * All threads must be stopped or idle at this point,
+ * or else the behavior is undefined.
+ * Sorry.
+ */
+
+ if (unit == 0) {
+ return;
+ }
+ if (*unit == 0) {
+ return;
+ }
+
+ // Compact and reclaim idle threads
+ dt_unit_t *d_unit = *unit;
+ dt_compact(d_unit);
+
+ // Delete threads
+ for (int i = 0; i < d_unit->size; ++i) {
+ dt_delete_thread(&d_unit->threads[i]);
+ }
+
+ // Deinit mutexes
+ pthread_mutex_destroy(&d_unit->_notify_mx);
+ pthread_mutex_destroy(&d_unit->_report_mx);
+ pthread_mutex_destroy(&d_unit->_mx);
+
+ // Deinit conditions
+ pthread_cond_destroy(&d_unit->_notify);
+ pthread_cond_destroy(&d_unit->_report);
+
+ // Free memory
+ free(d_unit->threads);
+ free(d_unit);
+ *unit = 0;
+}
+
+static int dt_start_id(dthread_t *thread)
+{
+ if (thread == 0) {
+ return KNOT_EINVAL;
+ }
+
+ lock_thread_rw(thread);
+
+ // Update state
+ int prev_state = thread->state;
+ thread->state |= ThreadActive;
+ thread->state &= ~ThreadIdle;
+ thread->state &= ~ThreadDead;
+ thread->state &= ~ThreadJoined;
+ thread->state &= ~ThreadJoinable;
+
+ // Do not re-create running threads
+ if (prev_state != ThreadJoined) {
+ unlock_thread_rw(thread);
+ return 0;
+ }
+
+ // Start thread
+ sigset_t mask_all, mask_old;
+ sigfillset(&mask_all);
+ sigdelset(&mask_all, SIGPROF);
+ pthread_sigmask(SIG_SETMASK, &mask_all, &mask_old);
+ int res = pthread_create(&thread->_thr, /* pthread_t */
+ &thread->_attr, /* pthread_attr_t */
+ thread_ep, /* routine: thread_ep */
+ thread); /* passed object: dthread_t */
+ pthread_sigmask(SIG_SETMASK, &mask_old, NULL);
+
+ // Unlock thread
+ unlock_thread_rw(thread);
+ return res;
+}
+
+int dt_start(dt_unit_t *unit)
+{
+ if (unit == 0) {
+ return KNOT_EINVAL;
+ }
+
+ // Lock unit
+ pthread_mutex_lock(&unit->_notify_mx);
+ dt_unit_lock(unit);
+ for (int i = 0; i < unit->size; ++i) {
+
+ dthread_t *thread = unit->threads[i];
+ int res = dt_start_id(thread);
+ if (res != 0) {
+ dt_unit_unlock(unit);
+ pthread_mutex_unlock(&unit->_notify_mx);
+ return res;
+ }
+ }
+
+ // Unlock unit
+ dt_unit_unlock(unit);
+ pthread_cond_broadcast(&unit->_notify);
+ pthread_mutex_unlock(&unit->_notify_mx);
+ return KNOT_EOK;
+}
+
+int dt_signalize(dthread_t *thread, int signum)
+{
+ if (thread == 0) {
+ return KNOT_EINVAL;
+ }
+
+ int ret = pthread_kill(thread->_thr, signum);
+
+ /* No such thread found or invalid signum. */
+ if (ret == EINVAL || ret == ESRCH) {
+ return KNOT_EINVAL;
+ }
+
+ /* Generic error: pthread_kill() returns a positive error number. */
+ if (ret != 0) {
+ return KNOT_ERROR;
+ }
+
+ return KNOT_EOK;
+}
+
+int dt_join(dt_unit_t *unit)
+{
+ if (unit == 0) {
+ return KNOT_EINVAL;
+ }
+
+ for (;;) {
+
+ // Lock unit
+ pthread_mutex_lock(&unit->_report_mx);
+ dt_unit_lock(unit);
+
+ // Browse threads
+ int active_threads = 0;
+ for (int i = 0; i < unit->size; ++i) {
+
+ // Count active or cancelled but pending threads
+ dthread_t *thread = unit->threads[i];
+ lock_thread_rw(thread);
+ if (thread->state & (ThreadActive|ThreadCancelled)) {
+ ++active_threads;
+ }
+
+ // Reclaim finished (joinable) threads, which is fast
+ if (thread->state & ThreadJoinable) {
+ unlock_thread_rw(thread);
+ pthread_join(thread->_thr, 0);
+ lock_thread_rw(thread);
+ thread->state = ThreadJoined;
+ unlock_thread_rw(thread);
+ } else {
+ unlock_thread_rw(thread);
+ }
+ }
+
+ // Unlock unit
+ dt_unit_unlock(unit);
+
+ // Check result
+ if (active_threads == 0) {
+ pthread_mutex_unlock(&unit->_report_mx);
+ break;
+ }
+
+ // Wait for a thread to finish
+ pthread_cond_wait(&unit->_report, &unit->_report_mx);
+ pthread_mutex_unlock(&unit->_report_mx);
+ }
+
+ return KNOT_EOK;
+}
+
+int dt_stop(dt_unit_t *unit)
+{
+ if (unit == 0) {
+ return KNOT_EINVAL;
+ }
+
+ // Lock unit
+ pthread_mutex_lock(&unit->_notify_mx);
+ dt_unit_lock(unit);
+
+ // Signalize all threads to stop
+ for (int i = 0; i < unit->size; ++i) {
+
+ // Lock thread
+ dthread_t *thread = unit->threads[i];
+ lock_thread_rw(thread);
+ if (thread->state & (ThreadIdle | ThreadActive)) {
+ thread->state = ThreadDead | ThreadCancelled;
+ dt_signalize(thread, SIGALRM);
+ }
+ unlock_thread_rw(thread);
+ }
+
+ // Unlock unit
+ dt_unit_unlock(unit);
+
+ // Broadcast notification
+ pthread_cond_broadcast(&unit->_notify);
+ pthread_mutex_unlock(&unit->_notify_mx);
+
+ return KNOT_EOK;
+}
+
+int dt_setaffinity(dthread_t *thread, unsigned *cpu_id, size_t cpu_count)
+{
+ if (thread == NULL) {
+ return KNOT_EINVAL;
+ }
+
+#ifdef HAVE_PTHREAD_SETAFFINITY_NP
+ int ret = -1;
+
+/* Linux, FreeBSD interface. */
+#if defined(HAVE_CPUSET_LINUX) || defined(HAVE_CPUSET_BSD)
+ cpu_set_t set;
+ CPU_ZERO(&set);
+ for (unsigned i = 0; i < cpu_count; ++i) {
+ CPU_SET(cpu_id[i], &set);
+ }
+ ret = pthread_setaffinity_np(thread->_thr, sizeof(cpu_set_t), &set);
+/* NetBSD interface. */
+#elif defined(HAVE_CPUSET_NETBSD)
+ cpuset_t *set = cpuset_create();
+ if (set == NULL) {
+ return KNOT_ENOMEM;
+ }
+ cpuset_zero(set);
+ for (unsigned i = 0; i < cpu_count; ++i) {
+ cpuset_set(cpu_id[i], set);
+ }
+ ret = pthread_setaffinity_np(thread->_thr, cpuset_size(set), set);
+ cpuset_destroy(set);
+#endif /* interface */
+
+ if (ret != 0) {
+ return KNOT_ERROR;
+ }
+
+#else /* HAVE_PTHREAD_SETAFFINITY_NP */
+ return KNOT_ENOTSUP;
+#endif
+
+ return KNOT_EOK;
+}
+
+int dt_activate(dthread_t *thread)
+{
+ return dt_update_thread(thread, ThreadActive);
+}
+
+int dt_cancel(dthread_t *thread)
+{
+ return dt_update_thread(thread, ThreadIdle | ThreadCancelled);
+}
+
+int dt_compact(dt_unit_t *unit)
+{
+ if (unit == 0) {
+ return KNOT_EINVAL;
+ }
+
+ // Lock unit
+ pthread_mutex_lock(&unit->_notify_mx);
+ dt_unit_lock(unit);
+
+ // Reclaim all Idle threads
+ for (int i = 0; i < unit->size; ++i) {
+
+ // Locked state update
+ dthread_t *thread = unit->threads[i];
+ lock_thread_rw(thread);
+ if (thread->state & (ThreadIdle)) {
+ thread->state = ThreadDead | ThreadCancelled;
+ dt_signalize(thread, SIGALRM);
+ }
+ unlock_thread_rw(thread);
+ }
+
+ // Notify all threads
+ pthread_cond_broadcast(&unit->_notify);
+ pthread_mutex_unlock(&unit->_notify_mx);
+
+ // Join all threads
+ for (int i = 0; i < unit->size; ++i) {
+
+ // Reclaim all dead threads
+ dthread_t *thread = unit->threads[i];
+ lock_thread_rw(thread);
+ if (thread->state & (ThreadDead)) {
+ unlock_thread_rw(thread);
+ pthread_join(thread->_thr, 0);
+ lock_thread_rw(thread);
+ thread->state = ThreadJoined;
+ unlock_thread_rw(thread);
+ } else {
+ unlock_thread_rw(thread);
+ }
+ }
+
+ // Unlock unit
+ dt_unit_unlock(unit);
+
+ return KNOT_EOK;
+}
+
+int dt_online_cpus(void)
+{
+ int ret = -1;
+/* Linux, FreeBSD, NetBSD, OpenBSD, macOS/OS X 10.4+, Solaris */
+#ifdef _SC_NPROCESSORS_ONLN
+ ret = (int) sysconf(_SC_NPROCESSORS_ONLN);
+#else
+/* OS X < 10.4 and some other OS's (if not handled by sysconf() above) */
+/* hw.ncpu won't work on FreeBSD, OpenBSD, NetBSD, DragonFlyBSD, and recent macOS/OS X. */
+#if HAVE_SYSCTLBYNAME
+ size_t rlen = sizeof(int);
+ if (sysctlbyname("hw.ncpu", &ret, &rlen, NULL, 0) < 0) {
+ ret = -1;
+ }
+#endif
+#endif
+ return ret;
+}
+
+int dt_optimal_size(void)
+{
+ int ret = dt_online_cpus();
+ if (ret > 1) {
+ return ret;
+ }
+
+ return DEFAULT_THR_COUNT;
+}
+
+int dt_is_cancelled(dthread_t *thread)
+{
+ if (thread == 0) {
+ return 0;
+ }
+
+ return thread->state & ThreadCancelled; /* No need to be locked. */
+}
+
+unsigned dt_get_id(dthread_t *thread)
+{
+ if (thread == NULL || thread->unit == NULL) {
+ return 0;
+ }
+
+ dt_unit_t *unit = thread->unit;
+ for (int tid = 0; tid < unit->size; ++tid) {
+ if (thread == unit->threads[tid]) {
+ return tid;
+ }
+ }
+
+ return 0;
+}
+
+int dt_unit_lock(dt_unit_t *unit)
+{
+ if (unit == 0) {
+ return KNOT_EINVAL;
+ }
+
+ int ret = pthread_mutex_lock(&unit->_mx);
+ if (ret != 0) {
+ return knot_map_errno_code(ret);
+ }
+
+ return KNOT_EOK;
+}
+
+int dt_unit_unlock(dt_unit_t *unit)
+{
+ if (unit == 0) {
+ return KNOT_EINVAL;
+ }
+
+ int ret = pthread_mutex_unlock(&unit->_mx);
+ if (ret != 0) {
+ return knot_map_errno_code(ret);
+ }
+
+ return KNOT_EOK;
+}
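
Example (not part of the diff): the dthreads API above is easiest to read as a lifecycle, so here is a minimal sketch of driving one coherent unit. The runnable worker_run and the shared_data payload are hypothetical placeholders; a real runnable must poll dt_is_cancelled() as its cooperative cancellation point, as the header below notes.

#include "knot/server/dthreads.h"
#include "libknot/libknot.h"

/* Hypothetical runnable: polls dt_is_cancelled() so that dt_stop() or
 * dt_cancel() can interrupt it at this cooperative cancellation point. */
static int worker_run(dthread_t *thread)
{
	while (!dt_is_cancelled(thread)) {
		/* ... process one batch of work using thread->data ... */
	}
	return KNOT_EOK; /* Returning moves the thread to ThreadIdle. */
}

static int run_unit_example(void *shared_data)
{
	/* Coherent unit: one runnable and one data pointer for all threads. */
	dt_unit_t *unit = dt_create(dt_optimal_size(), worker_run, NULL, shared_data);
	if (unit == NULL) {
		return KNOT_ENOMEM;
	}
	int ret = dt_start(unit); /* Threads spawn and enter thread_ep(). */
	if (ret != KNOT_EOK) {
		dt_delete(&unit);
		return ret;
	}
	/* ... server runs ... */
	dt_stop(unit);    /* Mark ThreadDead|ThreadCancelled, signalize SIGALRM. */
	dt_join(unit);    /* Reclaim threads once they report ThreadJoinable. */
	dt_delete(&unit); /* Safe now: all threads are stopped and joined. */
	return KNOT_EOK;
}
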
diff --git a/src/knot/server/dthreads.h b/src/knot/server/dthreads.h
new file mode 100644
index 0000000..0c243a1
--- /dev/null
+++ b/src/knot/server/dthreads.h
@@ -0,0 +1,295 @@
+/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+/*!
+ * \brief Threading API.
+ *
+ * Dynamic threads provide:
+ * - coherent and incoherent threading capabilities
+ * - thread repurposing
+ * - thread prioritization
+ * - on-the-fly changing of threading unit size
+ *
+ * A coherent threading unit is one where all threads execute
+ * the same runnable function.
+ *
+ * An incoherent unit is one where at least one thread executes
+ * a different runnable than the others.
+ */
+
+#pragma once
+
+#include <pthread.h>
+
+#define DEFAULT_THR_COUNT 2 /*!< Default thread count. */
+
+/* Forward decls */
+struct dthread;
+struct dt_unit;
+
+/*!
+ * \brief Thread state enumeration.
+ */
+typedef enum {
+ ThreadJoined = 1 << 0, /*!< Thread is finished and joined. */
+ ThreadJoinable = 1 << 1, /*!< Thread is waiting to be reclaimed. */
+ ThreadCancelled = 1 << 2, /*!< Thread is cancelled, finishing task. */
+ ThreadDead = 1 << 3, /*!< Thread is finished, exiting. */
+ ThreadIdle = 1 << 4, /*!< Thread is idle, waiting for purpose. */
+ ThreadActive = 1 << 5 /*!< Thread is active, working on a task. */
+} dt_state_t;
+
+/*!
+ * \brief Thread runnable prototype.
+ *
+ * A runnable is a pointer to a function which is called while the
+ * thread is active.
+ *
+ * \note When implementing a runnable, keep in mind that the thread state may
+ * change, and implement a cooperative cancellation point.
+ *
+ * Implement this by checking dt_is_cancelled() and returning
+ * as soon as possible.
+ */
+typedef int (*runnable_t)(struct dthread *);
+
+/*!
+ * \brief Single thread descriptor public API.
+ */
+typedef struct dthread {
+ volatile unsigned state; /*!< Bitfield of dt_state_t flags. */
+ runnable_t run; /*!< Runnable function or 0. */
+ runnable_t destruct; /*!< Destructor function or 0. */
+ void *data; /*!< Currently active data. */
+ struct dt_unit *unit; /*!< Reference to assigned unit. */
+ void *_adata; /*!< Thread-specific data. */
+ pthread_t _thr; /*!< Thread */
+ pthread_attr_t _attr; /*!< Thread attributes */
+ pthread_mutex_t _mx; /*!< Thread state change lock. */
+} dthread_t;
+
+/*!
+ * \brief Thread unit descriptor API.
+ *
+ * Thread unit consists of 1..N threads.
+ * Unit is coherent if all threads execute
+ * the same runnable.
+ */
+typedef struct dt_unit {
+ int size; /*!< Unit width (number of threads) */
+ struct dthread **threads; /*!< Array of threads */
+ pthread_cond_t _notify; /*!< Notify thread */
+ pthread_mutex_t _notify_mx; /*!< Condition mutex */
+ pthread_cond_t _report; /*!< Report thread state */
+ pthread_mutex_t _report_mx; /*!< Condition mutex */
+ pthread_mutex_t _mx; /*!< Unit lock */
+} dt_unit_t;
+
+/*!
+ * \brief Create a set of coherent threads.
+ *
+ * Coherent means that the threads share a common runnable and data.
+ *
+ * \param count Requested thread count.
+ * \param runnable Runnable function for all threads.
+ * \param destructor Destructor for all threads.
+ * \param data Any data passed onto threads.
+ *
+ * \retval New instance if successful.
+ * \retval NULL on error.
+ */
+dt_unit_t *dt_create(int count, runnable_t runnable, runnable_t destructor, void *data);
+
+/*!
+ * \brief Free unit.
+ *
+ * \warning Behavior is undefined if threads are still active, make sure
+ * to call dt_join() first.
+ *
+ * \param unit Unit to be deleted.
+ */
+void dt_delete(dt_unit_t **unit);
+
+/*!
+ * \brief Start all threads in selected unit.
+ *
+ * \param unit Unit to be started.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters (unit is null).
+ */
+int dt_start(dt_unit_t *unit);
+
+/*!
+ * \brief Send given signal to thread.
+ *
+ * \note This is useful to interrupt some blocking I/O as well, for example
+ * with SIGALRM, which is handled by default.
+ * \note Signal handler may be overridden in runnable.
+ *
+ * \param thread Target thread instance.
+ * \param signum Signal code.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ * \retval KNOT_ERROR unspecified error.
+ */
+int dt_signalize(dthread_t *thread, int signum);
+
+/*!
+ * \brief Wait for all threads in the unit to finish.
+ *
+ * \param unit Unit to be joined.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ */
+int dt_join(dt_unit_t *unit);
+
+/*!
+ * \brief Stop all threads in unit.
+ *
+ * Thread is interrupted at the nearest runnable cancellation point.
+ *
+ * \param unit Unit to be stopped.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ */
+int dt_stop(dt_unit_t *unit);
+
+/*!
+ * \brief Set thread affinity to the given set of CPUs.
+ *
+ * \param thread Target thread instance.
+ * \param cpu_id Array of CPU IDs to set affinity to.
+ * \param cpu_count Number of CPUs in the array, set to 0 for no CPU.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ */
+int dt_setaffinity(dthread_t *thread, unsigned *cpu_id, size_t cpu_count);
+
+/*!
+ * \brief Wake up thread from idle state.
+ *
+ * The thread is awoken from the idle state and re-enters its runnable.
+ * This function only affects idle threads.
+ *
+ * \note Unit needs to be started with dt_start() first, as the function
+ * doesn't affect dead threads.
+ *
+ * \param thread Target thread instance.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ * \retval KNOT_ENOTSUP operation not supported.
+ */
+int dt_activate(dthread_t *thread);
+
+/*!
+ * \brief Put thread to idle state, cancels current runnable function.
+ *
+ * The thread is flagged with the Cancelled flag and returns from the runnable
+ * at the nearest cancellation point, which requires a compliant runnable function.
+ *
+ * \note Thread isn't disposed, but put to idle state until it's requested
+ * again or collected by dt_compact().
+ *
+ * \param thread Target thread instance.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ */
+int dt_cancel(dthread_t *thread);
+
+/*!
+ * \brief Collect and dispose idle threads.
+ *
+ * \param unit Target unit instance.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ */
+int dt_compact(dt_unit_t *unit);
+
+/*!
+ * \brief Return number of online processors.
+ *
+ * \retval Number of online CPUs on success.
+ * \retval <0 on failure.
+ */
+int dt_online_cpus(void);
+
+/*!
+ * \brief Return optimal number of threads for instance.
+ *
+ * It is estimated as the number of online CPUs.
+ * Fallback is DEFAULT_THR_COUNT (defined above).
+ *
+ * \return Number of threads.
+ */
+int dt_optimal_size(void);
+
+/*!
+ * \brief Return true if thread is cancelled.
+ *
+ * Synchronously check for ThreadCancelled flag.
+ *
+ * \param thread Target thread instance.
+ *
+ * \retval 1 if cancelled.
+ * \retval 0 if not cancelled.
+ */
+int dt_is_cancelled(dthread_t *thread);
+
+/*!
+ * \brief Return thread index in threading unit.
+ *
+ * \note Returns 0 when thread doesn't have a unit.
+ *
+ * \param thread Target thread instance.
+ *
+ * \return Thread index.
+ */
+unsigned dt_get_id(dthread_t *thread);
+
+/*!
+ * \brief Lock the unit to prevent parallel operations which could
+ * alter the unit.
+ *
+ * \param unit Target unit instance.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ * \retval KNOT_EAGAIN lack of resources to lock unit, try again.
+ * \retval KNOT_ERROR unspecified error.
+ */
+int dt_unit_lock(dt_unit_t *unit);
+
+/*!
+ * \brief Unlock unit.
+ *
+ * \see dt_unit_lock()
+ *
+ * \param unit Target unit instance.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ * \retval KNOT_EAGAIN lack of resources to unlock unit, try again.
+ * \retval KNOT_ERROR unspecified error.
+ */
+int dt_unit_unlock(dt_unit_t *unit);
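
Example (not part of the diff): a hedged sketch of the affinity API documented above, assuming the unit was already created and started (dt_setaffinity() acts on the live pthread). The pin_unit_threads() helper is illustrative, not part of the server.

#include "knot/server/dthreads.h"

static void pin_unit_threads(dt_unit_t *unit)
{
	int cpus = dt_online_cpus();
	if (cpus <= 0) {
		return; /* Unknown CPU count; leave scheduling to the OS. */
	}
	for (int i = 0; i < unit->size; ++i) {
		unsigned cpu = i % cpus; /* Round-robin threads over CPUs. */
		/* dt_setaffinity() returns KNOT_ENOTSUP where
		 * pthread_setaffinity_np() is unavailable; affinity is
		 * best-effort here, so errors are ignored. */
		(void)dt_setaffinity(unit->threads[i], &cpu, 1);
	}
}
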
diff --git a/src/knot/server/proxyv2.c b/src/knot/server/proxyv2.c
new file mode 100644
index 0000000..ff92263
--- /dev/null
+++ b/src/knot/server/proxyv2.c
@@ -0,0 +1,69 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "knot/server/proxyv2.h"
+
+#include "contrib/proxyv2/proxyv2.h"
+#include "knot/conf/conf.h"
+
+int proxyv2_header_strip(knot_pkt_t **query,
+ const struct sockaddr_storage *remote,
+ struct sockaddr_storage *new_remote)
+{
+ conf_t *pconf = conf();
+ if (!pconf->cache.srv_proxy_enabled) {
+ return KNOT_EDENIED;
+ }
+
+ uint8_t *pkt = (*query)->wire;
+ size_t pkt_len = (*query)->max_size;
+
+ int offset = proxyv2_header_offset(pkt, pkt_len);
+ if (offset <= 0) {
+ return KNOT_EMALF;
+ }
+
+ /*
+ * Check if the query was sent from an IP address authorized to send
+ * proxied DNS traffic.
+ */
+ conf_val_t allowlist_val = conf_get(pconf, C_SRV, C_PROXY_ALLOWLIST);
+ if (!conf_addr_range_match(&allowlist_val, remote)) {
+ return KNOT_EDENIED;
+ }
+
+ /*
+ * Store the provided remote address.
+ */
+ int ret = proxyv2_addr_store(pkt, pkt_len, new_remote);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+
+ /*
+ * Re-parse the query message using the data in the packet following
+ * the PROXY v2 payload, and replace the original query with the
+ * decapsulated one.
+ */
+ knot_pkt_t *q = knot_pkt_new(pkt + offset, pkt_len - offset, &(*query)->mm);
+ if (q == NULL) {
+ return KNOT_ENOMEM;
+ }
+ knot_pkt_free(*query);
+ *query = q;
+
+ return knot_pkt_parse(q, 0);
+}
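
Example (not part of the diff): a sketch of how a receive path might call proxyv2_header_strip(); the handler fragment and variable names are illustrative, not the server's actual UDP/TCP handler code. On success the query pointer is replaced with the decapsulated packet and the new_remote argument holds the real client address; on any error the caller simply keeps the original query and the socket peer address.

#include "knot/server/proxyv2.h"
#include "libknot/libknot.h"

static void handle_query_example(knot_pkt_t **query,
                                 const struct sockaddr_storage *remote)
{
	struct sockaddr_storage proxied = { 0 };
	const struct sockaddr_storage *client = remote;

	if (proxyv2_header_strip(query, remote, &proxied) == KNOT_EOK) {
		/* The datagram came from an allowed proxy with a PROXY v2
		 * header; apply answer policy to the original client. */
		client = &proxied;
	}
	/* ... continue processing *query on behalf of client ... */
	(void)client;
}
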
diff --git a/src/knot/server/proxyv2.h b/src/knot/server/proxyv2.h
new file mode 100644
index 0000000..5cb1251
--- /dev/null
+++ b/src/knot/server/proxyv2.h
@@ -0,0 +1,23 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "libknot/packet/pkt.h"
+
+int proxyv2_header_strip(knot_pkt_t **query,
+ const struct sockaddr_storage *remote,
+ struct sockaddr_storage *new_remote);
diff --git a/src/knot/server/server.c b/src/knot/server/server.c
new file mode 100644
index 0000000..684526d
--- /dev/null
+++ b/src/knot/server/server.c
@@ -0,0 +1,1335 @@
+/* Copyright (C) 2023 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#define __APPLE_USE_RFC_3542
+
+#include <assert.h>
+#include <sys/types.h> // OpenBSD
+#include <netinet/tcp.h> // TCP_FASTOPEN
+#include <sys/resource.h>
+
+#include "libknot/libknot.h"
+#include "libknot/yparser/ypschema.h"
+#include "libknot/xdp.h"
+#if defined ENABLE_XDP && ENABLE_QUIC
+#include "libknot/xdp/quic.h"
+#endif // ENABLE_XDP && ENABLE_QUIC
+#include "knot/common/log.h"
+#include "knot/common/stats.h"
+#include "knot/common/systemd.h"
+#include "knot/common/unreachable.h"
+#include "knot/conf/confio.h"
+#include "knot/conf/migration.h"
+#include "knot/conf/module.h"
+#include "knot/dnssec/kasp/kasp_db.h"
+#include "knot/journal/journal_basic.h"
+#include "knot/server/server.h"
+#include "knot/server/udp-handler.h"
+#include "knot/server/tcp-handler.h"
+#include "knot/zone/timers.h"
+#include "knot/zone/zonedb-load.h"
+#include "knot/worker/pool.h"
+#include "contrib/conn_pool.h"
+#include "contrib/net.h"
+#include "contrib/openbsd/strlcat.h"
+#include "contrib/os.h"
+#include "contrib/sockaddr.h"
+#include "contrib/trim.h"
+
+#ifdef ENABLE_XDP
+#include <net/if.h>
+#endif
+
+#ifdef SO_ATTACH_REUSEPORT_CBPF
+#include <linux/filter.h>
+#endif
+
+/*! \brief Minimal send/receive buffer sizes. */
+enum {
+ UDP_MIN_RCVSIZE = 4096,
+ UDP_MIN_SNDSIZE = 4096,
+ TCP_MIN_RCVSIZE = 4096,
+ TCP_MIN_SNDSIZE = sizeof(uint16_t) + UINT16_MAX
+};
+
+/*! \brief Unbind interface and clear the structure. */
+static void server_deinit_iface(iface_t *iface, bool dealloc)
+{
+ assert(iface);
+
+ /* Free UDP handler. */
+ if (iface->fd_udp != NULL) {
+ for (int i = 0; i < iface->fd_udp_count; i++) {
+ if (iface->fd_udp[i] > -1) {
+ close(iface->fd_udp[i]);
+ }
+ }
+ free(iface->fd_udp);
+ }
+
+ for (int i = 0; i < iface->fd_xdp_count; i++) {
+#ifdef ENABLE_XDP
+ knot_xdp_deinit(iface->xdp_sockets[i]);
+#else
+ assert(0);
+#endif
+ }
+ free(iface->fd_xdp);
+ free(iface->xdp_sockets);
+
+ /* Free TCP handler. */
+ if (iface->fd_tcp != NULL) {
+ for (int i = 0; i < iface->fd_tcp_count; i++) {
+ if (iface->fd_tcp[i] > -1) {
+ close(iface->fd_tcp[i]);
+ }
+ }
+ free(iface->fd_tcp);
+ }
+
+ if (dealloc) {
+ free(iface);
+ }
+}
+
+/*! \brief Deinit server interface list. */
+static void server_deinit_iface_list(iface_t *ifaces, size_t n)
+{
+ if (ifaces != NULL) {
+ for (size_t i = 0; i < n; i++) {
+ server_deinit_iface(ifaces + i, false);
+ }
+ free(ifaces);
+ }
+}
+
+/*!
+ * \brief Attach SO_REUSEPORT socket filter for perfect CPU locality.
+ *
+ * \param sock Socket to attach the CBPF filter to.
+ * \param sock_count Number of sockets.
+ */
+static bool server_attach_reuseport_bpf(const int sock, const int sock_count)
+{
+#ifdef SO_ATTACH_REUSEPORT_CBPF
+ struct sock_filter code[] = {
+ /* A = raw_smp_processor_id(). */
+ { BPF_LD | BPF_W | BPF_ABS, 0, 0, SKF_AD_OFF + SKF_AD_CPU },
+ /* Adjust the CPUID to socket group size. */
+ { BPF_ALU | BPF_MOD | BPF_K, 0, 0, sock_count },
+ /* Return A. */
+ { BPF_RET | BPF_A, 0, 0, 0 },
+ };
+
+ struct sock_fprog prog = { 0 };
+ prog.len = sizeof(code) / sizeof(*code);
+ prog.filter = code;
+
+ return setsockopt(sock, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF, &prog, sizeof(prog)) == 0;
+#else
+ return true;
+#endif
+}
+
+/*! \brief Set lower bound for socket option. */
+static bool setsockopt_min(int sock, int option, int min)
+{
+ int value = 0;
+ socklen_t len = sizeof(value);
+
+ if (getsockopt(sock, SOL_SOCKET, option, &value, &len) != 0) {
+ return false;
+ }
+
+ assert(len == sizeof(value));
+ if (value >= min) {
+ return true;
+ }
+
+ return setsockopt(sock, SOL_SOCKET, option, &min, sizeof(min)) == 0;
+}
+
+/*!
+ * \brief Enlarge send/receive buffers.
+ */
+static bool enlarge_net_buffers(int sock, int min_recvsize, int min_sndsize)
+{
+ return setsockopt_min(sock, SO_RCVBUF, min_recvsize) &&
+ setsockopt_min(sock, SO_SNDBUF, min_sndsize);
+}
+
+/*!
+ * \brief Enable source packet information retrieval.
+ */
+static bool enable_pktinfo(int sock, int family)
+{
+ int level = 0;
+ int option = 0;
+
+ switch (family) {
+ case AF_INET:
+ level = IPPROTO_IP;
+#if defined(IP_PKTINFO)
+ option = IP_PKTINFO; /* Linux */
+#elif defined(IP_RECVDSTADDR)
+ option = IP_RECVDSTADDR; /* BSD */
+#else
+ return false;
+#endif
+ break;
+ case AF_INET6:
+ level = IPPROTO_IPV6;
+ option = IPV6_RECVPKTINFO;
+ break;
+ default:
+ return false;
+ }
+
+ const int on = 1;
+ return setsockopt(sock, level, option, &on, sizeof(on)) == 0;
+}
+
+/*!
+ * Linux 3.15 has IP_PMTUDISC_OMIT which makes sockets
+ * ignore PMTU information and send packets with DF=0.
+ * Fragmentation is allowed if and only if the packet
+ * size exceeds the outgoing interface MTU or the packet
+ * encounters a smaller-MTU link in the network.
+ * This mitigates DNS fragmentation attacks by preventing
+ * forged PMTU information.
+ * FreeBSD already has the same semantics without setting
+ * the option.
+ */
+static int disable_pmtudisc(int sock, int family)
+{
+#if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT)
+ if (family == AF_INET) {
+ int action_omit = IP_PMTUDISC_OMIT;
+ if (setsockopt(sock, IPPROTO_IP, IP_MTU_DISCOVER, &action_omit,
+ sizeof(action_omit)) != 0) {
+ return knot_map_errno();
+ }
+ }
+#endif
+ return KNOT_EOK;
+}
+
+static iface_t *server_init_xdp_iface(struct sockaddr_storage *addr, bool route_check,
+ bool udp, bool tcp, uint16_t quic, unsigned *thread_id_start)
+{
+#ifndef ENABLE_XDP
+ assert(0);
+ return NULL;
+#else
+ conf_xdp_iface_t iface;
+ int ret = conf_xdp_iface(addr, &iface);
+ if (ret != KNOT_EOK) {
+ log_error("failed to initialize XDP interface (%s)",
+ knot_strerror(ret));
+ return NULL;
+ }
+
+ iface_t *new_if = calloc(1, sizeof(*new_if));
+ if (new_if == NULL) {
+ log_error("failed to initialize XDP interface");
+ return NULL;
+ }
+ memcpy(&new_if->addr, addr, sizeof(*addr));
+
+ new_if->fd_xdp = calloc(iface.queues, sizeof(int));
+ new_if->xdp_sockets = calloc(iface.queues, sizeof(*new_if->xdp_sockets));
+ if (new_if->fd_xdp == NULL || new_if->xdp_sockets == NULL) {
+ log_error("failed to initialize XDP interface");
+ server_deinit_iface(new_if, true);
+ return NULL;
+ }
+ new_if->xdp_first_thread_id = *thread_id_start;
+ *thread_id_start += iface.queues;
+
+ knot_xdp_filter_flag_t xdp_flags = udp ? KNOT_XDP_FILTER_UDP : 0;
+ if (tcp) {
+ xdp_flags |= KNOT_XDP_FILTER_TCP;
+ }
+ if (quic > 0) {
+ xdp_flags |= KNOT_XDP_FILTER_QUIC;
+ }
+ if (route_check) {
+ xdp_flags |= KNOT_XDP_FILTER_ROUTE;
+ }
+
+ for (int i = 0; i < iface.queues; i++) {
+ knot_xdp_load_bpf_t mode =
+ (i == 0 ? KNOT_XDP_LOAD_BPF_ALWAYS : KNOT_XDP_LOAD_BPF_NEVER);
+ ret = knot_xdp_init(new_if->xdp_sockets + i, iface.name, i,
+ xdp_flags, iface.port, quic, mode, NULL);
+ if (ret == -EBUSY && i == 0) {
+ log_notice("XDP interface %s@%u is busy, retrying initialization",
+ iface.name, iface.port);
+ ret = knot_xdp_init(new_if->xdp_sockets + i, iface.name, i,
+ xdp_flags, iface.port, quic,
+ KNOT_XDP_LOAD_BPF_ALWAYS_UNLOAD, NULL);
+ }
+ if (ret != KNOT_EOK) {
+ log_warning("failed to initialize XDP interface %s@%u, queue %d (%s)",
+ iface.name, iface.port, i, knot_strerror(ret));
+ server_deinit_iface(new_if, true);
+ new_if = NULL;
+ break;
+ }
+ new_if->fd_xdp[i] = knot_xdp_socket_fd(new_if->xdp_sockets[i]);
+ new_if->fd_xdp_count++;
+ }
+
+ if (ret == KNOT_EOK) {
+ char msg[128];
+ (void)snprintf(msg, sizeof(msg), "initialized XDP interface %s", iface.name);
+ if (udp || tcp) {
+ char buf[32] = "";
+ (void)snprintf(buf, sizeof(buf), ", %s%s%s port %u",
+ (udp ? "UDP" : ""),
+ (udp && tcp ? "/" : ""),
+ (tcp ? "TCP" : ""),
+ iface.port);
+ strlcat(msg, buf, sizeof(msg));
+ }
+ if (quic) {
+ char buf[32] = "";
+ (void)snprintf(buf, sizeof(buf), ", QUIC port %u", quic);
+ strlcat(msg, buf, sizeof(msg));
+ }
+
+ knot_xdp_mode_t mode = knot_eth_xdp_mode(if_nametoindex(iface.name));
+ log_info("%s, queues %d, %s mode%s", msg, iface.queues,
+ (mode == KNOT_XDP_MODE_FULL ? "native" : "emulated"),
+ route_check ? ", route check" : "");
+ }
+
+ return new_if;
+#endif
+}
+
+/*!
+ * \brief Create and initialize new interface.
+ *
+ * Both TCP and UDP sockets will be created for the interface.
+ *
+ * \param addr Socket address.
+ * \param udp_thread_count Number of created UDP workers.
+ * \param tcp_thread_count Number of created TCP workers.
+ * \param tcp_reuseport Indication if reuseport on TCP is enabled.
+ * \param socket_affinity Indication if CBPF should be attached.
+ *
+ * \retval Pointer to a new initialized interface.
+ * \retval NULL if error.
+ */
+static iface_t *server_init_iface(struct sockaddr_storage *addr,
+ int udp_thread_count, int tcp_thread_count,
+ bool tcp_reuseport, bool socket_affinity)
+{
+ iface_t *new_if = calloc(1, sizeof(*new_if));
+ if (new_if == NULL) {
+ log_error("failed to initialize interface");
+ return NULL;
+ }
+ memcpy(&new_if->addr, addr, sizeof(*addr));
+
+ /* Convert to string address format. */
+ char addr_str[SOCKADDR_STRLEN] = { 0 };
+ sockaddr_tostr(addr_str, sizeof(addr_str), addr);
+
+ int udp_socket_count = 1;
+ int udp_bind_flags = 0;
+ int tcp_socket_count = 1;
+ int tcp_bind_flags = 0;
+
+#ifdef ENABLE_REUSEPORT
+ udp_socket_count = udp_thread_count;
+ udp_bind_flags |= NET_BIND_MULTIPLE;
+
+ if (tcp_reuseport) {
+ tcp_socket_count = tcp_thread_count;
+ tcp_bind_flags |= NET_BIND_MULTIPLE;
+ }
+#endif
+
+ new_if->fd_udp = malloc(udp_socket_count * sizeof(int));
+ new_if->fd_tcp = malloc(tcp_socket_count * sizeof(int));
+ if (new_if->fd_udp == NULL || new_if->fd_tcp == NULL) {
+ log_error("failed to initialize interface");
+ server_deinit_iface(new_if, true);
+ return NULL;
+ }
+
+ const mode_t unix_mode = S_IWUSR | S_IWGRP | S_IWOTH;
+
+ bool warn_bind = true;
+ bool warn_cbpf = true;
+ bool warn_bufsize = true;
+ bool warn_pktinfo = true;
+ bool warn_flag_misc = true;
+
+ /* Create bound UDP sockets. */
+ for (int i = 0; i < udp_socket_count; i++) {
+ int sock = net_bound_socket(SOCK_DGRAM, addr, udp_bind_flags, unix_mode);
+ if (sock == KNOT_EADDRNOTAVAIL) {
+ udp_bind_flags |= NET_BIND_NONLOCAL;
+ sock = net_bound_socket(SOCK_DGRAM, addr, udp_bind_flags, unix_mode);
+ if (sock >= 0 && warn_bind) {
+ log_warning("address %s UDP bound, but required nonlocal bind", addr_str);
+ warn_bind = false;
+ }
+ }
+
+ if (sock < 0) {
+ log_error("cannot bind address %s UDP (%s)", addr_str,
+ knot_strerror(sock));
+ server_deinit_iface(new_if, true);
+ return NULL;
+ }
+
+ if ((udp_bind_flags & NET_BIND_MULTIPLE) && socket_affinity) {
+ if (!server_attach_reuseport_bpf(sock, udp_socket_count) &&
+ warn_cbpf) {
+ log_warning("cannot ensure optimal CPU locality for UDP");
+ warn_cbpf = false;
+ }
+ }
+
+ if (!enlarge_net_buffers(sock, UDP_MIN_RCVSIZE, UDP_MIN_SNDSIZE) &&
+ warn_bufsize) {
+ log_warning("failed to set network buffer sizes for UDP");
+ warn_bufsize = false;
+ }
+
+ if (sockaddr_is_any(addr) && !enable_pktinfo(sock, addr->ss_family) &&
+ warn_pktinfo) {
+ log_warning("failed to enable received packet information retrieval");
+ warn_pktinfo = false;
+ }
+
+ int ret = disable_pmtudisc(sock, addr->ss_family);
+ if (ret != KNOT_EOK && warn_flag_misc) {
+ log_warning("failed to disable Path MTU discovery for IPv4/UDP (%s)",
+ knot_strerror(ret));
+ warn_flag_misc = false;
+ }
+
+ new_if->fd_udp[new_if->fd_udp_count] = sock;
+ new_if->fd_udp_count += 1;
+ }
+
+ warn_bind = true;
+ warn_cbpf = true;
+ warn_bufsize = true;
+ warn_flag_misc = true;
+
+ /* Create bound TCP sockets. */
+ for (int i = 0; i < tcp_socket_count; i++) {
+ int sock = net_bound_socket(SOCK_STREAM, addr, tcp_bind_flags, unix_mode);
+ if (sock == KNOT_EADDRNOTAVAIL) {
+ tcp_bind_flags |= NET_BIND_NONLOCAL;
+ sock = net_bound_socket(SOCK_STREAM, addr, tcp_bind_flags, unix_mode);
+ if (sock >= 0 && warn_bind) {
+ log_warning("address %s TCP bound, but required nonlocal bind", addr_str);
+ warn_bind = false;
+ }
+ }
+
+ if (sock < 0) {
+ log_error("cannot bind address %s TCP (%s)", addr_str,
+ knot_strerror(sock));
+ server_deinit_iface(new_if, true);
+ return NULL;
+ }
+
+ if (!enlarge_net_buffers(sock, TCP_MIN_RCVSIZE, TCP_MIN_SNDSIZE) &&
+ warn_bufsize) {
+ log_warning("failed to set network buffer sizes for TCP");
+ warn_bufsize = false;
+ }
+
+ new_if->fd_tcp[new_if->fd_tcp_count] = sock;
+ new_if->fd_tcp_count += 1;
+
+ /* Listen for incoming connections. */
+ int ret = listen(sock, TCP_BACKLOG_SIZE);
+ if (ret < 0) {
+ log_error("failed to listen on TCP interface %s", addr_str);
+ server_deinit_iface(new_if, true);
+ return NULL;
+ }
+
+ if ((tcp_bind_flags & NET_BIND_MULTIPLE) && socket_affinity) {
+ if (!server_attach_reuseport_bpf(sock, tcp_socket_count) &&
+ warn_cbpf) {
+ log_warning("cannot ensure optimal CPU locality for TCP");
+ warn_cbpf = false;
+ }
+ }
+
+ /* Try to enable TCP Fast Open. */
+ ret = net_bound_tfo(sock, TCP_BACKLOG_SIZE);
+ if (ret != KNOT_EOK && ret != KNOT_ENOTSUP && warn_flag_misc) {
+ log_warning("failed to enable TCP Fast Open on %s (%s)",
+ addr_str, knot_strerror(ret));
+ warn_flag_misc = false;
+ }
+ }
+
+ return new_if;
+}
+
+static void log_sock_conf(conf_t *conf)
+{
+ char buf[128] = "";
+#if defined(ENABLE_REUSEPORT)
+ strlcat(buf, "UDP", sizeof(buf));
+ if (conf->cache.srv_tcp_reuseport) {
+ strlcat(buf, "/TCP", sizeof(buf));
+ }
+ strlcat(buf, " reuseport", sizeof(buf));
+ if (conf->cache.srv_socket_affinity) {
+ strlcat(buf, ", socket affinity", sizeof(buf));
+ }
+#endif
+#if defined(TCP_FASTOPEN)
+ if (buf[0] != '\0') {
+ strlcat(buf, ", ", sizeof(buf));
+ }
+ strlcat(buf, "incoming", sizeof(buf));
+ if (conf->cache.srv_tcp_fastopen) {
+ strlcat(buf, "/outgoing", sizeof(buf));
+ }
+ strlcat(buf, " TCP Fast Open", sizeof(buf));
+#endif
+ if (buf[0] != '\0') {
+ log_info("using %s", buf);
+ }
+}
+
+/*! \brief Initialize bound sockets according to configuration. */
+static int configure_sockets(conf_t *conf, server_t *s)
+{
+ if (s->state & ServerRunning) {
+ return KNOT_EOK;
+ }
+
+ conf_val_t listen_val = conf_get(conf, C_SRV, C_LISTEN);
+ conf_val_t lisxdp_val = conf_get(conf, C_XDP, C_LISTEN);
+ conf_val_t rundir_val = conf_get(conf, C_SRV, C_RUNDIR);
+
+ if (listen_val.code == KNOT_EOK) {
+ log_sock_conf(conf);
+ } else if (lisxdp_val.code != KNOT_EOK) {
+ log_warning("no network interface configured");
+ return KNOT_EOK;
+ }
+
+#ifdef ENABLE_XDP
+ if (lisxdp_val.code == KNOT_EOK && !linux_at_least(5, 11)) {
+ struct rlimit min_limit = { RLIM_INFINITY, RLIM_INFINITY };
+ struct rlimit cur_limit = { 0 };
+ if (getrlimit(RLIMIT_MEMLOCK, &cur_limit) != 0 ||
+ cur_limit.rlim_cur != min_limit.rlim_cur ||
+ cur_limit.rlim_max != min_limit.rlim_max) {
+ int ret = setrlimit(RLIMIT_MEMLOCK, &min_limit);
+ if (ret != 0) {
+ log_error("failed to increase RLIMIT_MEMLOCK (%s)",
+ knot_strerror(errno));
+ return KNOT_ESYSTEM;
+ }
+ }
+ }
+#endif
+
+ size_t real_nifs = 0;
+ size_t nifs = conf_val_count(&listen_val) + conf_val_count(&lisxdp_val);
+ iface_t *newlist = calloc(nifs, sizeof(*newlist));
+ if (newlist == NULL) {
+ log_error("failed to allocate memory for network sockets");
+ return KNOT_ENOMEM;
+ }
+
+ /* Normal UDP and TCP sockets. */
+ unsigned size_udp = s->handlers[IO_UDP].handler.unit->size;
+ unsigned size_tcp = s->handlers[IO_TCP].handler.unit->size;
+ bool tcp_reuseport = conf->cache.srv_tcp_reuseport;
+ bool socket_affinity = conf->cache.srv_socket_affinity;
+ char *rundir = conf_abs_path(&rundir_val, NULL);
+ while (listen_val.code == KNOT_EOK) {
+ struct sockaddr_storage addr = conf_addr(&listen_val, rundir);
+ char addr_str[SOCKADDR_STRLEN] = { 0 };
+ sockaddr_tostr(addr_str, sizeof(addr_str), &addr);
+ log_info("binding to interface %s", addr_str);
+
+ iface_t *new_if = server_init_iface(&addr, size_udp, size_tcp,
+ tcp_reuseport, socket_affinity);
+ if (new_if == NULL) {
+ server_deinit_iface_list(newlist, nifs);
+ free(rundir);
+ return KNOT_ERROR;
+ }
+ memcpy(&newlist[real_nifs++], new_if, sizeof(*newlist));
+ free(new_if);
+
+ conf_val_next(&listen_val);
+ }
+ free(rundir);
+
+ /* XDP sockets. */
+ bool xdp_udp = conf->cache.xdp_udp;
+ bool xdp_tcp = conf->cache.xdp_tcp;
+ uint16_t xdp_quic = conf->cache.xdp_quic;
+ bool route_check = conf->cache.xdp_route_check;
+ unsigned thread_id = s->handlers[IO_UDP].handler.unit->size +
+ s->handlers[IO_TCP].handler.unit->size;
+ while (lisxdp_val.code == KNOT_EOK) {
+ struct sockaddr_storage addr = conf_addr(&lisxdp_val, NULL);
+ char addr_str[SOCKADDR_STRLEN] = { 0 };
+ sockaddr_tostr(addr_str, sizeof(addr_str), &addr);
+ log_info("binding to XDP interface %s", addr_str);
+
+ iface_t *new_if = server_init_xdp_iface(&addr, route_check, xdp_udp,
+ xdp_tcp, xdp_quic, &thread_id);
+ if (new_if == NULL) {
+ server_deinit_iface_list(newlist, nifs);
+ return KNOT_ERROR;
+ }
+ memcpy(&newlist[real_nifs++], new_if, sizeof(*newlist));
+ free(new_if);
+
+ conf_val_next(&lisxdp_val);
+ }
+ assert(real_nifs <= nifs);
+ nifs = real_nifs;
+
+#if defined ENABLE_XDP && ENABLE_QUIC
+ if (xdp_quic > 0) {
+ char *tls_cert = conf_tls(conf, C_CERT_FILE);
+ char *tls_key = conf_tls(conf, C_KEY_FILE);
+ if (tls_cert == NULL) {
+ log_notice("QUIC, no server certificate configured, using one-time one");
+ }
+ s->quic_creds = knot_xquic_init_creds(true, tls_cert, tls_key);
+ free(tls_cert);
+ free(tls_key);
+ if (s->quic_creds == NULL) {
+ log_error("QUIC, failed to initialize server credentials");
+ server_deinit_iface_list(newlist, nifs);
+ return KNOT_ERROR;
+ }
+ }
+#endif // ENABLE_XDP && ENABLE_QUIC
+
+ /* Publish new list. */
+ s->ifaces = newlist;
+ s->n_ifaces = nifs;
+
+ /* Assign thread identifiers unique per all handlers. */
+ unsigned thread_count = 0;
+ for (unsigned proto = IO_UDP; proto <= IO_XDP; ++proto) {
+ dt_unit_t *tu = s->handlers[proto].handler.unit;
+ for (unsigned i = 0; tu != NULL && i < tu->size; ++i) {
+ s->handlers[proto].handler.thread_id[i] = thread_count++;
+ }
+ }
+
+ return KNOT_EOK;
+}
+
+int server_init(server_t *server, int bg_workers)
+{
+ if (server == NULL) {
+ return KNOT_EINVAL;
+ }
+
+ /* Clear the structure. */
+ memset(server, 0, sizeof(server_t));
+
+ /* Initialize event scheduler. */
+ if (evsched_init(&server->sched, server) != KNOT_EOK) {
+ return KNOT_ENOMEM;
+ }
+
+ server->workers = worker_pool_create(bg_workers);
+ if (server->workers == NULL) {
+ evsched_deinit(&server->sched);
+ return KNOT_ENOMEM;
+ }
+
+ int ret = catalog_update_init(&server->catalog_upd);
+ if (ret != KNOT_EOK) {
+ worker_pool_destroy(server->workers);
+ evsched_deinit(&server->sched);
+ return ret;
+ }
+
+ zone_backups_init(&server->backup_ctxs);
+
+ char *catalog_dir = conf_db(conf(), C_CATALOG_DB);
+ conf_val_t catalog_size = conf_db_param(conf(), C_CATALOG_DB_MAX_SIZE);
+ catalog_init(&server->catalog, catalog_dir, conf_int(&catalog_size));
+ free(catalog_dir);
+ conf()->catalog = &server->catalog;
+
+ char *journal_dir = conf_db(conf(), C_JOURNAL_DB);
+ conf_val_t journal_size = conf_db_param(conf(), C_JOURNAL_DB_MAX_SIZE);
+ conf_val_t journal_mode = conf_db_param(conf(), C_JOURNAL_DB_MODE);
+ knot_lmdb_init(&server->journaldb, journal_dir, conf_int(&journal_size), journal_env_flags(conf_opt(&journal_mode), false), NULL);
+ free(journal_dir);
+
+ kasp_db_ensure_init(&server->kaspdb, conf());
+
+ char *timer_dir = conf_db(conf(), C_TIMER_DB);
+ conf_val_t timer_size = conf_db_param(conf(), C_TIMER_DB_MAX_SIZE);
+ knot_lmdb_init(&server->timerdb, timer_dir, conf_int(&timer_size), 0, NULL);
+ free(timer_dir);
+
+ return KNOT_EOK;
+}
+
+void server_deinit(server_t *server)
+{
+ if (server == NULL) {
+ return;
+ }
+
+ zone_backups_deinit(&server->backup_ctxs);
+
+ /* Save zone timers. */
+ if (server->zone_db != NULL) {
+ log_info("updating persistent timer DB");
+ int ret = zone_timers_write_all(&server->timerdb, server->zone_db);
+ if (ret != KNOT_EOK) {
+ log_warning("failed to update persistent timer DB (%s)",
+ knot_strerror(ret));
+ }
+ }
+
+ /* Free remaining interfaces. */
+ server_deinit_iface_list(server->ifaces, server->n_ifaces);
+
+ /* Free threads and event handlers. */
+ worker_pool_destroy(server->workers);
+
+ /* Free zone database. */
+ knot_zonedb_deep_free(&server->zone_db, true);
+
+ /* Free remaining events. */
+ evsched_deinit(&server->sched);
+
+ /* Free catalog zone context. */
+ catalog_update_clear(&server->catalog_upd);
+ catalog_update_deinit(&server->catalog_upd);
+ catalog_deinit(&server->catalog);
+
+ /* Close persistent timers DB. */
+ knot_lmdb_deinit(&server->timerdb);
+
+ /* Close kasp_db. */
+ knot_lmdb_deinit(&server->kaspdb);
+
+ /* Close journal database if open. */
+ knot_lmdb_deinit(&server->journaldb);
+
+ /* Close and deinit connection pool. */
+ conn_pool_deinit(global_conn_pool);
+ global_conn_pool = NULL;
+ knot_unreachables_deinit(&global_unreachables);
+
+#if defined ENABLE_XDP && ENABLE_QUIC
+ knot_xquic_free_creds(server->quic_creds);
+#endif // ENABLE_XDP && ENABLE_QUIC
+}
+
+static int server_init_handler(server_t *server, int index, int thread_count,
+ runnable_t runnable, runnable_t destructor)
+{
+ /* Initialize */
+ iohandler_t *h = &server->handlers[index].handler;
+ memset(h, 0, sizeof(iohandler_t));
+ h->server = server;
+ h->unit = dt_create(thread_count, runnable, destructor, h);
+ if (h->unit == NULL) {
+ return KNOT_ENOMEM;
+ }
+
+ h->thread_state = calloc(thread_count, sizeof(unsigned));
+ if (h->thread_state == NULL) {
+ dt_delete(&h->unit);
+ return KNOT_ENOMEM;
+ }
+
+ h->thread_id = calloc(thread_count, sizeof(unsigned));
+ if (h->thread_id == NULL) {
+ free(h->thread_state);
+ dt_delete(&h->unit);
+ return KNOT_ENOMEM;
+ }
+
+ return KNOT_EOK;
+}
+
+static void server_free_handler(iohandler_t *h)
+{
+ if (h == NULL || h->server == NULL) {
+ return;
+ }
+
+ /* Wait for threads to finish */
+ if (h->unit) {
+ dt_stop(h->unit);
+ dt_join(h->unit);
+ }
+
+ /* Destroy worker context. */
+ dt_delete(&h->unit);
+ free(h->thread_state);
+ free(h->thread_id);
+}
+
+static void worker_wait_cb(worker_pool_t *pool)
+{
+ systemd_zone_load_timeout_notify();
+
+ static uint64_t last_ns = 0;
+ struct timespec now = time_now();
+ uint64_t now_ns = 1000000000 * now.tv_sec + now.tv_nsec;
+ /* Too frequent worker_pool_status() call with many zones is expensive. */
+ if (now_ns - last_ns > 1000000000) {
+ int running, queued;
+ worker_pool_status(pool, true, &running, &queued);
+ systemd_tasks_status_notify(running + queued);
+ last_ns = now_ns;
+ }
+}
+
+int server_start(server_t *server, bool async)
+{
+ if (server == NULL) {
+ return KNOT_EINVAL;
+ }
+
+ /* Start workers. */
+ worker_pool_start(server->workers);
+
+ /* Wait for enqueued events if not asynchronous. */
+ if (!async) {
+ worker_pool_wait_cb(server->workers, worker_wait_cb);
+ systemd_tasks_status_notify(0);
+ }
+
+ /* Start evsched handler. */
+ evsched_start(&server->sched);
+
+ /* Start I/O handlers. */
+ server->state |= ServerRunning;
+ for (int proto = IO_UDP; proto <= IO_XDP; ++proto) {
+ if (server->handlers[proto].size > 0) {
+ int ret = dt_start(server->handlers[proto].handler.unit);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+ }
+ }
+
+ return KNOT_EOK;
+}
+
+void server_wait(server_t *server)
+{
+ if (server == NULL) {
+ return;
+ }
+
+ evsched_join(&server->sched);
+ worker_pool_join(server->workers);
+
+ for (int proto = IO_UDP; proto <= IO_XDP; ++proto) {
+ if (server->handlers[proto].size > 0) {
+ server_free_handler(&server->handlers[proto].handler);
+ }
+ }
+}
+
+static int reload_conf(conf_t *new_conf)
+{
+ yp_schema_purge_dynamic(new_conf->schema);
+
+ /* Re-load common modules. */
+ int ret = conf_mod_load_common(new_conf);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+
+ /* Re-import config file if specified. */
+ const char *filename = conf()->filename;
+ if (filename != NULL) {
+ log_info("reloading configuration file '%s'", filename);
+
+ /* Import the configuration file. */
+ ret = conf_import(new_conf, filename, true, false);
+ if (ret != KNOT_EOK) {
+ log_error("failed to load configuration file (%s)",
+ knot_strerror(ret));
+ return ret;
+ }
+ } else {
+ log_info("reloading configuration database '%s'",
+ knot_db_lmdb_get_path(new_conf->db));
+
+ /* Re-load extra modules. */
+ for (conf_iter_t iter = conf_iter(new_conf, C_MODULE);
+ iter.code == KNOT_EOK; conf_iter_next(new_conf, &iter)) {
+ conf_val_t id = conf_iter_id(new_conf, &iter);
+ conf_val_t file = conf_id_get(new_conf, C_MODULE, C_FILE, &id);
+ ret = conf_mod_load_extra(new_conf, conf_str(&id), conf_str(&file),
+ MOD_EXPLICIT);
+ if (ret != KNOT_EOK) {
+ conf_iter_finish(new_conf, &iter);
+ return ret;
+ }
+ }
+ }
+
+ conf_mod_load_purge(new_conf, false);
+
+ // Migrate from old schema.
+ ret = conf_migrate(new_conf);
+ if (ret != KNOT_EOK) {
+ log_error("failed to migrate configuration (%s)", knot_strerror(ret));
+ }
+
+ return KNOT_EOK;
+}
+
+/*! \brief Check if parameter listen(-xdp) has been changed since knotd started. */
+static bool listen_changed(conf_t *conf, server_t *server)
+{
+ assert(server->ifaces);
+
+ conf_val_t listen_val = conf_get(conf, C_SRV, C_LISTEN);
+ conf_val_t lisxdp_val = conf_get(conf, C_XDP, C_LISTEN);
+ size_t new_count = conf_val_count(&listen_val) + conf_val_count(&lisxdp_val);
+ size_t old_count = server->n_ifaces;
+ if (new_count != old_count) {
+ return true;
+ }
+
+ conf_val_t rundir_val = conf_get(conf, C_SRV, C_RUNDIR);
+ char *rundir = conf_abs_path(&rundir_val, NULL);
+ size_t matches = 0;
+
+ /* Find matching interfaces. */
+ while (listen_val.code == KNOT_EOK) {
+ struct sockaddr_storage addr = conf_addr(&listen_val, rundir);
+ bool found = false;
+ for (size_t i = 0; i < server->n_ifaces; i++) {
+ if (sockaddr_cmp(&addr, &server->ifaces[i].addr, false) == 0) {
+ matches++;
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ break;
+ }
+ conf_val_next(&listen_val);
+ }
+ free(rundir);
+
+ while (lisxdp_val.code == KNOT_EOK) {
+ struct sockaddr_storage addr = conf_addr(&lisxdp_val, NULL);
+ bool found = false;
+ for (size_t i = 0; i < server->n_ifaces; i++) {
+ if (sockaddr_cmp(&addr, &server->ifaces[i].addr, false) == 0) {
+ matches++;
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ break;
+ }
+ conf_val_next(&lisxdp_val);
+ }
+
+ return matches != old_count;
+}
+
+/*! \brief Log warnings if config change requires a restart. */
+static void warn_server_reconfigure(conf_t *conf, server_t *server)
+{
+ const char *msg = "changes of %s require restart to take effect";
+
+ static bool warn_tcp_reuseport = true;
+ static bool warn_socket_affinity = true;
+ static bool warn_udp = true;
+ static bool warn_tcp = true;
+ static bool warn_bg = true;
+ static bool warn_listen = true;
+ static bool warn_xdp_udp = true;
+ static bool warn_xdp_tcp = true;
+ static bool warn_xdp_quic = true;
+ static bool warn_route_check = true;
+ static bool warn_rmt_pool_limit = true;
+
+ if (warn_tcp_reuseport && conf->cache.srv_tcp_reuseport != conf_get_bool(conf, C_SRV, C_TCP_REUSEPORT)) {
+ log_warning(msg, &C_TCP_REUSEPORT[1]);
+ warn_tcp_reuseport = false;
+ }
+
+ if (warn_socket_affinity && conf->cache.srv_socket_affinity != conf_get_bool(conf, C_SRV, C_SOCKET_AFFINITY)) {
+ log_warning(msg, &C_SOCKET_AFFINITY[1]);
+ warn_socket_affinity = false;
+ }
+
+ if (warn_udp && server->handlers[IO_UDP].size != conf_udp_threads(conf)) {
+ log_warning(msg, &C_UDP_WORKERS[1]);
+ warn_udp = false;
+ }
+
+ if (warn_tcp && server->handlers[IO_TCP].size != conf_tcp_threads(conf)) {
+ log_warning(msg, &C_TCP_WORKERS[1]);
+ warn_tcp = false;
+ }
+
+ if (warn_bg && conf->cache.srv_bg_threads != conf_bg_threads(conf)) {
+ log_warning(msg, &C_BG_WORKERS[1]);
+ warn_bg = false;
+ }
+
+ if (warn_listen && server->ifaces != NULL && listen_changed(conf, server)) {
+ log_warning(msg, "listen(-xdp)");
+ warn_listen = false;
+ }
+
+ if (warn_xdp_udp && conf->cache.xdp_udp != conf_get_bool(conf, C_XDP, C_UDP)) {
+ log_warning(msg, &C_UDP[1]);
+ warn_xdp_udp = false;
+ }
+
+ if (warn_xdp_tcp && conf->cache.xdp_tcp != conf_get_bool(conf, C_XDP, C_TCP)) {
+ log_warning(msg, &C_TCP[1]);
+ warn_xdp_tcp = false;
+ }
+
+ if (warn_xdp_quic && (bool)conf->cache.xdp_quic != conf_get_bool(conf, C_XDP, C_QUIC)) {
+ log_warning(msg, &C_QUIC[1]);
+ warn_xdp_quic = false;
+ }
+
+ if (warn_xdp_quic && conf->cache.xdp_quic > 0 &&
+ conf->cache.xdp_quic != conf_get_int(conf, C_XDP, C_QUIC_PORT)) {
+ log_warning(msg, &C_QUIC_PORT[1]);
+ warn_xdp_quic = false;
+ }
+
+ if (warn_route_check && conf->cache.xdp_route_check != conf_get_bool(conf, C_XDP, C_ROUTE_CHECK)) {
+ log_warning(msg, &C_ROUTE_CHECK[1]);
+ warn_route_check = false;
+ }
+
+ if (warn_rmt_pool_limit && global_conn_pool != NULL &&
+ global_conn_pool->capacity != conf_get_int(conf, C_SRV, C_RMT_POOL_LIMIT)) {
+ log_warning(msg, &C_RMT_POOL_LIMIT[1]);
+ warn_rmt_pool_limit = false;
+ }
+}
+
+int server_reload(server_t *server, reload_t mode)
+{
+ if (server == NULL) {
+ return KNOT_EINVAL;
+ }
+
+ systemd_reloading_notify();
+
+ /* Check for no edit mode. */
+ if (conf()->io.txn != NULL) {
+ log_warning("reload aborted due to active configuration transaction");
+ systemd_ready_notify();
+ return KNOT_TXN_EEXISTS;
+ }
+
+ conf_t *new_conf = NULL;
+ int ret = conf_clone(&new_conf);
+ if (ret != KNOT_EOK) {
+ log_error("failed to initialize configuration (%s)",
+ knot_strerror(ret));
+ systemd_ready_notify();
+ return ret;
+ }
+
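+	/* Without an active configuration change-set (CONF_IO_FACTIVE), this is a full reload. */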
+ yp_flag_t flags = conf()->io.flags;
+ bool full = !(flags & CONF_IO_FACTIVE);
+ bool reuse_modules = !full && !(flags & CONF_IO_FRLD_MOD);
+
+ /* Reload configuration and modules if full reload or a module change. */
+ if (full || !reuse_modules) {
+ ret = reload_conf(new_conf);
+ if (ret != KNOT_EOK) {
+ conf_free(new_conf);
+ systemd_ready_notify();
+ return ret;
+ }
+
+ conf_activate_modules(new_conf, server, NULL, new_conf->query_modules,
+ &new_conf->query_plan);
+ }
+
+ conf_update_flag_t upd_flags = CONF_UPD_FNOFREE;
+ if (!full) {
+ upd_flags |= CONF_UPD_FCONFIO;
+ }
+ if (reuse_modules) {
+ upd_flags |= CONF_UPD_FMODULES;
+ }
+
+ /* Update to the new config. */
+ conf_t *old_conf = conf_update(new_conf, upd_flags);
+
+ /* Reload each component if full reload or a specific one if required. */
+ if (full || (flags & CONF_IO_FRLD_LOG)) {
+ log_reconfigure(conf());
+ }
+ if (full || (flags & CONF_IO_FRLD_SRV)) {
+ (void)server_reconfigure(conf(), server);
+ warn_server_reconfigure(conf(), server);
+ stats_reconfigure(conf(), server);
+ }
+ if (full || (flags & (CONF_IO_FRLD_ZONES | CONF_IO_FRLD_ZONE))) {
+ server_update_zones(conf(), server, mode);
+ }
+
+ /* Free old config needed for module unload in zone reload. */
+ conf_free(old_conf);
+
+ if (full) {
+ log_info("configuration reloaded");
+ } else {
+ // Reset confio reload context.
+ conf()->io.flags = YP_FNONE;
+ if (conf()->io.zones != NULL) {
+ trie_clear(conf()->io.zones);
+ }
+ }
+
+ systemd_ready_notify();
+
+ return KNOT_EOK;
+}
+
+void server_stop(server_t *server)
+{
+ log_info("stopping server");
+ systemd_stopping_notify();
+
+ /* Stop scheduler. */
+ evsched_stop(&server->sched);
+ /* Interrupt background workers. */
+ worker_pool_stop(server->workers);
+
+ /* Clear 'running' flag. */
+ server->state &= ~ServerRunning;
+}
+
+static int set_handler(server_t *server, int index, unsigned size, runnable_t run)
+{
+ /* Initialize I/O handlers. */
+ int ret = server_init_handler(server, index, size, run, NULL);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+
+ server->handlers[index].size = size;
+
+ return KNOT_EOK;
+}
+
+static int configure_threads(conf_t *conf, server_t *server)
+{
+ int ret = set_handler(server, IO_UDP, conf->cache.srv_udp_threads, udp_master);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+
+ if (conf->cache.srv_xdp_threads > 0) {
+ ret = set_handler(server, IO_XDP, conf->cache.srv_xdp_threads, udp_master);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+ }
+
+ return set_handler(server, IO_TCP, conf->cache.srv_tcp_threads, tcp_master);
+}
+
+static int reconfigure_journal_db(conf_t *conf, server_t *server)
+{
+ char *journal_dir = conf_db(conf, C_JOURNAL_DB);
+ conf_val_t journal_size = conf_db_param(conf, C_JOURNAL_DB_MAX_SIZE);
+ conf_val_t journal_mode = conf_db_param(conf, C_JOURNAL_DB_MODE);
+ int ret = knot_lmdb_reinit(&server->journaldb, journal_dir, conf_int(&journal_size),
+ journal_env_flags(conf_opt(&journal_mode), false));
+ if (ret != KNOT_EOK) {
+ log_warning("ignored reconfiguration of journal DB (%s)", knot_strerror(ret));
+ }
+ free(journal_dir);
+
+	return KNOT_EOK; // Intentionally not "ret"; the failure above is only logged.
+}
+
+static int reconfigure_kasp_db(conf_t *conf, server_t *server)
+{
+ char *kasp_dir = conf_db(conf, C_KASP_DB);
+ conf_val_t kasp_size = conf_db_param(conf, C_KASP_DB_MAX_SIZE);
+ int ret = knot_lmdb_reinit(&server->kaspdb, kasp_dir, conf_int(&kasp_size), 0);
+ if (ret != KNOT_EOK) {
+ log_warning("ignored reconfiguration of KASP DB (%s)", knot_strerror(ret));
+ }
+ free(kasp_dir);
+
+	return KNOT_EOK; // Intentionally not "ret"; the failure above is only logged.
+}
+
+static int reconfigure_timer_db(conf_t *conf, server_t *server)
+{
+ char *timer_dir = conf_db(conf, C_TIMER_DB);
+ conf_val_t timer_size = conf_db_param(conf, C_TIMER_DB_MAX_SIZE);
+ int ret = knot_lmdb_reconfigure(&server->timerdb, timer_dir, conf_int(&timer_size), 0);
+ free(timer_dir);
+ return ret;
+}
+
+static int reconfigure_remote_pool(conf_t *conf)
+{
+ conf_val_t val = conf_get(conf, C_SRV, C_RMT_POOL_LIMIT);
+ size_t limit = conf_int(&val);
+ val = conf_get(conf, C_SRV, C_RMT_POOL_TIMEOUT);
+ knot_timediff_t timeout = conf_int(&val);
+ if (global_conn_pool == NULL && limit > 0) {
+ conn_pool_t *new_pool = conn_pool_init(limit, timeout);
+ if (new_pool == NULL) {
+ return KNOT_ENOMEM;
+ }
+ global_conn_pool = new_pool;
+ } else {
+ (void)conn_pool_timeout(global_conn_pool, timeout);
+ }
+
+ val = conf_get(conf, C_SRV, C_RMT_RETRY_DELAY);
+ int delay_ms = conf_int(&val);
+ if (global_unreachables == NULL && delay_ms > 0) {
+ global_unreachables = knot_unreachables_init(delay_ms);
+ } else {
+ (void)knot_unreachables_ttl(global_unreachables, delay_ms);
+ }
+
+ return KNOT_EOK;
+}
+
+int server_reconfigure(conf_t *conf, server_t *server)
+{
+ if (conf == NULL || server == NULL) {
+ return KNOT_EINVAL;
+ }
+
+ int ret;
+
+ /* First reconfiguration. */
+ if (!(server->state & ServerRunning)) {
+ log_info("Knot DNS %s starting", PACKAGE_VERSION);
+
+ size_t mapsize = conf->mapsize / (1024 * 1024);
+ if (conf->filename != NULL) {
+ log_info("loaded configuration file '%s', mapsize %zu MiB",
+ conf->filename, mapsize);
+ } else {
+ log_info("loaded configuration database '%s', mapsize %zu MiB",
+ knot_db_lmdb_get_path(conf->db), mapsize);
+ }
+
+ /* Configure server threads. */
+ if ((ret = configure_threads(conf, server)) != KNOT_EOK) {
+ log_error("failed to configure server threads (%s)",
+ knot_strerror(ret));
+ return ret;
+ }
+
+ /* Configure sockets. */
+ if ((ret = configure_sockets(conf, server)) != KNOT_EOK) {
+ return ret;
+ }
+
+ if (conf_lmdb_readers(conf) > CONF_MAX_DB_READERS) {
+ log_warning("config, exceeded number of database readers");
+ }
+ }
+
+ /* Reconfigure journal DB. */
+ if ((ret = reconfigure_journal_db(conf, server)) != KNOT_EOK) {
+ log_error("failed to reconfigure journal DB (%s)",
+ knot_strerror(ret));
+ }
+
+ /* Reconfigure KASP DB. */
+ if ((ret = reconfigure_kasp_db(conf, server)) != KNOT_EOK) {
+ log_error("failed to reconfigure KASP DB (%s)",
+ knot_strerror(ret));
+ }
+
+ /* Reconfigure Timer DB. */
+ if ((ret = reconfigure_timer_db(conf, server)) != KNOT_EOK) {
+ log_error("failed to reconfigure Timer DB (%s)",
+ knot_strerror(ret));
+ }
+
+ /* Reconfigure connection pool. */
+ if ((ret = reconfigure_remote_pool(conf)) != KNOT_EOK) {
+ log_error("failed to reconfigure remote pool (%s)",
+ knot_strerror(ret));
+ }
+
+ return KNOT_EOK;
+}
+
+void server_update_zones(conf_t *conf, server_t *server, reload_t mode)
+{
+ if (conf == NULL || server == NULL) {
+ return;
+ }
+
+ /* Prevent emitting of new zone events. */
+ if (server->zone_db) {
+ knot_zonedb_foreach(server->zone_db, zone_events_freeze);
+ }
+
+ /* Suspend adding events to worker pool queue, wait for queued events. */
+ evsched_pause(&server->sched);
+ worker_pool_wait(server->workers);
+
+ /* Reload zone database and free old zones. */
+ zonedb_reload(conf, server, mode);
+
+ /* Trim extra heap. */
+ mem_trim();
+
+ /* Resume processing events on new zones. */
+ evsched_resume(&server->sched);
+ if (server->zone_db) {
+ knot_zonedb_foreach(server->zone_db, zone_events_start);
+ }
+}
diff --git a/src/knot/server/server.h b/src/knot/server/server.h
new file mode 100644
index 0000000..5adafdb
--- /dev/null
+++ b/src/knot/server/server.h
@@ -0,0 +1,203 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <stdatomic.h>
+
+#include "knot/conf/conf.h"
+#include "knot/catalog/catalog_update.h"
+#include "knot/common/evsched.h"
+#include "knot/common/fdset.h"
+#include "knot/journal/knot_lmdb.h"
+#include "knot/server/dthreads.h"
+#include "knot/worker/pool.h"
+#include "knot/zone/backup.h"
+#include "knot/zone/zonedb.h"
+
+struct server;
+struct knot_xdp_socket;
+struct knot_quic_creds;
+
+/*!
+ * \brief I/O handler structure.
+ */
+typedef struct {
+ struct server *server; /*!< Reference to server. */
+ dt_unit_t *unit; /*!< Threading unit. */
+ unsigned *thread_state; /*!< Thread states. */
+ unsigned *thread_id; /*!< Thread identifiers per all handlers. */
+} iohandler_t;
+
+/*!
+ * \brief Server state flags.
+ */
+typedef enum {
+ ServerIdle = 0 << 0, /*!< Server is idle. */
+ ServerRunning = 1 << 0, /*!< Server is running. */
+} server_state_t;
+
+/*!
+ * \brief Server reload kinds.
+ */
+typedef enum {
+ RELOAD_NONE = 0,
+ RELOAD_FULL = 1 << 0, /*!< Reload the server and all zones. */
+ RELOAD_COMMIT = 1 << 1, /*!< Process changes from dynamic configuration. */
+ RELOAD_ZONES = 1 << 2, /*!< Reload all zones. */
+ RELOAD_CATALOG = 1 << 3, /*!< Process catalog zone changes. */
+} reload_t;
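+
+/* Note: the reload kinds are bit flags and may be combined, e.g. (RELOAD_ZONES | RELOAD_CATALOG). */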
+
+/*!
+ * \brief Server interface structure.
+ */
+typedef struct {
+ int *fd_udp;
+ unsigned fd_udp_count;
+ int *fd_tcp;
+ unsigned fd_tcp_count;
+ int *fd_xdp;
+ unsigned fd_xdp_count;
+ unsigned xdp_first_thread_id;
+ struct knot_xdp_socket **xdp_sockets;
+ struct sockaddr_storage addr;
+} iface_t;
+
+/*!
+ * \brief Handler indexes.
+ */
+enum {
+ IO_UDP = 0,
+ IO_TCP = 1,
+ IO_XDP = 2,
+};
+
+/*!
+ * \brief Main server structure.
+ *
+ * Keeps references to all important structures needed for operation.
+ */
+typedef struct server {
+ /*! \brief Server state tracking. */
+ volatile unsigned state;
+
+ knot_zonedb_t *zone_db;
+ knot_lmdb_db_t timerdb;
+ knot_lmdb_db_t journaldb;
+ knot_lmdb_db_t kaspdb;
+ catalog_t catalog;
+
+ /*! \brief I/O handlers. */
+ struct {
+ unsigned size;
+ iohandler_t handler;
+ } handlers[3];
+
+ /*! \brief Background jobs. */
+ worker_pool_t *workers;
+
+ /*! \brief Event scheduler. */
+ evsched_t sched;
+
+ /*! \brief List of interfaces. */
+ iface_t *ifaces;
+ size_t n_ifaces;
+
+ /*! \brief Pending changes to catalog member zones, update indication. */
+ catalog_update_t catalog_upd;
+ atomic_bool catalog_upd_signal;
+
+ /*! \brief Context of pending zones' backup. */
+ zone_backup_ctxs_t backup_ctxs;
+
+	/*! \brief Credentials context for QUIC. */
+ struct knot_quic_creds *quic_creds;
+} server_t;
+
+/*!
+ * \brief Initializes the server structure.
+ *
+ * \param server Server structure to be initialized.
+ * \param bg_workers Number of background worker threads.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ */
+int server_init(server_t *server, int bg_workers);
+
+/*!
+ * \brief Properly destroys the server structure.
+ *
+ * \param server Server structure to be used for operation.
+ */
+void server_deinit(server_t *server);
+
+/*!
+ * \brief Starts the server.
+ *
+ * \param server Server structure to be used for operation.
+ * \param async Don't wait for zones to load if true.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL on invalid parameters.
+ *
+ */
+int server_start(server_t *server, bool async);
+
+/*!
+ * \brief Waits for the server to finish.
+ *
+ * \param server Server structure to be used for operation.
+ *
+ */
+void server_wait(server_t *server);
+
+/*!
+ * \brief Reload server configuration.
+ *
+ * \param server Server instance.
+ * \param mode Reload mode.
+ *
+ * \return Error code, KNOT_EOK if success.
+ */
+int server_reload(server_t *server, reload_t mode);
+
+/*!
+ * \brief Requests server to stop.
+ *
+ * \param server Server structure to be used for operation.
+ */
+void server_stop(server_t *server);
+
+/*!
+ * \brief Server reconfiguration routine.
+ *
+ * Routine for dynamic server reconfiguration.
+ *
+ * \param conf Configuration.
+ * \param server Server instance.
+ *
+ * \return Error code, KNOT_EOK if success.
+ */
+int server_reconfigure(conf_t *conf, server_t *server);
+
+/*!
+ * \brief Reconfigure zone database.
+ *
+ * Routine for dynamic server zones reconfiguration.
+ *
+ * \param conf Configuration.
+ * \param server Server instance.
+ * \param mode Reload mode.
+ */
+void server_update_zones(conf_t *conf, server_t *server, reload_t mode);
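+
+/*
+ * Illustrative lifecycle sketch (informational, not part of this header):
+ * a minimal embedding in the assumed typical call order, with configuration
+ * already set up via conf(). Error handling is elided.
+ *
+ *     server_t server;
+ *     if (server_init(&server, conf_bg_threads(conf())) == KNOT_EOK &&
+ *         server_reconfigure(conf(), &server) == KNOT_EOK &&
+ *         server_start(&server, false) == KNOT_EOK) {
+ *             // Runs until server_stop() is called (e.g. from a signal
+ *             // handler); server_wait() then joins the I/O handlers.
+ *             server_wait(&server);
+ *     }
+ *     server_deinit(&server);
+ */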
diff --git a/src/knot/server/tcp-handler.c b/src/knot/server/tcp-handler.c
new file mode 100644
index 0000000..433ca9b
--- /dev/null
+++ b/src/knot/server/tcp-handler.c
@@ -0,0 +1,380 @@
+/* Copyright (C) 2023 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <urcu.h>
+#ifdef HAVE_SYS_UIO_H // struct iovec (OpenBSD)
+#include <sys/uio.h>
+#endif // HAVE_SYS_UIO_H
+
+#include "knot/server/server.h"
+#include "knot/server/tcp-handler.h"
+#include "knot/common/log.h"
+#include "knot/common/fdset.h"
+#include "knot/nameserver/process_query.h"
+#include "knot/query/layer.h"
+#include "contrib/macros.h"
+#include "contrib/mempattern.h"
+#include "contrib/net.h"
+#include "contrib/openbsd/strlcpy.h"
+#include "contrib/sockaddr.h"
+#include "contrib/time.h"
+#include "contrib/ucw/mempool.h"
+
+/*! \brief TCP context data. */
+typedef struct tcp_context {
+ knot_layer_t layer; /*!< Query processing layer. */
+ server_t *server; /*!< Name server structure. */
+ struct iovec iov[2]; /*!< TX/RX buffers. */
+ unsigned client_threshold; /*!< Index of first TCP client. */
+ struct timespec last_poll_time; /*!< Time of the last socket poll. */
+ bool is_throttled; /*!< TCP connections throttling switch. */
+ fdset_t set; /*!< Set of server/client sockets. */
+ unsigned thread_id; /*!< Thread identifier. */
+	unsigned max_worker_fds;         /*!< Open fd limit per worker: listening sockets + configured TCP clients per worker. */
+ int idle_timeout; /*!< [s] TCP idle timeout configuration. */
+ int io_timeout; /*!< [ms] TCP send/recv timeout configuration. */
+} tcp_context_t;
+
+#define TCP_SWEEP_INTERVAL 2 /*!< [secs] granularity of connection sweeping. */
+
+static void update_sweep_timer(struct timespec *timer)
+{
+ *timer = time_now();
+ timer->tv_sec += TCP_SWEEP_INTERVAL;
+}
+
+static void update_tcp_conf(tcp_context_t *tcp)
+{
+ rcu_read_lock();
+ conf_t *pconf = conf();
+ tcp->max_worker_fds = tcp->client_threshold + \
+ MAX(pconf->cache.srv_tcp_max_clients / pconf->cache.srv_tcp_threads, 1);
+ tcp->idle_timeout = pconf->cache.srv_tcp_idle_timeout;
+ tcp->io_timeout = pconf->cache.srv_tcp_io_timeout;
+ rcu_read_unlock();
+}
+
+/*! \brief Sweep TCP connection. */
+static fdset_sweep_state_t tcp_sweep(fdset_t *set, int fd, _unused_ void *data)
+{
+ assert(set && fd >= 0);
+
+ /* Best-effort, name and shame. */
+ struct sockaddr_storage ss = { 0 };
+ socklen_t len = sizeof(struct sockaddr_storage);
+ if (getpeername(fd, (struct sockaddr *)&ss, &len) == 0) {
+ char addr_str[SOCKADDR_STRLEN];
+ sockaddr_tostr(addr_str, sizeof(addr_str), &ss);
+ log_notice("TCP, terminated inactive client, address %s", addr_str);
+ }
+
+ return FDSET_SWEEP;
+}
+
+static bool tcp_active_state(int state)
+{
+ return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL);
+}
+
+static bool tcp_send_state(int state)
+{
+ return (state != KNOT_STATE_FAIL && state != KNOT_STATE_NOOP);
+}
+
+static void tcp_log_error(struct sockaddr_storage *ss, const char *operation, int ret)
+{
+ /* Don't log ECONN as it usually means client closed the connection. */
+ if (ret == KNOT_ETIMEOUT) {
+ char addr_str[SOCKADDR_STRLEN];
+ sockaddr_tostr(addr_str, sizeof(addr_str), ss);
+ log_debug("TCP, failed to %s due to IO timeout, closing connection, address %s",
+ operation, addr_str);
+ }
+}
+
+static unsigned tcp_set_ifaces(const iface_t *ifaces, size_t n_ifaces,
+ fdset_t *fds, int thread_id)
+{
+ if (n_ifaces == 0) {
+ return 0;
+ }
+
+ for (const iface_t *i = ifaces; i != ifaces + n_ifaces; i++) {
+ if (i->fd_tcp_count == 0) { // Ignore XDP interface.
+ assert(i->fd_xdp_count > 0);
+ continue;
+ }
+
+ int tcp_id = 0;
+#ifdef ENABLE_REUSEPORT
+ if (conf()->cache.srv_tcp_reuseport) {
+ /* Note: thread_ids start with UDP threads, TCP threads follow. */
+ assert((i->fd_udp_count <= thread_id) &&
+ (thread_id < i->fd_tcp_count + i->fd_udp_count));
+
+ tcp_id = thread_id - i->fd_udp_count;
+ }
+#endif
+ int ret = fdset_add(fds, i->fd_tcp[tcp_id], FDSET_POLLIN, NULL);
+ if (ret < 0) {
+ return 0;
+ }
+ }
+
+ return fdset_get_length(fds);
+}
+
+static int tcp_handle(tcp_context_t *tcp, int fd, struct iovec *rx, struct iovec *tx)
+{
+ /* Get peer name. */
+ struct sockaddr_storage ss;
+ socklen_t addrlen = sizeof(struct sockaddr_storage);
+ if (getpeername(fd, (struct sockaddr *)&ss, &addrlen) != 0) {
+ return KNOT_EADDRNOTAVAIL;
+ }
+
+ /* Create query processing parameter. */
+ knotd_qdata_params_t params = {
+ .proto = KNOTD_QUERY_PROTO_TCP,
+ .remote = &ss,
+ .socket = fd,
+ .server = tcp->server,
+ .thread_id = tcp->thread_id
+ };
+
+ rx->iov_len = KNOT_WIRE_MAX_PKTSIZE;
+ tx->iov_len = KNOT_WIRE_MAX_PKTSIZE;
+
+ /* Receive data. */
+ int recv = net_dns_tcp_recv(fd, rx->iov_base, rx->iov_len, tcp->io_timeout);
+ if (recv > 0) {
+ rx->iov_len = recv;
+ } else {
+ tcp_log_error(&ss, "receive", recv);
+ return KNOT_EOF;
+ }
+
+ /* Initialize processing layer. */
+ knot_layer_begin(&tcp->layer, &params);
+
+ /* Create packets. */
+ knot_pkt_t *ans = knot_pkt_new(tx->iov_base, tx->iov_len, tcp->layer.mm);
+ knot_pkt_t *query = knot_pkt_new(rx->iov_base, rx->iov_len, tcp->layer.mm);
+
+ /* Input packet. */
+ int ret = knot_pkt_parse(query, 0);
+ if (ret != KNOT_EOK && query->parsed > 0) { // parsing failed (e.g. 2x OPT)
+ query->parsed--; // artificially decreasing "parsed" leads to FORMERR
+ }
+ knot_layer_consume(&tcp->layer, query);
+
+ /* Resolve until NOOP or finished. */
+ while (tcp_active_state(tcp->layer.state)) {
+ knot_layer_produce(&tcp->layer, ans);
+ /* Send, if response generation passed and wasn't ignored. */
+ if (ans->size > 0 && tcp_send_state(tcp->layer.state)) {
+ int sent = net_dns_tcp_send(fd, ans->wire, ans->size,
+ tcp->io_timeout, NULL);
+ if (sent != ans->size) {
+ tcp_log_error(&ss, "send", sent);
+ ret = KNOT_EOF;
+ break;
+ }
+ }
+ }
+
+ /* Reset after processing. */
+ knot_layer_finish(&tcp->layer);
+
+ /* Flush per-query memory (including query and answer packets). */
+ mp_flush(tcp->layer.mm->ctx);
+
+ return ret;
+}
+
+static void tcp_event_accept(tcp_context_t *tcp, unsigned i)
+{
+ /* Accept client. */
+ int fd = fdset_get_fd(&tcp->set, i);
+ int client = net_accept(fd, NULL);
+ if (client >= 0) {
+ /* Assign to fdset. */
+ int idx = fdset_add(&tcp->set, client, FDSET_POLLIN, NULL);
+ if (idx < 0) {
+ close(client);
+ return;
+ }
+
+ /* Update watchdog timer. */
+ (void)fdset_set_watchdog(&tcp->set, idx, tcp->idle_timeout);
+ }
+}
+
+static int tcp_event_serve(tcp_context_t *tcp, unsigned i)
+{
+ int ret = tcp_handle(tcp, fdset_get_fd(&tcp->set, i),
+ &tcp->iov[0], &tcp->iov[1]);
+ if (ret == KNOT_EOK) {
+ /* Update socket activity timer. */
+ (void)fdset_set_watchdog(&tcp->set, i, tcp->idle_timeout);
+ }
+
+ return ret;
+}
+
+static void tcp_wait_for_events(tcp_context_t *tcp)
+{
+ fdset_t *set = &tcp->set;
+
+ /* Check if throttled with many open TCP connections. */
+ assert(fdset_get_length(set) <= tcp->max_worker_fds);
+ tcp->is_throttled = fdset_get_length(set) == tcp->max_worker_fds;
+
+ /* If throttled, temporarily ignore new TCP connections. */
+ unsigned offset = tcp->is_throttled ? tcp->client_threshold : 0;
+
+ /* Wait for events. */
+ fdset_it_t it;
+ (void)fdset_poll(set, &it, offset, TCP_SWEEP_INTERVAL * 1000);
+
+ /* Mark the time of last poll call. */
+ tcp->last_poll_time = time_now();
+
+ /* Process events. */
+ for (; !fdset_it_is_done(&it); fdset_it_next(&it)) {
+ bool should_close = false;
+ unsigned int idx = fdset_it_get_idx(&it);
+ if (fdset_it_is_error(&it)) {
+ should_close = (idx >= tcp->client_threshold);
+ } else if (fdset_it_is_pollin(&it)) {
+ /* Master sockets - new connection to accept. */
+ if (idx < tcp->client_threshold) {
+ /* Don't accept more clients than configured. */
+ if (fdset_get_length(set) < tcp->max_worker_fds) {
+ tcp_event_accept(tcp, idx);
+ }
+ /* Client sockets - already accepted connection or
+ closed connection :-( */
+ } else if (tcp_event_serve(tcp, idx) != KNOT_EOK) {
+ should_close = true;
+ }
+ }
+
+ /* Evaluate. */
+ if (should_close) {
+ fdset_it_remove(&it);
+ }
+ }
+ fdset_it_commit(&it);
+}
+
+int tcp_master(dthread_t *thread)
+{
+ if (thread == NULL || thread->data == NULL) {
+ return KNOT_EINVAL;
+ }
+
+ iohandler_t *handler = (iohandler_t *)thread->data;
+ int thread_id = handler->thread_id[dt_get_id(thread)];
+
+#ifdef ENABLE_REUSEPORT
+ /* Set thread affinity to CPU core (overlaps with UDP/XDP). */
+ if (conf()->cache.srv_tcp_reuseport) {
+ unsigned cpu = dt_online_cpus();
+ if (cpu > 1) {
+ unsigned cpu_mask = (dt_get_id(thread) % cpu);
+ dt_setaffinity(thread, &cpu_mask, 1);
+ }
+ }
+#endif
+
+ int ret = KNOT_EOK;
+
+ /* Create big enough memory cushion. */
+ knot_mm_t mm;
+ mm_ctx_mempool(&mm, 16 * MM_DEFAULT_BLKSIZE);
+
+ /* Create TCP answering context. */
+ tcp_context_t tcp = {
+ .server = handler->server,
+ .is_throttled = false,
+ .thread_id = thread_id,
+ };
+ knot_layer_init(&tcp.layer, &mm, process_query_layer());
+
+ /* Create iovec abstraction. */
+ for (unsigned i = 0; i < 2; ++i) {
+ tcp.iov[i].iov_len = KNOT_WIRE_MAX_PKTSIZE;
+ tcp.iov[i].iov_base = malloc(tcp.iov[i].iov_len);
+ if (tcp.iov[i].iov_base == NULL) {
+ ret = KNOT_ENOMEM;
+ goto finish;
+ }
+ }
+
+ /* Initialize sweep interval and TCP configuration. */
+ struct timespec next_sweep;
+ update_sweep_timer(&next_sweep);
+ update_tcp_conf(&tcp);
+
+ /* Prepare initial buffer for listening and bound sockets. */
+ if (fdset_init(&tcp.set, FDSET_RESIZE_STEP) != KNOT_EOK) {
+ goto finish;
+ }
+
+ /* Set descriptors for the configured interfaces. */
+ tcp.client_threshold = tcp_set_ifaces(handler->server->ifaces,
+ handler->server->n_ifaces,
+ &tcp.set, thread_id);
+ if (tcp.client_threshold == 0) {
+ goto finish; /* Terminate on zero interfaces. */
+ }
+
+ for (;;) {
+ /* Check for cancellation. */
+ if (dt_is_cancelled(thread)) {
+ break;
+ }
+
+ /* Serve client requests. */
+ tcp_wait_for_events(&tcp);
+
+ /* Sweep inactive clients and refresh TCP configuration. */
+ if (tcp.last_poll_time.tv_sec >= next_sweep.tv_sec) {
+ fdset_sweep(&tcp.set, &tcp_sweep, NULL);
+ update_sweep_timer(&next_sweep);
+ update_tcp_conf(&tcp);
+ }
+ }
+
+finish:
+ free(tcp.iov[0].iov_base);
+ free(tcp.iov[1].iov_base);
+ mp_delete(mm.ctx);
+ fdset_clear(&tcp.set);
+
+ return ret;
+}
diff --git a/src/knot/server/tcp-handler.h b/src/knot/server/tcp-handler.h
new file mode 100644
index 0000000..b60ce8f
--- /dev/null
+++ b/src/knot/server/tcp-handler.h
@@ -0,0 +1,43 @@
+/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+/*!
+ * \brief TCP sockets threading model.
+ *
+ * The master socket distributes incoming connections among
+ * the worker threads ("buckets"). Each thread processes its own
+ * set of sockets, which eliminates the mutual exclusion problem.
+ */
+
+#pragma once
+
+#include "knot/server/dthreads.h"
+
+#define TCP_BACKLOG_SIZE 10 /*!< TCP listen backlog size. */
+
+/*!
+ * \brief TCP handler thread runnable.
+ *
+ * Listens on the bound TCP sockets for client connections and
+ * serves TCP clients. This runnable is designed to be used as a coherent
+ * unit and implements a cancellation point.
+ *
+ * \param thread Associated thread from DThreads unit.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL invalid parameters.
+ */
+int tcp_master(dthread_t *thread);
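+
+/*
+ * Usage note (informational): the server registers this runnable when
+ * creating the TCP I/O handler, see set_handler() in server.c:
+ *
+ *     set_handler(server, IO_TCP, conf->cache.srv_tcp_threads, tcp_master);
+ */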
diff --git a/src/knot/server/udp-handler.c b/src/knot/server/udp-handler.c
new file mode 100644
index 0000000..1e309d6
--- /dev/null
+++ b/src/knot/server/udp-handler.c
@@ -0,0 +1,575 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#define __APPLE_USE_RFC_3542
+
+#include <assert.h>
+#include <dlfcn.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/param.h>
+#ifdef HAVE_SYS_UIO_H // struct iovec (OpenBSD)
+#include <sys/uio.h>
+#endif /* HAVE_SYS_UIO_H */
+#include <unistd.h>
+
+#include "contrib/macros.h"
+#include "contrib/mempattern.h"
+#include "contrib/sockaddr.h"
+#include "contrib/ucw/mempool.h"
+#include "knot/common/fdset.h"
+#include "knot/nameserver/process_query.h"
+#include "knot/query/layer.h"
+#include "knot/server/proxyv2.h"
+#include "knot/server/server.h"
+#include "knot/server/udp-handler.h"
+#include "knot/server/xdp-handler.h"
+
+/* Buffer identifiers. */
+enum {
+ RX = 0,
+ TX = 1,
+ NBUFS = 2
+};
+
+/*! \brief UDP context data. */
+typedef struct {
+ knot_layer_t layer; /*!< Query processing layer. */
+ server_t *server; /*!< Name server structure. */
+ unsigned thread_id; /*!< Thread identifier. */
+} udp_context_t;
+
+static bool udp_state_active(int state)
+{
+ return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL);
+}
+
+static void udp_handle(udp_context_t *udp, int fd, struct sockaddr_storage *ss,
+ struct iovec *rx, struct iovec *tx, struct knot_xdp_msg *xdp_msg)
+{
+ /* Create query processing parameter. */
+ knotd_qdata_params_t params = {
+ .proto = KNOTD_QUERY_PROTO_UDP,
+ .remote = ss,
+ .socket = fd,
+ .server = udp->server,
+ .xdp_msg = xdp_msg,
+ .thread_id = udp->thread_id
+ };
+ struct sockaddr_storage proxied_remote;
+
+ /* Start query processing. */
+ knot_layer_begin(&udp->layer, &params);
+
+ /* Create packets. */
+ knot_pkt_t *query = knot_pkt_new(rx->iov_base, rx->iov_len, udp->layer.mm);
+ knot_pkt_t *ans = knot_pkt_new(tx->iov_base, tx->iov_len, udp->layer.mm);
+
+ /* Input packet. */
+ int ret = knot_pkt_parse(query, 0);
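+	/* A parse failure may mean the query is behind a PROXYv2 header; try to strip it first. */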
+ if (ret != KNOT_EOK && query->parsed > 0) {
+ ret = proxyv2_header_strip(&query, params.remote, &proxied_remote);
+ if (ret == KNOT_EOK) {
+ params.remote = &proxied_remote;
+ } else {
+ query->parsed--; // artificially decreasing "parsed" leads to FORMERR
+ }
+ }
+ knot_layer_consume(&udp->layer, query);
+
+ /* Process answer. */
+ while (udp_state_active(udp->layer.state)) {
+ knot_layer_produce(&udp->layer, ans);
+ }
+
+ /* Send response only if finished successfully. */
+ if (udp->layer.state == KNOT_STATE_DONE) {
+ tx->iov_len = ans->size;
+ } else {
+ tx->iov_len = 0;
+ }
+
+ /* Reset after processing. */
+ knot_layer_finish(&udp->layer);
+
+ /* Flush per-query memory (including query and answer packets). */
+ mp_flush(udp->layer.mm->ctx);
+}
+
+typedef struct {
+ void* (*udp_init)(udp_context_t *, void *);
+ void (*udp_deinit)(void *);
+ int (*udp_recv)(int, void *);
+ void (*udp_handle)(udp_context_t *, void *);
+ void (*udp_send)(void *);
+ void (*udp_sweep)(void *); // Optional
+} udp_api_t;
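+
+/*
+ * Informal contract of the vtable above, as driven by udp_master() below
+ * (a sketch of the call order, not a normative interface):
+ *
+ *     void *ctx = api->udp_init(&udp, xdp_socket);    // once per thread
+ *     while (!cancelled) {
+ *             // for each polled fd with pending input:
+ *             if (api->udp_recv(fd, ctx) > 0) {
+ *                     api->udp_handle(&udp, ctx);     // process the batch
+ *                     api->udp_send(ctx);             // flush the replies
+ *             }
+ *             if (api->udp_sweep != NULL) {
+ *                     api->udp_sweep(ctx);            // optional upkeep
+ *             }
+ *     }
+ *     api->udp_deinit(ctx);
+ */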
+
+/*! \brief Control message to fit IP_PKTINFO or IPV6_PKTINFO. */
+typedef union {
+ struct cmsghdr cmsg;
+ uint8_t buf[CMSG_SPACE(sizeof(struct in6_pktinfo))];
+} cmsg_pktinfo_t;
+
+static void udp_pktinfo_handle(const struct msghdr *rx, struct msghdr *tx)
+{
+ tx->msg_controllen = rx->msg_controllen;
+ if (tx->msg_controllen > 0) {
+ tx->msg_control = rx->msg_control;
+ } else {
+ // BSD has problem with zero length and not-null pointer
+ tx->msg_control = NULL;
+ }
+
+#if defined(__linux__) || defined(__APPLE__)
+ struct cmsghdr *cmsg = CMSG_FIRSTHDR(tx);
+ if (cmsg == NULL) {
+ return;
+ }
+
+	/* Unset the interface index so the reply is routed via the routing tables. */
+ if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
+ struct in_pktinfo *info = (struct in_pktinfo *)CMSG_DATA(cmsg);
+ info->ipi_spec_dst = info->ipi_addr;
+ info->ipi_ifindex = 0;
+ } else if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
+ struct in6_pktinfo *info = (struct in6_pktinfo *)CMSG_DATA(cmsg);
+ info->ipi6_ifindex = 0;
+ }
+#endif
+}
+
+/* UDP recvfrom() request struct. */
+struct udp_recvfrom {
+ int fd;
+ struct sockaddr_storage addr;
+ struct msghdr msg[NBUFS];
+ struct iovec iov[NBUFS];
+ uint8_t buf[NBUFS][KNOT_WIRE_MAX_PKTSIZE];
+ cmsg_pktinfo_t pktinfo;
+};
+
+static void *udp_recvfrom_init(_unused_ udp_context_t *ctx, _unused_ void *xdp_sock)
+{
+ struct udp_recvfrom *rq = malloc(sizeof(struct udp_recvfrom));
+ if (rq == NULL) {
+ return NULL;
+ }
+ memset(rq, 0, sizeof(struct udp_recvfrom));
+
+ for (unsigned i = 0; i < NBUFS; ++i) {
+ rq->iov[i].iov_base = rq->buf + i;
+ rq->iov[i].iov_len = KNOT_WIRE_MAX_PKTSIZE;
+ rq->msg[i].msg_name = &rq->addr;
+ rq->msg[i].msg_namelen = sizeof(rq->addr);
+ rq->msg[i].msg_iov = &rq->iov[i];
+ rq->msg[i].msg_iovlen = 1;
+ rq->msg[i].msg_control = &rq->pktinfo.cmsg;
+ rq->msg[i].msg_controllen = sizeof(rq->pktinfo);
+ }
+ return rq;
+}
+
+static void udp_recvfrom_deinit(void *d)
+{
+ struct udp_recvfrom *rq = d;
+ free(rq);
+}
+
+static int udp_recvfrom_recv(int fd, void *d)
+{
+ /* Reset max lengths. */
+ struct udp_recvfrom *rq = (struct udp_recvfrom *)d;
+ rq->iov[RX].iov_len = KNOT_WIRE_MAX_PKTSIZE;
+ rq->msg[RX].msg_namelen = sizeof(struct sockaddr_storage);
+ rq->msg[RX].msg_controllen = sizeof(rq->pktinfo);
+
+ int ret = recvmsg(fd, &rq->msg[RX], MSG_DONTWAIT);
+ if (ret > 0) {
+ rq->fd = fd;
+ rq->iov[RX].iov_len = ret;
+ return 1;
+ }
+
+ return 0;
+}
+
+static void udp_recvfrom_handle(udp_context_t *ctx, void *d)
+{
+ struct udp_recvfrom *rq = d;
+
+ /* Prepare TX address. */
+ rq->msg[TX].msg_namelen = rq->msg[RX].msg_namelen;
+ rq->iov[TX].iov_len = KNOT_WIRE_MAX_PKTSIZE;
+
+ udp_pktinfo_handle(&rq->msg[RX], &rq->msg[TX]);
+
+ /* Process received pkt. */
+ udp_handle(ctx, rq->fd, &rq->addr, &rq->iov[RX], &rq->iov[TX], NULL);
+}
+
+static void udp_recvfrom_send(void *d)
+{
+ struct udp_recvfrom *rq = d;
+ if (rq->iov[TX].iov_len > 0) {
+ (void)sendmsg(rq->fd, &rq->msg[TX], 0);
+ }
+}
+
+_unused_
+static udp_api_t udp_recvfrom_api = {
+ udp_recvfrom_init,
+ udp_recvfrom_deinit,
+ udp_recvfrom_recv,
+ udp_recvfrom_handle,
+ udp_recvfrom_send,
+};
+
+#ifdef ENABLE_RECVMMSG
+/* UDP recvmmsg() request struct. */
+struct udp_recvmmsg {
+ int fd;
+ struct sockaddr_storage addrs[RECVMMSG_BATCHLEN];
+ char *iobuf[NBUFS];
+ struct iovec *iov[NBUFS];
+ struct mmsghdr *msgs[NBUFS];
+ unsigned rcvd;
+ knot_mm_t mm;
+ cmsg_pktinfo_t pktinfo[RECVMMSG_BATCHLEN];
+};
+
+static void *udp_recvmmsg_init(_unused_ udp_context_t *ctx, _unused_ void *xdp_sock)
+{
+ knot_mm_t mm;
+ mm_ctx_mempool(&mm, sizeof(struct udp_recvmmsg));
+
+ struct udp_recvmmsg *rq = mm_alloc(&mm, sizeof(struct udp_recvmmsg));
+ memset(rq, 0, sizeof(*rq));
+ memcpy(&rq->mm, &mm, sizeof(knot_mm_t));
+
+ /* Initialize buffers. */
+ for (unsigned i = 0; i < NBUFS; ++i) {
+ rq->iobuf[i] = mm_alloc(&mm, KNOT_WIRE_MAX_PKTSIZE * RECVMMSG_BATCHLEN);
+ rq->iov[i] = mm_alloc(&mm, sizeof(struct iovec) * RECVMMSG_BATCHLEN);
+ rq->msgs[i] = mm_alloc(&mm, sizeof(struct mmsghdr) * RECVMMSG_BATCHLEN);
+ memset(rq->msgs[i], 0, sizeof(struct mmsghdr) * RECVMMSG_BATCHLEN);
+ for (unsigned k = 0; k < RECVMMSG_BATCHLEN; ++k) {
+ rq->iov[i][k].iov_base = rq->iobuf[i] + k * KNOT_WIRE_MAX_PKTSIZE;
+ rq->iov[i][k].iov_len = KNOT_WIRE_MAX_PKTSIZE;
+ rq->msgs[i][k].msg_hdr.msg_iov = rq->iov[i] + k;
+ rq->msgs[i][k].msg_hdr.msg_iovlen = 1;
+ rq->msgs[i][k].msg_hdr.msg_name = rq->addrs + k;
+ rq->msgs[i][k].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ rq->msgs[i][k].msg_hdr.msg_control = &rq->pktinfo[k].cmsg;
+ rq->msgs[i][k].msg_hdr.msg_controllen = sizeof(cmsg_pktinfo_t);
+ }
+ }
+
+ return rq;
+}
+
+static void udp_recvmmsg_deinit(void *d)
+{
+ struct udp_recvmmsg *rq = d;
+ if (rq != NULL) {
+ mp_delete(rq->mm.ctx);
+ }
+}
+
+static int udp_recvmmsg_recv(int fd, void *d)
+{
+ struct udp_recvmmsg *rq = d;
+
+ int n = recvmmsg(fd, rq->msgs[RX], RECVMMSG_BATCHLEN, MSG_DONTWAIT, NULL);
+ if (n > 0) {
+ rq->fd = fd;
+ rq->rcvd = n;
+ }
+ return n;
+}
+
+static void udp_recvmmsg_handle(udp_context_t *ctx, void *d)
+{
+ struct udp_recvmmsg *rq = d;
+
+ /* Handle each received message. */
+ unsigned j = 0;
+ for (unsigned i = 0; i < rq->rcvd; ++i) {
+ struct msghdr *rx = &rq->msgs[RX][i].msg_hdr;
+ struct msghdr *tx = &rq->msgs[TX][j].msg_hdr;
+
+ /* Set received bytes. */
+ rx->msg_iov->iov_len = rq->msgs[RX][i].msg_len;
+ /* Update mapping of address buffer. */
+ tx->msg_name = rx->msg_name;
+ tx->msg_namelen = rx->msg_namelen;
+
+ /* Update output message control buffer. */
+ udp_pktinfo_handle(rx, tx);
+
+ udp_handle(ctx, rq->fd, rq->addrs + i, rx->msg_iov, tx->msg_iov, NULL);
+
+ if (tx->msg_iov->iov_len > 0) {
+ rq->msgs[TX][j].msg_len = tx->msg_iov->iov_len;
+ j++;
+ } else {
+ /* Reset tainted output context. */
+ tx->msg_iov->iov_len = KNOT_WIRE_MAX_PKTSIZE;
+ }
+
+ /* Reset input context. */
+ rx->msg_iov->iov_len = KNOT_WIRE_MAX_PKTSIZE;
+ rx->msg_namelen = sizeof(struct sockaddr_storage);
+ rx->msg_controllen = sizeof(cmsg_pktinfo_t);
+ }
+ rq->rcvd = j;
+}
+
+static void udp_recvmmsg_send(void *d)
+{
+ struct udp_recvmmsg *rq = d;
+
+ (void)sendmmsg(rq->fd, rq->msgs[TX], rq->rcvd, 0);
+ for (unsigned i = 0; i < rq->rcvd; ++i) {
+ struct msghdr *tx = &rq->msgs[TX][i].msg_hdr;
+
+ /* Reset output context. */
+ tx->msg_iov->iov_len = KNOT_WIRE_MAX_PKTSIZE;
+ }
+}
+
+static udp_api_t udp_recvmmsg_api = {
+ udp_recvmmsg_init,
+ udp_recvmmsg_deinit,
+ udp_recvmmsg_recv,
+ udp_recvmmsg_handle,
+ udp_recvmmsg_send,
+};
+#endif /* ENABLE_RECVMMSG */
+
+#ifdef ENABLE_XDP
+
+static void *xdp_recvmmsg_init(udp_context_t *ctx, void *xdp_sock)
+{
+ return xdp_handle_init(ctx->server, xdp_sock);
+}
+
+static void xdp_recvmmsg_deinit(void *d)
+{
+ if (d != NULL) {
+ xdp_handle_free(d);
+ }
+}
+
+static int xdp_recvmmsg_recv(_unused_ int fd, void *d)
+{
+ return xdp_handle_recv(d);
+}
+
+static void xdp_recvmmsg_handle(udp_context_t *ctx, void *d)
+{
+ xdp_handle_msgs(d, &ctx->layer, ctx->server, ctx->thread_id);
+}
+
+static void xdp_recvmmsg_send(void *d)
+{
+ xdp_handle_send(d);
+}
+
+static void xdp_recvmmsg_sweep(void *d)
+{
+ xdp_handle_reconfigure(d);
+ xdp_handle_sweep(d);
+}
+
+static udp_api_t xdp_recvmmsg_api = {
+ xdp_recvmmsg_init,
+ xdp_recvmmsg_deinit,
+ xdp_recvmmsg_recv,
+ xdp_recvmmsg_handle,
+ xdp_recvmmsg_send,
+ xdp_recvmmsg_sweep,
+};
+#endif /* ENABLE_XDP */
+
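+/* Thread ids are allocated per handler in order (UDP, TCP, then XDP), so any
+ * id at or past the first XDP thread id belongs to an XDP thread. */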
+static bool is_xdp_thread(const server_t *server, int thread_id)
+{
+ return server->handlers[IO_XDP].size > 0 &&
+ server->handlers[IO_XDP].handler.thread_id[0] <= thread_id;
+}
+
+static int iface_udp_fd(const iface_t *iface, int thread_id, bool xdp_thread,
+ void **xdp_socket)
+{
+ if (xdp_thread) {
+#ifdef ENABLE_XDP
+ if (thread_id < iface->xdp_first_thread_id ||
+ thread_id >= iface->xdp_first_thread_id + iface->fd_xdp_count) {
+ return -1; // Different XDP interface.
+ }
+ size_t xdp_wrk_id = thread_id - iface->xdp_first_thread_id;
+ assert(xdp_wrk_id < iface->fd_xdp_count);
+ *xdp_socket = iface->xdp_sockets[xdp_wrk_id];
+ return iface->fd_xdp[xdp_wrk_id];
+#else
+ assert(0);
+ return -1;
+#endif
+ } else { // UDP thread.
+ if (iface->fd_udp_count == 0) { // No UDP interfaces.
+ assert(iface->fd_xdp_count > 0);
+ return -1;
+ }
+#ifdef ENABLE_REUSEPORT
+ assert(thread_id < iface->fd_udp_count);
+ return iface->fd_udp[thread_id];
+#else
+ return iface->fd_udp[0];
+#endif
+ }
+}
+
+static unsigned udp_set_ifaces(const server_t *server, size_t n_ifaces, fdset_t *fds,
+ int thread_id, void **xdp_socket)
+{
+ if (n_ifaces == 0) {
+ return 0;
+ }
+
+ bool xdp_thread = is_xdp_thread(server, thread_id);
+ const iface_t *ifaces = server->ifaces;
+
+ for (const iface_t *i = ifaces; i != ifaces + n_ifaces; i++) {
+ int fd = iface_udp_fd(i, thread_id, xdp_thread, xdp_socket);
+ if (fd < 0) {
+ continue;
+ }
+ int ret = fdset_add(fds, fd, FDSET_POLLIN, NULL);
+ if (ret < 0) {
+ return 0;
+ }
+ }
+
+ assert(!xdp_thread || fdset_get_length(fds) == 1);
+ return fdset_get_length(fds);
+}
+
+int udp_master(dthread_t *thread)
+{
+ if (thread == NULL || thread->data == NULL) {
+ return KNOT_EINVAL;
+ }
+
+ iohandler_t *handler = (iohandler_t *)thread->data;
+ int thread_id = handler->thread_id[dt_get_id(thread)];
+
+ if (handler->server->n_ifaces == 0) {
+ return KNOT_EOK;
+ }
+
+ /* Set thread affinity to CPU core (same for UDP and XDP). */
+ unsigned cpu = dt_online_cpus();
+ if (cpu > 1) {
+ unsigned cpu_mask = (dt_get_id(thread) % cpu);
+ dt_setaffinity(thread, &cpu_mask, 1);
+ }
+
+ /* Choose processing API. */
+ udp_api_t *api = NULL;
+ if (is_xdp_thread(handler->server, thread_id)) {
+#ifdef ENABLE_XDP
+ api = &xdp_recvmmsg_api;
+#else
+ assert(0);
+#endif
+ } else {
+#ifdef ENABLE_RECVMMSG
+ api = &udp_recvmmsg_api;
+#else
+ api = &udp_recvfrom_api;
+#endif
+ }
+ void *api_ctx = NULL;
+
+ /* Create big enough memory cushion. */
+ knot_mm_t mm;
+ mm_ctx_mempool(&mm, 16 * MM_DEFAULT_BLKSIZE);
+
+ /* Create UDP answering context. */
+ udp_context_t udp = {
+ .server = handler->server,
+ .thread_id = thread_id,
+ };
+ knot_layer_init(&udp.layer, &mm, process_query_layer());
+
+ /* Allocate descriptors for the configured interfaces. */
+ void *xdp_socket = NULL;
+ size_t nifs = handler->server->n_ifaces;
+ fdset_t fds;
+ if (fdset_init(&fds, nifs) != KNOT_EOK) {
+ goto finish;
+ }
+ unsigned nfds = udp_set_ifaces(handler->server, nifs, &fds,
+ thread_id, &xdp_socket);
+ if (nfds == 0) {
+ goto finish;
+ }
+
+ /* Initialize the networking API. */
+ api_ctx = api->udp_init(&udp, xdp_socket);
+ if (api_ctx == NULL) {
+ goto finish;
+ }
+
+	/* Process incoming events until the thread is cancelled. */
+ for (;;) {
+ /* Cancellation point. */
+ if (dt_is_cancelled(thread)) {
+ break;
+ }
+
+ /* Wait for events. */
+ fdset_it_t it;
+ (void)fdset_poll(&fds, &it, 0, 1000);
+
+ /* Process the events. */
+ for (; !fdset_it_is_done(&it); fdset_it_next(&it)) {
+ if (!fdset_it_is_pollin(&it)) {
+ continue;
+ }
+ if (api->udp_recv(fdset_it_get_fd(&it), api_ctx) > 0) {
+ api->udp_handle(&udp, api_ctx);
+ api->udp_send(api_ctx);
+ }
+ }
+
+ /* Regular maintenance (XDP-TCP only). */
+ if (api->udp_sweep != NULL) {
+ api->udp_sweep(api_ctx);
+ }
+ }
+
+finish:
+ api->udp_deinit(api_ctx);
+ mp_delete(mm.ctx);
+ fdset_clear(&fds);
+
+ return KNOT_EOK;
+}
diff --git a/src/knot/server/udp-handler.h b/src/knot/server/udp-handler.h
new file mode 100644
index 0000000..b09e43e
--- /dev/null
+++ b/src/knot/server/udp-handler.h
@@ -0,0 +1,43 @@
+/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+/*!
+ * \brief UDP sockets threading model.
+ *
+ * The master socket locks one worker thread at a time
+ * and saves events in its own backing store for asynchronous processing.
+ * The worker threads work asynchronously in a thread pool.
+ */
+
+#pragma once
+
+#include "knot/server/dthreads.h"
+
+#define RECVMMSG_BATCHLEN 10 /*!< Default recvmmsg() batch size. */
+
+/*!
+ * \brief UDP handler thread runnable.
+ *
+ * Listens for DNS datagrams in a loop on a UDP socket and
+ * replies to them. This runnable is designed to be used as a coherent
+ * unit and implements a cancellation point.
+ *
+ * \param thread Associated thread from DThreads unit.
+ *
+ * \retval KNOT_EOK on success.
+ * \retval KNOT_EINVAL invalid parameters.
+ */
+int udp_master(dthread_t *thread);
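+
+/*
+ * Usage note (informational): server.c registers this runnable for both the
+ * plain UDP and the XDP I/O handlers; each thread then picks the matching
+ * processing API (recvfrom(), recvmmsg(), or XDP) at startup.
+ */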
diff --git a/src/knot/server/xdp-handler.c b/src/knot/server/xdp-handler.c
new file mode 100644
index 0000000..3c9f6d6
--- /dev/null
+++ b/src/knot/server/xdp-handler.c
@@ -0,0 +1,506 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifdef ENABLE_XDP
+
+#include <assert.h>
+#include <stdlib.h>
+#include <urcu.h>
+
+#include "knot/server/xdp-handler.h"
+#include "knot/common/log.h"
+#include "knot/server/proxyv2.h"
+#include "knot/server/server.h"
+#include "contrib/sockaddr.h"
+#include "contrib/time.h"
+#include "contrib/ucw/mempool.h"
+#include "libknot/endian.h"
+#include "libknot/error.h"
+#ifdef ENABLE_QUIC
+#include "libknot/xdp/quic.h"
+#endif // ENABLE_QUIC
+#include "libknot/xdp/tcp.h"
+#include "libknot/xdp/tcp_iobuf.h"
+
+#define QUIC_MAX_SEND_PER_RECV 4
+#define QUIC_IBUFS_PER_CONN 512 /* Heuristic value: e.g. for 100k allowed QUIC conns,
+                                   the total size of input buffers is limited to
+                                   100 000 * 512 B, i.e. ~50 MB. */
+
+typedef struct {
+ uint64_t last_log;
+ knot_sweep_stats_t stats;
+} closed_log_ctx_t;
+
+typedef struct xdp_handle_ctx {
+ knot_xdp_socket_t *sock;
+ knot_xdp_msg_t msg_recv[XDP_BATCHLEN];
+ knot_xdp_msg_t msg_send_udp[XDP_BATCHLEN];
+ knot_tcp_relay_t relays[XDP_BATCHLEN];
+ uint32_t msg_recv_count;
+ uint32_t msg_udp_count;
+ knot_tcp_table_t *tcp_table;
+ knot_tcp_table_t *syn_table;
+
+#ifdef ENABLE_QUIC
+ knot_xquic_conn_t *quic_relays[XDP_BATCHLEN];
+ int quic_rets[XDP_BATCHLEN];
+ knot_xquic_table_t *quic_table;
+ closed_log_ctx_t quic_closed;
+#endif // ENABLE_QUIC
+
+ bool tcp;
+ size_t tcp_max_conns;
+ size_t tcp_syn_conns;
+ size_t tcp_max_inbufs;
+ size_t tcp_max_obufs;
+ uint32_t tcp_idle_close; // In microseconds.
+ uint32_t tcp_idle_reset; // In microseconds.
+ uint32_t tcp_idle_resend; // In microseconds.
+
+ uint16_t quic_port; // Network-byte order!
+ size_t quic_max_conns;
+ uint64_t quic_idle_close; // In nanoseconds.
+ size_t quic_max_inbufs;
+ size_t quic_max_obufs;
+
+ closed_log_ctx_t tcp_closed;
+} xdp_handle_ctx_t;
+
+static bool udp_state_active(int state)
+{
+ return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL);
+}
+
+static bool tcp_active_state(int state)
+{
+ return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL);
+}
+
+static bool tcp_send_state(int state)
+{
+ return (state != KNOT_STATE_FAIL && state != KNOT_STATE_NOOP);
+}
+
+static void log_closed(closed_log_ctx_t *ctx, bool tcp)
+{
+ struct timespec now = time_now();
+ uint64_t sec = now.tv_sec + now.tv_nsec / 1000000000;
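+	/* Rate-limit: log at most once per ~10 seconds, and only if anything was swept. */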
+ if (sec - ctx->last_log <= 9 || (ctx->stats.total == 0)) {
+ return;
+ }
+
+ const char *proto = tcp ? "TCP" : "QUIC";
+
+ uint32_t timedout = ctx->stats.counters[KNOT_SWEEP_CTR_TIMEOUT];
+ uint32_t limit_conn = ctx->stats.counters[KNOT_SWEEP_CTR_LIMIT_CONN];
+ uint32_t limit_ibuf = ctx->stats.counters[KNOT_SWEEP_CTR_LIMIT_IBUF];
+ uint32_t limit_obuf = ctx->stats.counters[KNOT_SWEEP_CTR_LIMIT_OBUF];
+
+ if (tcp || ctx->stats.total != timedout) {
+ log_notice("%s, connection sweep, closed %u, count limit %u, inbuf limit %u, outbuf limit %u",
+ proto, timedout, limit_conn, limit_ibuf, limit_obuf);
+ } else {
+ log_debug("%s, timed out connections %u", proto, timedout);
+ }
+
+ ctx->last_log = sec;
+ knot_sweep_stats_reset(&ctx->stats);
+}
+
+void xdp_handle_reconfigure(xdp_handle_ctx_t *ctx)
+{
+ rcu_read_lock();
+ conf_t *pconf = conf();
+ ctx->tcp = pconf->cache.xdp_tcp;
+ ctx->quic_port = htobe16(pconf->cache.xdp_quic);
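+	/* Per-thread limits: the configured global limits divided evenly among the XDP worker threads. */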
+ ctx->tcp_max_conns = pconf->cache.xdp_tcp_max_clients / pconf->cache.srv_xdp_threads;
+ ctx->tcp_syn_conns = 2 * ctx->tcp_max_conns;
+ ctx->tcp_max_inbufs = pconf->cache.xdp_tcp_inbuf_max_size / pconf->cache.srv_xdp_threads;
+ ctx->tcp_max_obufs = pconf->cache.xdp_tcp_outbuf_max_size / pconf->cache.srv_xdp_threads;
+ ctx->tcp_idle_close = pconf->cache.xdp_tcp_idle_close * 1000000;
+ ctx->tcp_idle_reset = pconf->cache.xdp_tcp_idle_reset * 1000000;
+	ctx->tcp_idle_resend = pconf->cache.xdp_tcp_idle_resend * 1000000;
+ ctx->quic_max_conns = pconf->cache.srv_quic_max_clients / pconf->cache.srv_xdp_threads;
+	ctx->quic_idle_close = pconf->cache.srv_quic_idle_close * 1000000000LU;
+	ctx->quic_max_inbufs = ctx->quic_max_conns * QUIC_IBUFS_PER_CONN;
+ ctx->quic_max_obufs = pconf->cache.srv_quic_obuf_max_size;
+ rcu_read_unlock();
+}
+
+void xdp_handle_free(xdp_handle_ctx_t *ctx)
+{
+ knot_tcp_table_free(ctx->tcp_table);
+ knot_tcp_table_free(ctx->syn_table);
+#ifdef ENABLE_QUIC
+ knot_xquic_table_free(ctx->quic_table);
+#endif // ENABLE_QUIC
+ free(ctx);
+}
+
+#ifdef ENABLE_QUIC
+static void quic_log_cb(const char *line)
+{
+ log_debug("QUIC: %s", line);
+}
+#endif // ENABLE_QUIC
+
+xdp_handle_ctx_t *xdp_handle_init(struct server *server, knot_xdp_socket_t *xdp_sock)
+{
+ xdp_handle_ctx_t *ctx = calloc(1, sizeof(*ctx));
+ if (ctx == NULL) {
+ return NULL;
+ }
+ ctx->sock = xdp_sock;
+
+ xdp_handle_reconfigure(ctx);
+
+ if (ctx->tcp) {
+		// NOTE: the table size doesn't have to equal its maximum usage!
+ ctx->tcp_table = knot_tcp_table_new(ctx->tcp_max_conns, NULL);
+ if (ctx->tcp_table == NULL) {
+ xdp_handle_free(ctx);
+ return NULL;
+ }
+ ctx->syn_table = knot_tcp_table_new(ctx->tcp_syn_conns, ctx->tcp_table);
+ if (ctx->syn_table == NULL) {
+ xdp_handle_free(ctx);
+ return NULL;
+ }
+ }
+
+ if (ctx->quic_port > 0) {
+#ifdef ENABLE_QUIC
+ conf_t *pconf = conf();
+ size_t udp_pl = MIN(pconf->cache.srv_udp_max_payload_ipv4, pconf->cache.srv_udp_max_payload_ipv6);
+ ctx->quic_table = knot_xquic_table_new(ctx->quic_max_conns, ctx->quic_max_inbufs,
+ ctx->quic_max_obufs, udp_pl, server->quic_creds);
+ if (ctx->quic_table == NULL) {
+ xdp_handle_free(ctx);
+ return NULL;
+ }
+ if (conf_get_bool(pconf, C_XDP, C_QUIC_LOG)) {
+ ctx->quic_table->log_cb = quic_log_cb;
+ }
+#else
+ assert(0); // verified in configuration checks
+#endif // ENABLE_QUIC
+ }
+
+ return ctx;
+}
+
+int xdp_handle_recv(xdp_handle_ctx_t *ctx)
+{
+ int ret = knot_xdp_recv(ctx->sock, ctx->msg_recv, XDP_BATCHLEN,
+ &ctx->msg_recv_count, NULL);
+ return ret == KNOT_EOK ? ctx->msg_recv_count : ret;
+}
+
+static void handle_init(knotd_qdata_params_t *params, knot_layer_t *layer,
+ knotd_query_proto_t proto, const knot_xdp_msg_t *msg,
+ const struct iovec *payload, struct sockaddr_storage *proxied_remote)
+{
+ params->proto = proto;
+ params->remote = (struct sockaddr_storage *)&msg->ip_from;
+ params->xdp_msg = msg;
+
+ knot_layer_begin(layer, params);
+
+ knot_pkt_t *query = knot_pkt_new(payload->iov_base, payload->iov_len, layer->mm);
+ int ret = knot_pkt_parse(query, 0);
+ if (ret != KNOT_EOK && query->parsed > 0) { // parsing failed (e.g. 2x OPT)
+ if (params->proto == KNOTD_QUERY_PROTO_UDP &&
+ proxyv2_header_strip(&query, params->remote, proxied_remote) == KNOT_EOK) {
+ assert(proxied_remote);
+ params->remote = proxied_remote;
+ } else {
+ query->parsed--; // artificially decreasing "parsed" leads to FORMERR
+ }
+ }
+ knot_layer_consume(layer, query);
+}
+
+static void handle_finish(knot_layer_t *layer)
+{
+ knot_layer_finish(layer);
+
+ // Flush per-query memory (including query and answer packets).
+ mp_flush(layer->mm->ctx);
+}
+
+static void handle_udp(xdp_handle_ctx_t *ctx, knot_layer_t *layer,
+ knotd_qdata_params_t *params)
+{
+ struct sockaddr_storage proxied_remote;
+
+ ctx->msg_udp_count = 0;
+
+ for (uint32_t i = 0; i < ctx->msg_recv_count; i++) {
+ knot_xdp_msg_t *msg_recv = &ctx->msg_recv[i];
+ knot_xdp_msg_t *msg_send = &ctx->msg_send_udp[ctx->msg_udp_count];
+
+ // Skip TCP or QUIC or marked (zero length) message.
+ if ((msg_recv->flags & KNOT_XDP_MSG_TCP) ||
+ msg_recv->ip_to.sin6_port == ctx->quic_port ||
+ msg_recv->payload.iov_len == 0) {
+ continue;
+ }
+
+ // Try to allocate a buffer for a reply.
+ if (knot_xdp_reply_alloc(ctx->sock, msg_recv, msg_send) != KNOT_EOK) {
+ log_notice("UDP, failed to send some packets");
+ break; // Drop the rest of the messages.
+ }
+ ctx->msg_udp_count++;
+
+ // Consume the query.
+ handle_init(params, layer, KNOTD_QUERY_PROTO_UDP, msg_recv, &msg_recv->payload,
+ &proxied_remote);
+
+ // Process the reply.
+ knot_pkt_t *ans = knot_pkt_new(msg_send->payload.iov_base,
+ msg_send->payload.iov_len, layer->mm);
+ while (udp_state_active(layer->state)) {
+ knot_layer_produce(layer, ans);
+ }
+ if (layer->state == KNOT_STATE_DONE) {
+ msg_send->payload.iov_len = ans->size;
+ } else {
+ // If not success, don't send any reply.
+ msg_send->payload.iov_len = 0;
+ }
+
+ // Reset the processing.
+ handle_finish(layer);
+ }
+}
+
+static void handle_tcp(xdp_handle_ctx_t *ctx, knot_layer_t *layer,
+ knotd_qdata_params_t *params)
+{
+ int ret = knot_tcp_recv(ctx->relays, ctx->msg_recv, ctx->msg_recv_count,
+ ctx->tcp_table, ctx->syn_table, XDP_TCP_IGNORE_NONE);
+ if (ret != KNOT_EOK) {
+ log_notice("TCP, failed to process some packets (%s)", knot_strerror(ret));
+ return;
+ } else if (knot_tcp_relay_empty(&ctx->relays[0])) { // no TCP traffic
+ return;
+ }
+
+ uint8_t ans_buf[KNOT_WIRE_MAX_PKTSIZE];
+
+ for (uint32_t i = 0; i < ctx->msg_recv_count; i++) {
+ knot_tcp_relay_t *rl = &ctx->relays[i];
+
+ // Process all complete DNS queries in one TCP stream.
+ for (size_t j = 0; j < rl->inbufs_count; j++) {
+ // Consume the query.
+ handle_init(params, layer, KNOTD_QUERY_PROTO_TCP, rl->msg, &rl->inbufs[j], NULL);
+ params->measured_rtt = rl->conn->establish_rtt;
+
+ // Process the reply.
+ knot_pkt_t *ans = knot_pkt_new(ans_buf, sizeof(ans_buf), layer->mm);
+ while (tcp_active_state(layer->state)) {
+ knot_layer_produce(layer, ans);
+ if (!tcp_send_state(layer->state)) {
+ continue;
+ }
+
+ (void)knot_tcp_reply_data(rl, ctx->tcp_table, false,
+ ans->wire, ans->size);
+ }
+
+ handle_finish(layer);
+ }
+ }
+}
+
+#ifdef ENABLE_QUIC
+static void handle_quic_stream(knot_xquic_conn_t *conn, int64_t stream_id, struct iovec *inbuf,
+ knot_layer_t *layer, knotd_qdata_params_t *params, uint8_t *ans_buf,
+ size_t ans_buf_size, const knot_xdp_msg_t *xdp_msg)
+{
+ // Consume the query.
+ handle_init(params, layer, KNOTD_QUERY_PROTO_QUIC, xdp_msg, inbuf, NULL);
+ params->measured_rtt = knot_xquic_conn_rtt(conn);
+
+ // Process the reply.
+ knot_pkt_t *ans = knot_pkt_new(ans_buf, ans_buf_size, layer->mm);
+ while (tcp_active_state(layer->state)) {
+ knot_layer_produce(layer, ans);
+ if (!tcp_send_state(layer->state)) {
+ continue;
+ }
+ if (knot_xquic_stream_add_data(conn, stream_id, ans->wire, ans->size) == NULL) {
+ break;
+ }
+ }
+
+ handle_finish(layer);
+}
+#endif // ENABLE_QUIC
+
+static void handle_quic(xdp_handle_ctx_t *ctx, knot_layer_t *layer,
+ knotd_qdata_params_t *params)
+{
+#ifdef ENABLE_QUIC
+ if (ctx->quic_table == NULL) {
+ return;
+ }
+
+ uint8_t ans_buf[KNOT_WIRE_MAX_PKTSIZE];
+
+ for (uint32_t i = 0; i < ctx->msg_recv_count; i++) {
+ knot_xdp_msg_t *msg_recv = &ctx->msg_recv[i];
+ ctx->quic_relays[i] = NULL;
+
+ if ((msg_recv->flags & KNOT_XDP_MSG_TCP) ||
+ msg_recv->ip_to.sin6_port != ctx->quic_port ||
+ msg_recv->payload.iov_len == 0) {
+ continue;
+ }
+
+ ctx->quic_rets[i] = knot_xquic_handle(ctx->quic_table, msg_recv,
+ ctx->quic_idle_close,
+ &ctx->quic_relays[i]);
+ knot_xquic_conn_t *rl = ctx->quic_relays[i];
+
+ int64_t stream_id;
+ knot_xquic_stream_t *stream;
+
+ while (rl != NULL && (stream = knot_xquic_stream_get_process(rl, &stream_id)) != NULL) {
+ assert(stream->inbuf_fin != NULL);
+ assert(stream->inbuf_fin->iov_len > 0);
+ handle_quic_stream(rl, stream_id, stream->inbuf_fin, layer, params,
+ ans_buf, sizeof(ans_buf), &ctx->msg_recv[i]);
+ free(stream->inbuf_fin);
+ stream->inbuf_fin = NULL;
+ }
+ }
+#else
+ (void)(ctx);
+ (void)(layer);
+ (void)(params);
+#endif // ENABLE_QUIC
+}
+
+void xdp_handle_msgs(xdp_handle_ctx_t *ctx, knot_layer_t *layer,
+ server_t *server, unsigned thread_id)
+{
+ assert(ctx->msg_recv_count > 0);
+
+ knotd_qdata_params_t params = {
+ .socket = knot_xdp_socket_fd(ctx->sock),
+ .server = server,
+ .thread_id = thread_id,
+ };
+
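+ // Prepare the XDP socket for sending before the handlers generate replies.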
+ knot_xdp_send_prepare(ctx->sock);
+
+ handle_udp(ctx, layer, &params);
+ if (ctx->tcp) {
+ handle_tcp(ctx, layer, &params);
+ }
+ handle_quic(ctx, layer, &params);
+
+ knot_xdp_recv_finish(ctx->sock, ctx->msg_recv, ctx->msg_recv_count);
+}
+
+void xdp_handle_send(xdp_handle_ctx_t *ctx)
+{
+ uint32_t unused;
+ int ret = knot_xdp_send(ctx->sock, ctx->msg_send_udp, ctx->msg_udp_count, &unused);
+ if (ret != KNOT_EOK) {
+ log_notice("UDP, failed to send some packets");
+ }
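+ // Flush answers and control packets queued on the TCP relays.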
+ if (ctx->tcp) {
+ ret = knot_tcp_send(ctx->sock, ctx->relays, ctx->msg_recv_count,
+ XDP_BATCHLEN);
+ if (ret != KNOT_EOK) {
+ log_notice("TCP, failed to send some packets");
+ }
+ }
+#ifdef ENABLE_QUIC
+ for (uint32_t i = 0; i < ctx->msg_recv_count; i++) {
+ if (ctx->quic_relays[i] == NULL) {
+ continue;
+ }
+
+ ret = knot_xquic_send(ctx->quic_table, ctx->quic_relays[i], ctx->sock,
+ &ctx->msg_recv[i], ctx->quic_rets[i],
+ QUIC_MAX_SEND_PER_RECV, false);
+ if (ret != KNOT_EOK) {
+ log_notice("QUIC, failed to send some packets");
+ }
+ }
+ knot_xquic_cleanup(ctx->quic_relays, ctx->msg_recv_count);
+#endif // ENABLE_QUIC
+
+ (void)knot_xdp_send_finish(ctx->sock);
+
+ if (ctx->tcp) {
+ knot_tcp_cleanup(ctx->tcp_table, ctx->relays, ctx->msg_recv_count);
+ }
+}
+
+void xdp_handle_sweep(xdp_handle_ctx_t *ctx)
+{
+#ifdef ENABLE_QUIC
+ if (ctx->quic_table != NULL) {
+ knot_xquic_table_sweep(ctx->quic_table, &ctx->quic_closed.stats);
+ log_closed(&ctx->quic_closed, false);
+ }
+#endif // ENABLE_QUIC
+
+ if (!ctx->tcp) {
+ return;
+ }
+
+ int ret = KNOT_EOK;
+ uint32_t prev_total;
+ knot_tcp_relay_t sweep_relays[XDP_BATCHLEN];
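+ // Sweep in batches of XDP_BATCHLEN; repeat until a pass closes no
+ // additional connections.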
+ do {
+ knot_xdp_send_prepare(ctx->sock);
+
+ prev_total = ctx->tcp_closed.stats.total;
+
+ ret = knot_tcp_sweep(ctx->tcp_table, ctx->tcp_idle_close, ctx->tcp_idle_reset,
+ ctx->tcp_idle_resend,
+ ctx->tcp_max_conns, ctx->tcp_max_inbufs, ctx->tcp_max_obufs,
+ sweep_relays, XDP_BATCHLEN, &ctx->tcp_closed.stats);
+ if (ret == KNOT_EOK) {
+ ret = knot_tcp_send(ctx->sock, sweep_relays, XDP_BATCHLEN, XDP_BATCHLEN);
+ }
+ knot_tcp_cleanup(ctx->tcp_table, sweep_relays, XDP_BATCHLEN);
+ if (ret != KNOT_EOK) {
+ break;
+ }
+
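+ // Second pass sweeps the half-open (SYN) connection table, which has
+ // its own size limit and no idle-close/resend timeouts.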
+ ret = knot_tcp_sweep(ctx->syn_table, UINT32_MAX, ctx->tcp_idle_reset,
+ UINT32_MAX, ctx->tcp_syn_conns, SIZE_MAX, SIZE_MAX,
+ sweep_relays, XDP_BATCHLEN, &ctx->tcp_closed.stats);
+ if (ret == KNOT_EOK) {
+ ret = knot_tcp_send(ctx->sock, sweep_relays, XDP_BATCHLEN, XDP_BATCHLEN);
+ }
+ knot_tcp_cleanup(ctx->syn_table, sweep_relays, XDP_BATCHLEN);
+
+ (void)knot_xdp_send_finish(ctx->sock);
+ } while (ret == KNOT_EOK && prev_total < ctx->tcp_closed.stats.total);
+
+ log_closed(&ctx->tcp_closed, true);
+}
+
+#endif // ENABLE_XDP
diff --git a/src/knot/server/xdp-handler.h b/src/knot/server/xdp-handler.h
new file mode 100644
index 0000000..e6374ca
--- /dev/null
+++ b/src/knot/server/xdp-handler.h
@@ -0,0 +1,67 @@
+/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef ENABLE_XDP
+
+#include "knot/query/layer.h"
+#include "libknot/xdp/xdp.h"
+
+#define XDP_BATCHLEN 32 /*!< XDP receive batch size. */
+
+struct xdp_handle_ctx;
+struct server;
+
+/*!
+ * \brief Initialize the XDP packet-handling context.
+ */
+struct xdp_handle_ctx *xdp_handle_init(struct server *server, knot_xdp_socket_t *sock);
+
+/*!
+ * \brief Deinitialize the XDP packet-handling context.
+ */
+void xdp_handle_free(struct xdp_handle_ctx *ctx);
+
+/*!
+ * \brief Receive packets through the XDP socket.
+ */
+int xdp_handle_recv(struct xdp_handle_ctx *ctx);
+
+/*!
+ * \brief Answer the received packets, running each through the DNS query layer.
+ *
+ * \warning In the case of TCP, this also sends some packets, e.g. ACKs.
+ */
+void xdp_handle_msgs(struct xdp_handle_ctx *ctx, knot_layer_t *layer,
+ struct server *server, unsigned thread_id);
+
+/*!
+ * \brief Send packets through the XDP socket.
+ */
+void xdp_handle_send(struct xdp_handle_ctx *ctx);
+
+/*!
+ * \brief Check for stale TCP connections and close or reset them.
+ */
+void xdp_handle_sweep(struct xdp_handle_ctx *ctx);
+
+/*!
+ * \brief Update the configuration parameters of a running context.
+ */
+void xdp_handle_reconfigure(struct xdp_handle_ctx *ctx);
+
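+/*
+ * Typical per-thread driver loop, as a sketch only; the real caller adds
+ * error handling, layer setup and timing, and "running", "layer" and
+ * "thread_id" are placeholders. xdp_handle_recv() is assumed here to
+ * return the number of packets received, and xdp_handle_msgs() must only
+ * be called on a non-empty batch:
+ *
+ *	struct xdp_handle_ctx *ctx = xdp_handle_init(server, sock);
+ *	while (running) {
+ *		if (xdp_handle_recv(ctx) > 0) {
+ *			xdp_handle_msgs(ctx, &layer, server, thread_id);
+ *			xdp_handle_send(ctx);
+ *		}
+ *		xdp_handle_sweep(ctx); // Periodically, not on every pass.
+ *	}
+ *	xdp_handle_free(ctx);
+ */
+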
+#endif // ENABLE_XDP