1
0
Fork 0
knot-resolver/daemon/udp_queue.c
Daniel Baumann fbc604e215
Adding upstream version 5.7.5.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-21 13:56:17 +02:00

146 lines
4.3 KiB
C

/* Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
* SPDX-License-Identifier: GPL-3.0-or-later
*/
#include "kresconfig.h"
#include "daemon/udp_queue.h"
#include "daemon/worker.h"
#include "lib/generic/array.h"
#include "lib/utils.h"
struct qr_task;
#include <sys/socket.h>
#if !ENABLE_SENDMMSG
/** Variant of udp_queue_init_global() compiled when ENABLE_SENDMMSG is false.
 *
 * It performs no initialization at all.
 *
 * @param loop  ignored
 * @return always 0
 */
int udp_queue_init_global(uv_loop_t *loop)
{
	(void)loop;
	return 0;
}
/** Variant of udp_queue_push() compiled when ENABLE_SENDMMSG is false.
 *
 * It is not meant to be called; the definition exists only to appease
 * the linker in case a call to it isn't optimized out.
 * If it does get called, the process is aborted.
 *
 * @param fd    ignored
 * @param req   ignored
 * @param task  ignored
 */
void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
{
	(void)fd;
	(void)req;
	(void)task;
	abort();
}
#else
/* Capacity of a single per-socket queue: it sizes the msgvec[] and items[]
 * arrays of udp_queue_t, and udp_queue_push() flushes a queue as soon as
 * it holds this many messages.
 *
 * LATER: it might be useful to have this configurable during runtime,
 * but the structures below would have to change a little (broken up). */
#define UDP_QUEUE_LEN 64
/** A queue of up to UDP_QUEUE_LEN messages, meant for the same socket.
 *
 * Entry i of the queue is described by msgvec[i] together with items[i];
 * udp_queue_create() points each msgvec[i].msg_hdr.msg_iov at items[i].msg_iov
 * once, and udp_queue_push() fills in the per-message parts. */
typedef struct {
int len; /**< The number of messages in the queue: 0..UDP_QUEUE_LEN */
struct mmsghdr msgvec[UDP_QUEUE_LEN]; /**< Parameter for sendmmsg(); only the first `len` entries are passed. */
/** Per-message data that accompanies msgvec[i]. */
struct {
struct qr_task *task; /**< Links for completion callbacks: the task passed to qr_task_on_send(). */
struct iovec msg_iov[1]; /**< storage for .msgvec[i].msg_iov (a single buffer per message) */
} items[UDP_QUEUE_LEN];
} udp_queue_t;
/** Allocate a zero-filled queue and set up the parts that stay constant.
 *
 * Every message header gets pointed at the one-element iovec array stored
 * in the matching items[] entry.  The allocation result is checked
 * by kr_require().
 *
 * @return the newly allocated, empty queue
 */
static udp_queue_t * udp_queue_create(void)
{
	udp_queue_t *q = calloc(1, sizeof(*q));
	kr_require(q != NULL);

	int slot = 0;
	while (slot < UDP_QUEUE_LEN) {
		struct msghdr *hdr = &q->msgvec[slot].msg_hdr;
		/* Fixed for the whole lifetime of the queue: one iovec per message. */
		hdr->msg_iovlen = 1;
		hdr->msg_iov = q->items[slot].msg_iov;
		/* msg_name and msg_namelen get assigned for each queued message;
		 * the remaining header fields simply stay zeroed. */
		++slot;
	}
	return q;
}
/** Global state for udp_queue_*. Note: we never free the pointed-to memory. */
struct state {
/** Singleton map: fd -> udp_queue_t, as a simple array of pointers.
 * Indexed directly by the FD number; udp_queue_push() grows it on demand
 * and creates the individual queues lazily, so entries may be NULL. */
udp_queue_t **udp_queues;
/** Number of entries in udp_queues, i.e. valid indices are 0..udp_queues_len-1. */
int udp_queues_len;
/** List of FD numbers that might have a non-empty queue.
 * udp_queue_push() appends an FD when its queue goes from empty to non-empty;
 * udp_queue_check() flushes all listed queues and clears the list. */
array_t(int) waiting_fds;
/** libuv check handle; udp_queue_init_global() starts it
 * with udp_queue_check() as the callback. */
uv_check_t check_handle;
};
/** The single instance of the state; it starts zero-initialized. */
static struct state state = {0};
/** Flush the queue belonging to the given socket with a single sendmmsg() call.
 *
 * The queue for `fd` must already exist in state.udp_queues; if it holds
 * no messages, nothing happens.  Afterwards every queued task gets
 * qr_task_on_send() called and is unreferenced: status 0 for the messages
 * that sendmmsg() reported as sent, an error code for the others.
 *
 * @param fd  socket whose queue is to be emptied
 */
