From f449f278dd3c70e479a035f50a9bb817a9b433ba Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 17:24:08 +0200 Subject: Adding upstream version 3.2.6. Signed-off-by: Daniel Baumann --- src/knot/server/udp-handler.c | 575 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 575 insertions(+) create mode 100644 src/knot/server/udp-handler.c (limited to 'src/knot/server/udp-handler.c') diff --git a/src/knot/server/udp-handler.c b/src/knot/server/udp-handler.c new file mode 100644 index 0000000..1e309d6 --- /dev/null +++ b/src/knot/server/udp-handler.c @@ -0,0 +1,575 @@ +/* Copyright (C) 2022 CZ.NIC, z.s.p.o. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#define __APPLE_USE_RFC_3542 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef HAVE_SYS_UIO_H // struct iovec (OpenBSD) +#include +#endif /* HAVE_SYS_UIO_H */ +#include + +#include "contrib/macros.h" +#include "contrib/mempattern.h" +#include "contrib/sockaddr.h" +#include "contrib/ucw/mempool.h" +#include "knot/common/fdset.h" +#include "knot/nameserver/process_query.h" +#include "knot/query/layer.h" +#include "knot/server/proxyv2.h" +#include "knot/server/server.h" +#include "knot/server/udp-handler.h" +#include "knot/server/xdp-handler.h" + +/* Buffer identifiers. */ +enum { + RX = 0, + TX = 1, + NBUFS = 2 +}; + +/*! \brief UDP context data. */ +typedef struct { + knot_layer_t layer; /*!< Query processing layer. */ + server_t *server; /*!< Name server structure. */ + unsigned thread_id; /*!< Thread identifier. */ +} udp_context_t; + +static bool udp_state_active(int state) +{ + return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL); +} + +static void udp_handle(udp_context_t *udp, int fd, struct sockaddr_storage *ss, + struct iovec *rx, struct iovec *tx, struct knot_xdp_msg *xdp_msg) +{ + /* Create query processing parameter. */ + knotd_qdata_params_t params = { + .proto = KNOTD_QUERY_PROTO_UDP, + .remote = ss, + .socket = fd, + .server = udp->server, + .xdp_msg = xdp_msg, + .thread_id = udp->thread_id + }; + struct sockaddr_storage proxied_remote; + + /* Start query processing. */ + knot_layer_begin(&udp->layer, ¶ms); + + /* Create packets. */ + knot_pkt_t *query = knot_pkt_new(rx->iov_base, rx->iov_len, udp->layer.mm); + knot_pkt_t *ans = knot_pkt_new(tx->iov_base, tx->iov_len, udp->layer.mm); + + /* Input packet. */ + int ret = knot_pkt_parse(query, 0); + if (ret != KNOT_EOK && query->parsed > 0) { + ret = proxyv2_header_strip(&query, params.remote, &proxied_remote); + if (ret == KNOT_EOK) { + params.remote = &proxied_remote; + } else { + query->parsed--; // artificially decreasing "parsed" leads to FORMERR + } + } + knot_layer_consume(&udp->layer, query); + + /* Process answer. */ + while (udp_state_active(udp->layer.state)) { + knot_layer_produce(&udp->layer, ans); + } + + /* Send response only if finished successfully. 
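+    In any other final state tx->iov_len stays at zero, so the caller
+    sends no reply datagram for this query.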
*/ + if (udp->layer.state == KNOT_STATE_DONE) { + tx->iov_len = ans->size; + } else { + tx->iov_len = 0; + } + + /* Reset after processing. */ + knot_layer_finish(&udp->layer); + + /* Flush per-query memory (including query and answer packets). */ + mp_flush(udp->layer.mm->ctx); +} + +typedef struct { + void* (*udp_init)(udp_context_t *, void *); + void (*udp_deinit)(void *); + int (*udp_recv)(int, void *); + void (*udp_handle)(udp_context_t *, void *); + void (*udp_send)(void *); + void (*udp_sweep)(void *); // Optional +} udp_api_t; + +/*! \brief Control message to fit IP_PKTINFO or IPv6_RECVPKTINFO. */ +typedef union { + struct cmsghdr cmsg; + uint8_t buf[CMSG_SPACE(sizeof(struct in6_pktinfo))]; +} cmsg_pktinfo_t; + +static void udp_pktinfo_handle(const struct msghdr *rx, struct msghdr *tx) +{ + tx->msg_controllen = rx->msg_controllen; + if (tx->msg_controllen > 0) { + tx->msg_control = rx->msg_control; + } else { + // BSD has problem with zero length and not-null pointer + tx->msg_control = NULL; + } + +#if defined(__linux__) || defined(__APPLE__) + struct cmsghdr *cmsg = CMSG_FIRSTHDR(tx); + if (cmsg == NULL) { + return; + } + + /* Unset the ifindex to not bypass the routing tables. */ + if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) { + struct in_pktinfo *info = (struct in_pktinfo *)CMSG_DATA(cmsg); + info->ipi_spec_dst = info->ipi_addr; + info->ipi_ifindex = 0; + } else if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) { + struct in6_pktinfo *info = (struct in6_pktinfo *)CMSG_DATA(cmsg); + info->ipi6_ifindex = 0; + } +#endif +} + +/* UDP recvfrom() request struct. */ +struct udp_recvfrom { + int fd; + struct sockaddr_storage addr; + struct msghdr msg[NBUFS]; + struct iovec iov[NBUFS]; + uint8_t buf[NBUFS][KNOT_WIRE_MAX_PKTSIZE]; + cmsg_pktinfo_t pktinfo; +}; + +static void *udp_recvfrom_init(_unused_ udp_context_t *ctx, _unused_ void *xdp_sock) +{ + struct udp_recvfrom *rq = malloc(sizeof(struct udp_recvfrom)); + if (rq == NULL) { + return NULL; + } + memset(rq, 0, sizeof(struct udp_recvfrom)); + + for (unsigned i = 0; i < NBUFS; ++i) { + rq->iov[i].iov_base = rq->buf + i; + rq->iov[i].iov_len = KNOT_WIRE_MAX_PKTSIZE; + rq->msg[i].msg_name = &rq->addr; + rq->msg[i].msg_namelen = sizeof(rq->addr); + rq->msg[i].msg_iov = &rq->iov[i]; + rq->msg[i].msg_iovlen = 1; + rq->msg[i].msg_control = &rq->pktinfo.cmsg; + rq->msg[i].msg_controllen = sizeof(rq->pktinfo); + } + return rq; +} + +static void udp_recvfrom_deinit(void *d) +{ + struct udp_recvfrom *rq = d; + free(rq); +} + +static int udp_recvfrom_recv(int fd, void *d) +{ + /* Reset max lengths. */ + struct udp_recvfrom *rq = (struct udp_recvfrom *)d; + rq->iov[RX].iov_len = KNOT_WIRE_MAX_PKTSIZE; + rq->msg[RX].msg_namelen = sizeof(struct sockaddr_storage); + rq->msg[RX].msg_controllen = sizeof(rq->pktinfo); + + int ret = recvmsg(fd, &rq->msg[RX], MSG_DONTWAIT); + if (ret > 0) { + rq->fd = fd; + rq->iov[RX].iov_len = ret; + return 1; + } + + return 0; +} + +static void udp_recvfrom_handle(udp_context_t *ctx, void *d) +{ + struct udp_recvfrom *rq = d; + + /* Prepare TX address. */ + rq->msg[TX].msg_namelen = rq->msg[RX].msg_namelen; + rq->iov[TX].iov_len = KNOT_WIRE_MAX_PKTSIZE; + + udp_pktinfo_handle(&rq->msg[RX], &rq->msg[TX]); + + /* Process received pkt. 
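+    The answer (if any) is written into rq->iov[TX]; the xdp_msg argument
+    is NULL because this backend uses plain sockets.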
*/ + udp_handle(ctx, rq->fd, &rq->addr, &rq->iov[RX], &rq->iov[TX], NULL); +} + +static void udp_recvfrom_send(void *d) +{ + struct udp_recvfrom *rq = d; + if (rq->iov[TX].iov_len > 0) { + (void)sendmsg(rq->fd, &rq->msg[TX], 0); + } +} + +_unused_ +static udp_api_t udp_recvfrom_api = { + udp_recvfrom_init, + udp_recvfrom_deinit, + udp_recvfrom_recv, + udp_recvfrom_handle, + udp_recvfrom_send, +}; + +#ifdef ENABLE_RECVMMSG +/* UDP recvmmsg() request struct. */ +struct udp_recvmmsg { + int fd; + struct sockaddr_storage addrs[RECVMMSG_BATCHLEN]; + char *iobuf[NBUFS]; + struct iovec *iov[NBUFS]; + struct mmsghdr *msgs[NBUFS]; + unsigned rcvd; + knot_mm_t mm; + cmsg_pktinfo_t pktinfo[RECVMMSG_BATCHLEN]; +}; + +static void *udp_recvmmsg_init(_unused_ udp_context_t *ctx, _unused_ void *xdp_sock) +{ + knot_mm_t mm; + mm_ctx_mempool(&mm, sizeof(struct udp_recvmmsg)); + + struct udp_recvmmsg *rq = mm_alloc(&mm, sizeof(struct udp_recvmmsg)); + memset(rq, 0, sizeof(*rq)); + memcpy(&rq->mm, &mm, sizeof(knot_mm_t)); + + /* Initialize buffers. */ + for (unsigned i = 0; i < NBUFS; ++i) { + rq->iobuf[i] = mm_alloc(&mm, KNOT_WIRE_MAX_PKTSIZE * RECVMMSG_BATCHLEN); + rq->iov[i] = mm_alloc(&mm, sizeof(struct iovec) * RECVMMSG_BATCHLEN); + rq->msgs[i] = mm_alloc(&mm, sizeof(struct mmsghdr) * RECVMMSG_BATCHLEN); + memset(rq->msgs[i], 0, sizeof(struct mmsghdr) * RECVMMSG_BATCHLEN); + for (unsigned k = 0; k < RECVMMSG_BATCHLEN; ++k) { + rq->iov[i][k].iov_base = rq->iobuf[i] + k * KNOT_WIRE_MAX_PKTSIZE; + rq->iov[i][k].iov_len = KNOT_WIRE_MAX_PKTSIZE; + rq->msgs[i][k].msg_hdr.msg_iov = rq->iov[i] + k; + rq->msgs[i][k].msg_hdr.msg_iovlen = 1; + rq->msgs[i][k].msg_hdr.msg_name = rq->addrs + k; + rq->msgs[i][k].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + rq->msgs[i][k].msg_hdr.msg_control = &rq->pktinfo[k].cmsg; + rq->msgs[i][k].msg_hdr.msg_controllen = sizeof(cmsg_pktinfo_t); + } + } + + return rq; +} + +static void udp_recvmmsg_deinit(void *d) +{ + struct udp_recvmmsg *rq = d; + if (rq != NULL) { + mp_delete(rq->mm.ctx); + } +} + +static int udp_recvmmsg_recv(int fd, void *d) +{ + struct udp_recvmmsg *rq = d; + + int n = recvmmsg(fd, rq->msgs[RX], RECVMMSG_BATCHLEN, MSG_DONTWAIT, NULL); + if (n > 0) { + rq->fd = fd; + rq->rcvd = n; + } + return n; +} + +static void udp_recvmmsg_handle(udp_context_t *ctx, void *d) +{ + struct udp_recvmmsg *rq = d; + + /* Handle each received message. */ + unsigned j = 0; + for (unsigned i = 0; i < rq->rcvd; ++i) { + struct msghdr *rx = &rq->msgs[RX][i].msg_hdr; + struct msghdr *tx = &rq->msgs[TX][j].msg_hdr; + + /* Set received bytes. */ + rx->msg_iov->iov_len = rq->msgs[RX][i].msg_len; + /* Update mapping of address buffer. */ + tx->msg_name = rx->msg_name; + tx->msg_namelen = rx->msg_namelen; + + /* Update output message control buffer. */ + udp_pktinfo_handle(rx, tx); + + udp_handle(ctx, rq->fd, rq->addrs + i, rx->msg_iov, tx->msg_iov, NULL); + + if (tx->msg_iov->iov_len > 0) { + rq->msgs[TX][j].msg_len = tx->msg_iov->iov_len; + j++; + } else { + /* Reset tainted output context. */ + tx->msg_iov->iov_len = KNOT_WIRE_MAX_PKTSIZE; + } + + /* Reset input context. 
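+    Restore the maximum buffer, name and control lengths so this slot can
+    be reused by the next recvmmsg() batch.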
*/ + rx->msg_iov->iov_len = KNOT_WIRE_MAX_PKTSIZE; + rx->msg_namelen = sizeof(struct sockaddr_storage); + rx->msg_controllen = sizeof(cmsg_pktinfo_t); + } + rq->rcvd = j; +} + +static void udp_recvmmsg_send(void *d) +{ + struct udp_recvmmsg *rq = d; + + (void)sendmmsg(rq->fd, rq->msgs[TX], rq->rcvd, 0); + for (unsigned i = 0; i < rq->rcvd; ++i) { + struct msghdr *tx = &rq->msgs[TX][i].msg_hdr; + + /* Reset output context. */ + tx->msg_iov->iov_len = KNOT_WIRE_MAX_PKTSIZE; + } +} + +static udp_api_t udp_recvmmsg_api = { + udp_recvmmsg_init, + udp_recvmmsg_deinit, + udp_recvmmsg_recv, + udp_recvmmsg_handle, + udp_recvmmsg_send, +}; +#endif /* ENABLE_RECVMMSG */ + +#ifdef ENABLE_XDP + +static void *xdp_recvmmsg_init(udp_context_t *ctx, void *xdp_sock) +{ + return xdp_handle_init(ctx->server, xdp_sock); +} + +static void xdp_recvmmsg_deinit(void *d) +{ + if (d != NULL) { + xdp_handle_free(d); + } +} + +static int xdp_recvmmsg_recv(_unused_ int fd, void *d) +{ + return xdp_handle_recv(d); +} + +static void xdp_recvmmsg_handle(udp_context_t *ctx, void *d) +{ + xdp_handle_msgs(d, &ctx->layer, ctx->server, ctx->thread_id); +} + +static void xdp_recvmmsg_send(void *d) +{ + xdp_handle_send(d); +} + +static void xdp_recvmmsg_sweep(void *d) +{ + xdp_handle_reconfigure(d); + xdp_handle_sweep(d); +} + +static udp_api_t xdp_recvmmsg_api = { + xdp_recvmmsg_init, + xdp_recvmmsg_deinit, + xdp_recvmmsg_recv, + xdp_recvmmsg_handle, + xdp_recvmmsg_send, + xdp_recvmmsg_sweep, +}; +#endif /* ENABLE_XDP */ + +static bool is_xdp_thread(const server_t *server, int thread_id) +{ + return server->handlers[IO_XDP].size > 0 && + server->handlers[IO_XDP].handler.thread_id[0] <= thread_id; +} + +static int iface_udp_fd(const iface_t *iface, int thread_id, bool xdp_thread, + void **xdp_socket) +{ + if (xdp_thread) { +#ifdef ENABLE_XDP + if (thread_id < iface->xdp_first_thread_id || + thread_id >= iface->xdp_first_thread_id + iface->fd_xdp_count) { + return -1; // Different XDP interface. + } + size_t xdp_wrk_id = thread_id - iface->xdp_first_thread_id; + assert(xdp_wrk_id < iface->fd_xdp_count); + *xdp_socket = iface->xdp_sockets[xdp_wrk_id]; + return iface->fd_xdp[xdp_wrk_id]; +#else + assert(0); + return -1; +#endif + } else { // UDP thread. + if (iface->fd_udp_count == 0) { // No UDP interfaces. + assert(iface->fd_xdp_count > 0); + return -1; + } +#ifdef ENABLE_REUSEPORT + assert(thread_id < iface->fd_udp_count); + return iface->fd_udp[thread_id]; +#else + return iface->fd_udp[0]; +#endif + } +} + +static unsigned udp_set_ifaces(const server_t *server, size_t n_ifaces, fdset_t *fds, + int thread_id, void **xdp_socket) +{ + if (n_ifaces == 0) { + return 0; + } + + bool xdp_thread = is_xdp_thread(server, thread_id); + const iface_t *ifaces = server->ifaces; + + for (const iface_t *i = ifaces; i != ifaces + n_ifaces; i++) { + int fd = iface_udp_fd(i, thread_id, xdp_thread, xdp_socket); + if (fd < 0) { + continue; + } + int ret = fdset_add(fds, fd, FDSET_POLLIN, NULL); + if (ret < 0) { + return 0; + } + } + + assert(!xdp_thread || fdset_get_length(fds) == 1); + return fdset_get_length(fds); +} + +int udp_master(dthread_t *thread) +{ + if (thread == NULL || thread->data == NULL) { + return KNOT_EINVAL; + } + + iohandler_t *handler = (iohandler_t *)thread->data; + int thread_id = handler->thread_id[dt_get_id(thread)]; + + if (handler->server->n_ifaces == 0) { + return KNOT_EOK; + } + + /* Set thread affinity to CPU core (same for UDP and XDP). 
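+    Workers are spread round-robin over the online CPUs by thread id.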
*/ + unsigned cpu = dt_online_cpus(); + if (cpu > 1) { + unsigned cpu_mask = (dt_get_id(thread) % cpu); + dt_setaffinity(thread, &cpu_mask, 1); + } + + /* Choose processing API. */ + udp_api_t *api = NULL; + if (is_xdp_thread(handler->server, thread_id)) { +#ifdef ENABLE_XDP + api = &xdp_recvmmsg_api; +#else + assert(0); +#endif + } else { +#ifdef ENABLE_RECVMMSG + api = &udp_recvmmsg_api; +#else + api = &udp_recvfrom_api; +#endif + } + void *api_ctx = NULL; + + /* Create big enough memory cushion. */ + knot_mm_t mm; + mm_ctx_mempool(&mm, 16 * MM_DEFAULT_BLKSIZE); + + /* Create UDP answering context. */ + udp_context_t udp = { + .server = handler->server, + .thread_id = thread_id, + }; + knot_layer_init(&udp.layer, &mm, process_query_layer()); + + /* Allocate descriptors for the configured interfaces. */ + void *xdp_socket = NULL; + size_t nifs = handler->server->n_ifaces; + fdset_t fds; + if (fdset_init(&fds, nifs) != KNOT_EOK) { + goto finish; + } + unsigned nfds = udp_set_ifaces(handler->server, nifs, &fds, + thread_id, &xdp_socket); + if (nfds == 0) { + goto finish; + } + + /* Initialize the networking API. */ + api_ctx = api->udp_init(&udp, xdp_socket); + if (api_ctx == NULL) { + goto finish; + } + + /* Loop until all data is read. */ + for (;;) { + /* Cancellation point. */ + if (dt_is_cancelled(thread)) { + break; + } + + /* Wait for events. */ + fdset_it_t it; + (void)fdset_poll(&fds, &it, 0, 1000); + + /* Process the events. */ + for (; !fdset_it_is_done(&it); fdset_it_next(&it)) { + if (!fdset_it_is_pollin(&it)) { + continue; + } + if (api->udp_recv(fdset_it_get_fd(&it), api_ctx) > 0) { + api->udp_handle(&udp, api_ctx); + api->udp_send(api_ctx); + } + } + + /* Regular maintenance (XDP-TCP only). */ + if (api->udp_sweep != NULL) { + api->udp_sweep(api_ctx); + } + } + +finish: + api->udp_deinit(api_ctx); + mp_delete(mm.ctx); + fdset_clear(&fds); + + return KNOT_EOK; +} -- cgit v1.2.3
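
Note (not part of the upstream file): the two socket-based backends above
(udp_recvfrom_* and udp_recvmmsg_*) answer from the exact address a query
arrived on by copying the received IP_PKTINFO/IPV6_PKTINFO control message
into the reply and, on Linux/macOS, clearing the interface index so the
answer is still routed normally (see udp_pktinfo_handle()). The standalone
sketch below condenses that recvmsg()/sendmsg() round trip into a minimal
UDP echo server; it assumes Linux, IPv4 only, an arbitrary port 5353 and
stripped-down error handling, and is illustrative rather than equivalent to
the handler in this patch.

/* Minimal sketch of the recvmsg()/sendmsg() + IP_PKTINFO pattern used by
 * udp_recvfrom_recv(), udp_pktinfo_handle() and udp_recvfrom_send().
 * Linux-only, IPv4-only, illustrative port and error handling. */
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

int main(void)
{
	int fd = socket(AF_INET, SOCK_DGRAM, 0);
	struct sockaddr_in local = { .sin_family = AF_INET,
	                             .sin_port = htons(5353) };
	int on = 1;
	if (fd < 0 ||
	    setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &on, sizeof(on)) != 0 ||
	    bind(fd, (struct sockaddr *)&local, sizeof(local)) != 0) {
		return 1;
	}

	uint8_t buf[1500];
	uint8_t cbuf[CMSG_SPACE(sizeof(struct in_pktinfo))];
	struct sockaddr_in remote;

	for (;;) {
		struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
		struct msghdr msg = {
			.msg_name = &remote, .msg_namelen = sizeof(remote),
			.msg_iov = &iov, .msg_iovlen = 1,
			.msg_control = cbuf, .msg_controllen = sizeof(cbuf),
		};
		ssize_t n = recvmsg(fd, &msg, 0);
		if (n < 0) {
			break;
		}

		/* Keep the received pktinfo so the reply leaves from the
		 * original destination address, but clear the ifindex so the
		 * kernel routes the answer normally -- the same fix-up that
		 * udp_pktinfo_handle() performs. */
		struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
		if (cmsg != NULL && cmsg->cmsg_level == IPPROTO_IP &&
		    cmsg->cmsg_type == IP_PKTINFO) {
			struct in_pktinfo *info = (struct in_pktinfo *)CMSG_DATA(cmsg);
			info->ipi_spec_dst = info->ipi_addr;
			info->ipi_ifindex = 0;
		}

		iov.iov_len = n;  /* echo back exactly what was received */
		(void)sendmsg(fd, &msg, 0);
	}
	close(fd);
	return 0;
}

The backends in the patch follow the same contract, only with preallocated
per-thread buffers and, in the recvmmsg() case, whole batches of messages
per system call selected through the udp_api_t function table in udp_master().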