static void udp_queue_send(int fd)
{
	udp_queue_t *const q = state.udp_queues[fd];
	if (q->len == 0)
		return;

	const int n_sent = sendmmsg(fd, q->msgvec, q->len, 0);

	/* ATM failures are only passed on to the tasks, nothing more.
	 * A total failure provides errno; for messages left over after
	 * a partial send the real cause is unknown, so EAGAIN stands in. */
	int fail_code;
	if (n_sent < 0) {
		fail_code = errno;
	} else {
		fail_code = EAGAIN;
	}

	/* Note that q->len is read on each iteration rather than cached. */
	int i = 0;
	while (i < q->len) {
		int status = fail_code;
		if (i < n_sent)
			status = 0;
		qr_task_on_send(q->items[i].task, NULL, status);
		worker_task_unref(q->items[i].task);
		++i;
	}
	q->len = 0;
}
/** Check-handle callback: send out all queued packets.
 *
 * Flushes the queue of every FD listed in state.waiting_fds
 * and then empties that list.
 *
 * @param handle  unused
 */
static void udp_queue_check(uv_check_t *handle)
{
	(void)handle;

	/* The list's length and storage are read afresh on every pass. */
	int idx = 0;
	while (idx < state.waiting_fds.len) {
		udp_queue_send(state.waiting_fds.at[idx]);
		++idx;
	}
	state.waiting_fds.len = 0;
}
/** Set up the global check handle that flushes the UDP queues.
 *
 * Initializes state.check_handle on the given loop and, if that succeeded,
 * starts it with udp_queue_check() as the callback.
 *
 * @param loop  libuv loop to attach the check handle to
 * @return 0 on success, otherwise the non-zero code returned
 *         by uv_check_init() or uv_check_start()
 */
int udp_queue_init_global(uv_loop_t *loop)
{
	const int init_rc = uv_check_init(loop, &state.check_handle);
	if (init_rc != 0)
		return init_rc;
	return uv_check_start(&state.check_handle, udp_queue_check);
}
/** Queue the answer of a request for sending through the given UDP socket.
 *
 * A reference to `task` is taken here; udp_queue_send() drops it again after
 * calling qr_task_on_send().  Only pointers get stored (to req->answer->wire
 * and to req->qsource.comm_addr), so that data has to stay valid until
 * the queue gets flushed.
 *
 * The flush happens immediately once the queue holds UDP_QUEUE_LEN messages;
 * otherwise it is left to udp_queue_check(), which finds the FD
 * via state.waiting_fds.
 *
 * @param fd    socket to send through; a negative value is logged
 *              and the process aborted
 * @param req   request providing the answer packet and the destination address
 * @param task  task to be notified about the outcome of the send
 */
void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
{
	if (fd < 0) {
		kr_log_error(SYSTEM, "ERROR: called udp_queue_push(fd = %d, ...)\n", fd);
		abort();
	}
	worker_task_ref(task);

	/* Make the fd -> queue table large enough to be indexed by this fd;
	 * the newly added entries start out as NULL. */
	if (state.udp_queues_len <= fd) {
		const int old_len = state.udp_queues_len;
		const int grown_len = fd + 1;
		state.udp_queues = realloc(state.udp_queues, // NOLINT(bugprone-suspicious-realloc-usage): we just abort() below, so it's fine
				sizeof(state.udp_queues[0]) * grown_len); // NOLINT(bugprone-sizeof-expression): false-positive
		if (state.udp_queues == NULL)
			abort();
		memset(&state.udp_queues[old_len], 0,
			sizeof(state.udp_queues[0]) * (grown_len - old_len)); // NOLINT(bugprone-sizeof-expression): false-positive
		state.udp_queues_len = grown_len;
	}

	/* The queue itself gets created on the first push for this fd. */
	udp_queue_t *q = state.udp_queues[fd];
	if (unlikely(q == NULL)) {
		q = udp_queue_create();
		state.udp_queues[fd] = q;
	}

	/* Fill in the first free slot. */
	const int pos = q->len;
	struct sockaddr *dest = (struct sockaddr *)/*const-cast*/req->qsource.comm_addr;
	q->msgvec[pos].msg_hdr.msg_name = dest;
	q->msgvec[pos].msg_hdr.msg_namelen = kr_sockaddr_len(dest);
	q->items[pos].task = task;
	q->items[pos].msg_iov[0].iov_base = req->answer->wire;
	q->items[pos].msg_iov[0].iov_len = req->answer->size;

	/* A queue that was empty so far isn't on the waiting list yet. */
	if (pos == 0)
		array_push(state.waiting_fds, fd);
	q->len = pos + 1;

	if (q->len >= UDP_QUEUE_LEN) {
		kr_assert(q->len == UDP_QUEUE_LEN);
		udp_queue_send(fd);
		/* The fd is left in state.waiting_fds: searching for it isn't needed,
		 * and it's more efficient to let the check hook deal with it. */
	}
}
#endif