From 830407e88f9d40d954356c3754f2647f91d5c06a Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 17:26:00 +0200 Subject: Adding upstream version 5.6.0. Signed-off-by: Daniel Baumann --- daemon/io.c | 1169 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1169 insertions(+) create mode 100644 daemon/io.c (limited to 'daemon/io.c') diff --git a/daemon/io.c b/daemon/io.c new file mode 100644 index 0000000..48bfed3 --- /dev/null +++ b/daemon/io.c @@ -0,0 +1,1169 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. + * SPDX-License-Identifier: GPL-3.0-or-later + */ + +#include "daemon/io.h" + +#include +#include +#include +#include +#include + +#if ENABLE_XDP + #include + #include + #include +#endif + +#include "daemon/network.h" +#include "daemon/proxyv2.h" +#include "daemon/worker.h" +#include "daemon/tls.h" +#include "daemon/http.h" +#include "daemon/session.h" +#include "contrib/cleanup.h" +#include "lib/utils.h" + +#define negotiate_bufsize(func, handle, bufsize_want) do { \ + int bufsize = 0; (func)((handle), &bufsize); \ + if (bufsize < (bufsize_want)) { \ + bufsize = (bufsize_want); \ + (func)((handle), &bufsize); \ + } \ +} while (0) + +static void check_bufsize(uv_handle_t* handle) +{ + return; /* TODO: resurrect after https://github.com/libuv/libuv/issues/419 */ + /* We want to buffer at least N waves in advance. + * This is magic presuming we can pull in a whole recvmmsg width in one wave. + * Linux will double this the bufsize wanted. + */ + const int bufsize_want = 2 * sizeof( ((struct worker_ctx *)NULL)->wire_buf ) ; + negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want); + negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want); +} + +#undef negotiate_bufsize + +static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) +{ + /* UDP sessions use worker buffer for wire data, + * TCP sessions use session buffer for wire data + * (see session_set_handle()). + * TLS sessions use buffer from TLS context. + * The content of the worker buffer is + * guaranteed to be unchanged only for the duration of + * udp_read() and tcp_read(). + */ + struct session *s = handle->data; + if (!session_flags(s)->has_tls) { + buf->base = (char *) session_wirebuf_get_free_start(s); + buf->len = session_wirebuf_get_free_size(s); + } else { + struct tls_common_ctx *ctx = session_tls_get_common_ctx(s); + buf->base = (char *) ctx->recv_buf; + buf->len = sizeof(ctx->recv_buf); + } +} + +void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, + const struct sockaddr *comm_addr, unsigned flags) +{ + struct session *s = handle->data; + if (session_flags(s)->closing || nread <= 0 || comm_addr->sa_family == AF_UNSPEC) + return; + + if (session_flags(s)->outgoing) { + const struct sockaddr *peer = session_get_peer(s); + if (kr_fails_assert(peer->sa_family != AF_UNSPEC)) + return; + if (kr_sockaddr_cmp(peer, comm_addr) != 0) { + kr_log_debug(IO, "<= ignoring UDP from unexpected address '%s'\n", + kr_straddr(comm_addr)); + return; + } + } + + const uint8_t *data = (const uint8_t *)buf->base; + ssize_t data_len = nread; + const struct sockaddr *src_addr = comm_addr; + const struct sockaddr *dst_addr = NULL; + struct proxy_result proxy; + bool has_proxy = false; + if (!session_flags(s)->outgoing && proxy_header_present(data, data_len)) { + if (!proxy_allowed(&the_worker->engine->net, comm_addr)) { + kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n", + kr_straddr(comm_addr)); + return; + } + + ssize_t trimmed = proxy_process_header(&proxy, s, data, data_len); + if (trimmed == KNOT_EMALF) { + if (kr_log_is_debug(IO, NULL)) { + kr_log_debug(IO, "<= ignoring malformed PROXYv2 UDP " + "from address '%s'\n", + kr_straddr(comm_addr)); + } + return; + } else if (trimmed < 0) { + if (kr_log_is_debug(IO, NULL)) { + kr_log_debug(IO, "<= error processing PROXYv2 UDP " + "from address '%s', ignoring\n", + kr_straddr(comm_addr)); + } + return; + } + + if (proxy.command == PROXY2_CMD_PROXY && proxy.family != AF_UNSPEC) { + has_proxy = true; + src_addr = &proxy.src_addr.ip; + dst_addr = &proxy.dst_addr.ip; + + if (kr_log_is_debug(IO, NULL)) { + kr_log_debug(IO, "<= UDP query from '%s'\n", + kr_straddr(src_addr)); + kr_log_debug(IO, "<= proxied through '%s'\n", + kr_straddr(comm_addr)); + } + } + data = session_wirebuf_get_free_start(s); + data_len = nread - trimmed; + } + + ssize_t consumed = session_wirebuf_consume(s, data, data_len); + kr_assert(consumed == data_len); + + struct io_comm_data comm = { + .src_addr = src_addr, + .comm_addr = comm_addr, + .dst_addr = dst_addr, + .proxy = (has_proxy) ? &proxy : NULL + }; + session_wirebuf_process(s, &comm); + session_wirebuf_discard(s); + mp_flush(the_worker->pkt_pool.ctx); +} + +static int family_to_freebind_option(sa_family_t sa_family, int *level, int *name) +{ +#define LOG_NO_FB kr_log_error(NETWORK, "your system does not support 'freebind', " \ + "please remove it from your configuration\n") + switch (sa_family) { + case AF_INET: + *level = IPPROTO_IP; +#if defined(IP_FREEBIND) + *name = IP_FREEBIND; +#elif defined(IP_BINDANY) + *name = IP_BINDANY; +#else + LOG_NO_FB; + return kr_error(ENOTSUP); +#endif + break; + case AF_INET6: +#if defined(IP_FREEBIND) + *level = IPPROTO_IP; + *name = IP_FREEBIND; +#elif defined(IPV6_BINDANY) + *level = IPPROTO_IPV6; + *name = IPV6_BINDANY; +#else + LOG_NO_FB; + return kr_error(ENOTSUP); +#endif + break; + default: + return kr_error(ENOTSUP); + } + return kr_ok(); +} + +int io_bind(const struct sockaddr *addr, int type, const endpoint_flags_t *flags) +{ + const int fd = socket(addr->sa_family, type, 0); + if (fd < 0) return kr_error(errno); + + int yes = 1; + if (addr->sa_family == AF_INET || addr->sa_family == AF_INET6) { + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))) { + close(fd); + return kr_error(errno); + } + +#ifdef SO_REUSEPORT_LB + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, &yes, sizeof(yes))) { + close(fd); + return kr_error(errno); + } +#elif defined(SO_REUSEPORT) && defined(__linux__) /* different meaning on (Free)BSD */ + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) { + close(fd); + return kr_error(errno); + } +#endif + +#ifdef IPV6_V6ONLY + if (addr->sa_family == AF_INET6 + && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof(yes))) { + close(fd); + return kr_error(errno); + } +#endif + if (flags != NULL && flags->freebind) { + int optlevel; + int optname; + int ret = family_to_freebind_option(addr->sa_family, &optlevel, &optname); + if (ret) { + close(fd); + return kr_error(ret); + } + if (setsockopt(fd, optlevel, optname, &yes, sizeof(yes))) { + close(fd); + return kr_error(errno); + } + } + + /* Linux 3.15 has IP_PMTUDISC_OMIT which makes sockets + * ignore PMTU information and send packets with DF=0. + * This mitigates DNS fragmentation attacks by preventing + * forged PMTU information. FreeBSD already has same semantics + * without setting the option. + https://gitlab.nic.cz/knot/knot-dns/-/issues/640 + */ +#if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT) + int omit = IP_PMTUDISC_OMIT; + if (type == SOCK_DGRAM && addr->sa_family == AF_INET + && setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER, &omit, sizeof(omit))) { + kr_log_error(IO, + "failed to disable Path MTU discovery for %s UDP: %s\n", + kr_straddr(addr), strerror(errno)); + } +#endif + } + + if (bind(fd, addr, kr_sockaddr_len(addr))) { + close(fd); + return kr_error(errno); + } + + return fd; +} + +int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd) +{ + if (!handle) { + return kr_error(EINVAL); + } + int ret = uv_udp_init(loop, handle); + if (ret) return ret; + + ret = uv_udp_open(handle, fd); + if (ret) return ret; + + uv_handle_t *h = (uv_handle_t *)handle; + check_bufsize(h); + /* Handle is already created, just create context. */ + struct session *s = session_new(h, false, false); + kr_require(s); + session_flags(s)->outgoing = false; + + int socklen = sizeof(union kr_sockaddr); + ret = uv_udp_getsockname(handle, session_get_sockname(s), &socklen); + if (ret) { + kr_log_error(IO, "ERROR: getsockname failed: %s\n", uv_strerror(ret)); + abort(); /* It might be nontrivial not to leak something here. */ + } + + return io_start_read(h); +} + +void tcp_timeout_trigger(uv_timer_t *timer) +{ + struct session *s = timer->data; + + if (kr_fails_assert(!session_flags(s)->closing)) + return; + + if (!session_tasklist_is_empty(s)) { + int finalized = session_tasklist_finalize_expired(s); + the_worker->stats.timeout += finalized; + /* session_tasklist_finalize_expired() may call worker_task_finalize(). + * If session is a source session and there were IO errors, + * worker_task_finalize() can finalize all tasks and close session. */ + if (session_flags(s)->closing) { + return; + } + + } + if (!session_tasklist_is_empty(s)) { + uv_timer_stop(timer); + session_timer_start(s, tcp_timeout_trigger, + KR_RESOLVE_TIME_LIMIT / 2, + KR_RESOLVE_TIME_LIMIT / 2); + } else { + /* Normally it should not happen, + * but better to check if there anything in this list. */ + while (!session_waitinglist_is_empty(s)) { + struct qr_task *t = session_waitinglist_pop(s, false); + worker_task_finalize(t, KR_STATE_FAIL); + worker_task_unref(t); + the_worker->stats.timeout += 1; + if (session_flags(s)->closing) { + return; + } + } + const struct network *net = &the_worker->engine->net; + uint64_t idle_in_timeout = net->tcp.in_idle_timeout; + uint64_t last_activity = session_last_activity(s); + uint64_t idle_time = kr_now() - last_activity; + if (idle_time < idle_in_timeout) { + idle_in_timeout -= idle_time; + uv_timer_stop(timer); + session_timer_start(s, tcp_timeout_trigger, + idle_in_timeout, idle_in_timeout); + } else { + struct sockaddr *peer = session_get_peer(s); + char *peer_str = kr_straddr(peer); + kr_log_debug(IO, "=> closing connection to '%s'\n", + peer_str ? peer_str : ""); + if (session_flags(s)->outgoing) { + worker_del_tcp_waiting(the_worker, peer); + worker_del_tcp_connected(the_worker, peer); + } + session_close(s); + } + } +} + +static void tcp_disconnect(struct session *s, int errcode) +{ + if (kr_log_is_debug(IO, NULL)) { + struct sockaddr *peer = session_get_peer(s); + char *peer_str = kr_straddr(peer); + kr_log_debug(IO, "=> connection to '%s' closed by peer (%s)\n", + peer_str ? peer_str : "", + uv_strerror(errcode)); + } + + if (!session_was_useful(s) && session_flags(s)->outgoing) { + /* We want to penalize the IP address, if a task is asking a query. + * It might not be the right task, but that doesn't matter so much + * for attributing the useless session to the IP address. */ + struct qr_task *t = session_tasklist_get_first(s); + struct kr_query *qry = NULL; + if (t) { + struct kr_request *req = worker_task_request(t); + qry = array_tail(req->rplan.pending); + } + if (qry) /* We reuse the error for connection, as it's quite similar. */ + qry->server_selection.error(qry, worker_task_get_transport(t), + KR_SELECTION_TCP_CONNECT_FAILED); + } + worker_end_tcp(s); +} + +static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) +{ + struct session *s = handle->data; + if (kr_fails_assert(s && session_get_handle(s) == (uv_handle_t *)handle && handle->type == UV_TCP)) + return; + + if (session_flags(s)->closing) { + return; + } + + /* nread might be 0, which does not indicate an error or EOF. + * This is equivalent to EAGAIN or EWOULDBLOCK under read(2). */ + if (nread == 0) { + return; + } + + if (nread < 0 || !buf->base) { + tcp_disconnect(s, nread); + return; + } + + const uint8_t *data = (const uint8_t *)buf->base; + ssize_t data_len = nread; + const struct sockaddr *src_addr = session_get_peer(s); + const struct sockaddr *dst_addr = NULL; + if (!session_flags(s)->outgoing && !session_flags(s)->no_proxy && + proxy_header_present(data, data_len)) { + if (!proxy_allowed(&the_worker->engine->net, src_addr)) { + if (kr_log_is_debug(IO, NULL)) { + kr_log_debug(IO, "<= connection to '%s': PROXYv2 not allowed " + "for this peer, close\n", + kr_straddr(src_addr)); + } + worker_end_tcp(s); + return; + } + + struct proxy_result *proxy = session_proxy_create(s); + ssize_t trimmed = proxy_process_header(proxy, s, data, data_len); + if (trimmed < 0) { + if (kr_log_is_debug(IO, NULL)) { + if (trimmed == KNOT_EMALF) { + kr_log_debug(IO, "<= connection to '%s': " + "malformed PROXYv2 header, close\n", + kr_straddr(src_addr)); + } else { + kr_log_debug(IO, "<= connection to '%s': " + "error processing PROXYv2 header, close\n", + kr_straddr(src_addr)); + } + } + worker_end_tcp(s); + return; + } else if (trimmed == 0) { + return; + } + + if (proxy->command != PROXY2_CMD_LOCAL && proxy->family != AF_UNSPEC) { + src_addr = &proxy->src_addr.ip; + dst_addr = &proxy->dst_addr.ip; + + if (kr_log_is_debug(IO, NULL)) { + kr_log_debug(IO, "<= TCP stream from '%s'\n", + kr_straddr(src_addr)); + kr_log_debug(IO, "<= proxied through '%s'\n", + kr_straddr(session_get_peer(s))); + } + } + + data = session_wirebuf_get_free_start(s); + data_len = nread - trimmed; + } + + session_flags(s)->no_proxy = true; + + ssize_t consumed = 0; + if (session_flags(s)->has_tls) { + /* buf->base points to start of the tls receive buffer. + Decode data free space in session wire buffer. */ + consumed = tls_process_input_data(s, data, data_len); + if (consumed < 0) { + if (kr_log_is_debug(IO, NULL)) { + char *peer_str = kr_straddr(src_addr); + kr_log_debug(IO, "=> connection to '%s': " + "error processing TLS data, close\n", + peer_str ? peer_str : ""); + } + worker_end_tcp(s); + return; + } else if (consumed == 0) { + return; + } + data = session_wirebuf_get_free_start(s); + data_len = consumed; + } +#if ENABLE_DOH2 + int streaming = 1; + if (session_flags(s)->has_http) { + streaming = http_process_input_data(s, data, data_len, + &consumed); + if (streaming < 0) { + if (kr_log_is_debug(IO, NULL)) { + char *peer_str = kr_straddr(src_addr); + kr_log_debug(IO, "=> connection to '%s': " + "error processing HTTP data, close\n", + peer_str ? peer_str : ""); + } + worker_end_tcp(s); + return; + } + if (consumed == 0) { + return; + } + data = session_wirebuf_get_free_start(s); + data_len = consumed; + } +#endif + + /* data points to start of the free space in session wire buffer. + Simple increase internal counter. */ + consumed = session_wirebuf_consume(s, data, data_len); + kr_assert(consumed == data_len); + + struct io_comm_data comm = { + .src_addr = src_addr, + .comm_addr = session_get_peer(s), + .dst_addr = dst_addr, + .proxy = session_proxy_get(s) + }; + int ret = session_wirebuf_process(s, &comm); + if (ret < 0) { + /* An error has occurred, close the session. */ + worker_end_tcp(s); + } + session_wirebuf_compress(s); + mp_flush(the_worker->pkt_pool.ctx); +#if ENABLE_DOH2 + if (session_flags(s)->has_http && streaming == 0 && ret == 0) { + ret = http_send_status(s, HTTP_STATUS_BAD_REQUEST); + if (ret) { + /* An error has occurred, close the session. */ + worker_end_tcp(s); + } + } +#endif +} + +#if ENABLE_DOH2 +static ssize_t tls_send(const uint8_t *buf, const size_t len, struct session *session) +{ + struct tls_ctx *ctx = session_tls_get_server_ctx(session); + ssize_t sent = 0; + kr_require(ctx); + + sent = gnutls_record_send(ctx->c.tls_session, buf, len); + if (sent < 0) { + kr_log_debug(DOH, "gnutls_record_send failed: %s (%zd)\n", + gnutls_strerror_name(sent), sent); + return kr_error(EIO); + } + return sent; +} +#endif + +static void _tcp_accept(uv_stream_t *master, int status, bool tls, bool http) +{ + if (status != 0) { + return; + } + + struct worker_ctx *worker = the_worker; + uv_tcp_t *client = malloc(sizeof(uv_tcp_t)); + if (!client) { + return; + } + int res = io_create(master->loop, (uv_handle_t *)client, + SOCK_STREAM, AF_UNSPEC, tls, http); + if (res) { + if (res == UV_EMFILE) { + worker->too_many_open = true; + worker->rconcurrent_highwatermark = worker->stats.rconcurrent; + } + /* Since res isn't OK struct session wasn't allocated \ borrowed. + * We must release client handle only. + */ + free(client); + return; + } + + /* struct session was allocated \ borrowed from memory pool. */ + struct session *s = client->data; + kr_require(session_flags(s)->outgoing == false); + kr_require(session_flags(s)->has_tls == tls); + + if (uv_accept(master, (uv_stream_t *)client) != 0) { + /* close session, close underlying uv handles and + * deallocate (or return to memory pool) memory. */ + session_close(s); + return; + } + + /* Get peer's and our address. We apparently get specific sockname here + * even if we listened on a wildcard address. */ + struct sockaddr *sa = session_get_peer(s); + int sa_len = sizeof(struct sockaddr_in6); + int ret = uv_tcp_getpeername(client, sa, &sa_len); + if (ret || sa->sa_family == AF_UNSPEC) { + session_close(s); + return; + } + sa = session_get_sockname(s); + sa_len = sizeof(struct sockaddr_in6); + ret = uv_tcp_getsockname(client, sa, &sa_len); + if (ret || sa->sa_family == AF_UNSPEC) { + session_close(s); + return; + } + + /* Set deadlines for TCP connection and start reading. + * It will re-check every half of a request time limit if the connection + * is idle and should be terminated, this is an educated guess. */ + + const struct network *net = &worker->engine->net; + uint64_t idle_in_timeout = net->tcp.in_idle_timeout; + + uint64_t timeout = KR_CONN_RTT_MAX / 2; + if (tls) { + timeout += TLS_MAX_HANDSHAKE_TIME; + struct tls_ctx *ctx = session_tls_get_server_ctx(s); + if (!ctx) { + ctx = tls_new(worker); + if (!ctx) { + session_close(s); + return; + } + ctx->c.session = s; + ctx->c.handshake_state = TLS_HS_IN_PROGRESS; + + /* Configure ALPN. */ + gnutls_datum_t proto; + if (!http) { + proto.data = (unsigned char *)"dot"; + proto.size = 3; + } else { + proto.data = (unsigned char *)"h2"; + proto.size = 2; + } + unsigned int flags = 0; +#if GNUTLS_VERSION_NUMBER >= 0x030500 + /* Mandatory ALPN means the protocol must match if and + * only if ALPN extension is used by the client. */ + flags |= GNUTLS_ALPN_MANDATORY; +#endif + ret = gnutls_alpn_set_protocols(ctx->c.tls_session, &proto, 1, flags); + if (ret != GNUTLS_E_SUCCESS) { + session_close(s); + return; + } + + session_tls_set_server_ctx(s, ctx); + } + } +#if ENABLE_DOH2 + if (http) { + struct http_ctx *ctx = session_http_get_server_ctx(s); + if (!ctx) { + if (!tls) { /* Plain HTTP is not supported. */ + session_close(s); + return; + } + ctx = http_new(s, tls_send); + if (!ctx) { + session_close(s); + return; + } + session_http_set_server_ctx(s, ctx); + } + } +#endif + session_timer_start(s, tcp_timeout_trigger, timeout, idle_in_timeout); + io_start_read((uv_handle_t *)client); +} + +static void tcp_accept(uv_stream_t *master, int status) +{ + _tcp_accept(master, status, false, false); +} + +static void tls_accept(uv_stream_t *master, int status) +{ + _tcp_accept(master, status, true, false); +} + +#if ENABLE_DOH2 +static void https_accept(uv_stream_t *master, int status) +{ + _tcp_accept(master, status, true, true); +} +#endif + +int io_listen_tcp(uv_loop_t *loop, uv_tcp_t *handle, int fd, int tcp_backlog, bool has_tls, bool has_http) +{ + uv_connection_cb connection; + + if (!handle) { + return kr_error(EINVAL); + } + int ret = uv_tcp_init(loop, handle); + if (ret) return ret; + + if (has_tls && has_http) { +#if ENABLE_DOH2 + connection = https_accept; +#else + kr_log_error(IO, "kresd was compiled without libnghttp2 support\n"); + return kr_error(ENOPROTOOPT); +#endif + } else if (has_tls) { + connection = tls_accept; + } else if (has_http) { + return kr_error(EPROTONOSUPPORT); + } else { + connection = tcp_accept; + } + + ret = uv_tcp_open(handle, (uv_os_sock_t) fd); + if (ret) return ret; + + int val; (void)val; + /* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */ +#ifdef TCP_DEFER_ACCEPT + val = KR_CONN_RTT_MAX/1000; + if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val))) { + kr_log_error(IO, "listen TCP (defer_accept): %s\n", strerror(errno)); + } +#endif + + ret = uv_listen((uv_stream_t *)handle, tcp_backlog, connection); + if (ret != 0) { + return ret; + } + + /* TCP_FASTOPEN enables 1 RTT connection resumptions. */ +#ifdef TCP_FASTOPEN + #ifdef __linux__ + val = 16; /* Accepts queue length hint */ + #else + val = 1; /* Accepts on/off */ + #endif + if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &val, sizeof(val))) { + kr_log_error(IO, "listen TCP (fastopen): %s%s\n", strerror(errno), + (errno != EPERM ? "" : + ". This may be caused by TCP Fast Open being disabled in the OS.")); + } +#endif + + handle->data = NULL; + return 0; +} + + +enum io_stream_mode { + io_mode_text = 0, + io_mode_binary = 1, +}; + +struct io_stream_data { + enum io_stream_mode mode; + size_t blen; ///< length of `buf` + char *buf; ///< growing buffer residing on `pool` (mp_append_*) + knot_mm_t *pool; +}; + +/** + * TTY control: process input and free() the buffer. + * + * For parameters see http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb + * + * - This is just basic read-eval-print; libedit is supported through kresc; + */ +void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) +{ + auto_free char *commands = buf ? buf->base : NULL; + + /* Set output streams */ + FILE *out = stdout; + uv_os_fd_t stream_fd = -1; + struct args *args = the_args; + struct io_stream_data *data = (struct io_stream_data*) stream->data; + if (nread < 0 || uv_fileno((uv_handle_t *)stream, &stream_fd)) { + mp_delete(data->pool->ctx); + uv_close((uv_handle_t *)stream, (uv_close_cb) free); + return; + } + if (nread <= 0) { + return; + } + if (stream_fd != STDIN_FILENO) { + uv_os_fd_t dup_fd = dup(stream_fd); + if (dup_fd >= 0) { + out = fdopen(dup_fd, "w"); + } + } + + /** The current single command and the remaining command(s). */ + char *cmd, *cmd_next = NULL; + bool incomplete_cmd = false; + + if (!commands || nread <= 0) { + goto finish; + } + + /* Execute */ + if (commands[nread - 1] != '\n') { + incomplete_cmd = true; + } + /* Ensure commands is 0-terminated */ + if (nread >= buf->len) { /* only equality should be possible */ + char *newbuf = realloc(commands, nread + 1); + if (!newbuf) + goto finish; + commands = newbuf; + } + commands[nread] = '\0'; + + char *boundary = "\n\0"; + cmd = strtok(commands, "\n"); + /* strtok skip '\n' but we need process alone '\n' too */ + if (commands[0] == '\n') { + cmd_next = cmd; + cmd = boundary; + } else { + cmd_next = strtok(NULL, "\n"); + } + + /** Moving pointer to end of buffer with incomplete command. */ + char *pbuf = data->buf + data->blen; + lua_State *L = the_worker->engine->L; + while (cmd != NULL) { + /* Last command is incomplete - save it and execute later */ + if (incomplete_cmd && cmd_next == NULL) { + pbuf = mp_append_string(data->pool->ctx, pbuf, cmd); + mp_append_char(data->pool->ctx, pbuf, '\0'); + data->buf = mp_ptr(data->pool->ctx); + data->blen = data->blen + strlen(cmd); + + /* There is new incomplete command */ + if (commands[nread - 1] == '\n') + incomplete_cmd = false; + goto next_iter; + } + + /* Process incomplete command from previously call */ + if (data->blen > 0) { + if (commands[0] != '\n' && commands[0] != '\0') { + pbuf = mp_append_string(data->pool->ctx, pbuf, cmd); + mp_append_char(data->pool->ctx, pbuf, '\0'); + data->buf = mp_ptr(data->pool->ctx); + cmd = data->buf; + } else { + cmd = data->buf; + } + data->blen = 0; + pbuf = data->buf; + } + + /* Pseudo-command for switching to "binary output"; */ + if (strcmp(cmd, "__binary") == 0) { + data->mode = io_mode_binary; + goto next_iter; + } + + const bool cmd_failed = engine_cmd(L, cmd, false); + const char *message = NULL; + size_t len_s; + if (lua_gettop(L) > 0) { + message = lua_tolstring(L, -1, &len_s); + } + + /* Send back the output, either in "binary" or normal mode. */ + if (data->mode == io_mode_binary) { + /* Leader expects length field in all cases */ + if (!message || len_s > UINT32_MAX) { + kr_log_error(IO, "unrepresentable response on control socket, " + "sending back empty block (command '%s')\n", cmd); + len_s = 0; + } + uint32_t len_n = htonl(len_s); + fwrite(&len_n, sizeof(len_n), 1, out); + if (len_s > 0) + fwrite(message, len_s, 1, out); + } else { + if (message) + fprintf(out, "%s", message); + if (message || !args->quiet) + fprintf(out, "\n"); + if (!args->quiet) + fprintf(out, "> "); + } + + /* Duplicate command and output to logs */ + if (cmd_failed) { + kr_log_warning(CONTROL, "> %s\n", cmd); + if (message) + kr_log_warning(CONTROL, "%s\n", message); + } else { + kr_log_debug(CONTROL, "> %s\n", cmd); + if (message) + kr_log_debug(CONTROL, "%s\n", message); + } + next_iter: + lua_settop(L, 0); /* not required in some cases but harmless */ + cmd = cmd_next; + cmd_next = strtok(NULL, "\n"); + } + +finish: + /* Close if redirected */ + if (stream_fd != STDIN_FILENO) { + fclose(out); + } +} + +void io_tty_alloc(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) +{ + buf->len = suggested; + buf->base = malloc(suggested); +} + +struct io_stream_data *io_tty_alloc_data() { + knot_mm_t *pool = mm_ctx_mempool2(MM_DEFAULT_BLKSIZE); + if (!pool) { + return NULL; + } + struct io_stream_data *data = mm_alloc(pool, sizeof(struct io_stream_data)); + + data->buf = mp_start(pool->ctx, 512); + data->mode = io_mode_text; + data->blen = 0; + data->pool = pool; + + return data; +} + +void io_tty_accept(uv_stream_t *master, int status) +{ + /* We can't use any allocations after mp_start() and it's easier anyway. */ + uv_pipe_t *client = malloc(sizeof(*client)); + if (!client) + return; + + struct io_stream_data *data = io_tty_alloc_data(); + if (!data) { + free(client); + return; + } + client->data = data; + + struct args *args = the_args; + uv_pipe_init(master->loop, client, 0); + if (uv_accept(master, (uv_stream_t *)client) != 0) { + mp_delete(data->pool->ctx); + return; + } + uv_read_start((uv_stream_t *)client, io_tty_alloc, io_tty_process_input); + + /* Write command line */ + if (!args->quiet) { + uv_buf_t buf = { "> ", 2 }; + uv_try_write((uv_stream_t *)client, &buf, 1); + } +} + +int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd) +{ + if (!handle) { + return kr_error(EINVAL); + } + int ret = uv_pipe_init(loop, handle, 0); + if (ret) return ret; + + ret = uv_pipe_open(handle, fd); + if (ret) return ret; + + ret = uv_listen((uv_stream_t *)handle, 16, io_tty_accept); + if (ret) return ret; + + handle->data = NULL; + + return 0; +} + +#if ENABLE_XDP +static void xdp_rx(uv_poll_t* handle, int status, int events) +{ + const int XDP_RX_BATCH_SIZE = 64; + if (status < 0) { + kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status)); + return; + } + if (events != UV_READABLE) { + kr_log_error(XDP, "poll unexpected events: %d\n", events); + return; + } + + xdp_handle_data_t *xhd = handle->data; + kr_require(xhd && xhd->session && xhd->socket); + uint32_t rcvd; + knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE]; + int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd + #if KNOT_VERSION_HEX >= 0x030100 + , NULL + #endif + ); + + if (kr_fails_assert(ret == KNOT_EOK)) { + /* ATM other error codes can only be returned when called incorrectly */ + kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret)); + return; + } + kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd); + kr_require(rcvd <= XDP_RX_BATCH_SIZE); + for (int i = 0; i < rcvd; ++i) { + const knot_xdp_msg_t *msg = &msgs[i]; + kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE); + knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len, + &the_worker->pkt_pool); + if (kpkt == NULL) { + ret = kr_error(ENOMEM); + } else { + struct io_comm_data comm = { + .src_addr = (const struct sockaddr *)&msg->ip_from, + .comm_addr = (const struct sockaddr *)&msg->ip_from, + .dst_addr = (const struct sockaddr *)&msg->ip_to + }; + ret = worker_submit(xhd->session, &comm, + msg->eth_from, msg->eth_to, kpkt); + } + if (ret) + kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret)); + mp_flush(the_worker->pkt_pool.ctx); + } + knot_xdp_recv_finish(xhd->socket, msgs, rcvd); +} +/// Warn if the XDP program is running in emulated mode (XDP_SKB) +static void xdp_warn_mode(const char *ifname) +{ + if (kr_fails_assert(ifname)) + return; + + const unsigned if_index = if_nametoindex(ifname); + if (!if_index) { + kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n", + ifname, strerror(errno)); + return; + } + + const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index); + switch (mode) { + case KNOT_XDP_MODE_FULL: + return; + case KNOT_XDP_MODE_EMUL: + kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n", + ifname); + return; + case KNOT_XDP_MODE_NONE: // enum warnings from compiler + break; + } + kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n", + ifname, (int)mode); +} +int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname) +{ + if (!ep || !ep->handle) { + return kr_error(EINVAL); + } + + // RLIMIT_MEMLOCK often needs raising when operating on BPF + static int ret_limit = 1; + if (ret_limit == 1) { + struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY }; + ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit) + ? kr_error(errno) : 0; + } + if (ret_limit) return ret_limit; + + xdp_handle_data_t *xhd = malloc(sizeof(*xhd)); + if (!xhd) return kr_error(ENOMEM); + + xhd->socket = NULL; // needed for some reason + + // This call is a libknot version hell, unfortunately. + int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue, + #if KNOT_VERSION_HEX < 0x030100 + ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL, + KNOT_XDP_LOAD_BPF_MAYBE + #elif KNOT_VERSION_HEX < 0x030200 + ep->port ? ep->port : (KNOT_XDP_LISTEN_PORT_PASS | 0), + KNOT_XDP_LOAD_BPF_MAYBE + #else + KNOT_XDP_FILTER_UDP | (ep->port ? 0 : KNOT_XDP_FILTER_PASS), + ep->port, 0/*quic_port*/, + KNOT_XDP_LOAD_BPF_MAYBE, + NULL/*xdp_config*/ + #endif + ); + + if (!ret) xdp_warn_mode(ifname); + + if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker); + if (ret || kr_fails_assert(xhd->socket)) { + free(xhd); + return ret == 0 ? kr_error(EINVAL) : kr_error(ret); + } + xhd->tx_waker.data = xhd->socket; + + ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful + ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd); + if (ret) { + knot_xdp_deinit(xhd->socket); + free(xhd); + return kr_error(ret); + } + + // beware: this sets poll_handle->data + xhd->session = session_new(ep->handle, false, false); + kr_require(!session_flags(xhd->session)->outgoing); + session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there + + ep->handle->data = xhd; + ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx); + return ret; +} +#endif + + +int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, bool has_tls, bool has_http) +{ + int ret = -1; + if (type == SOCK_DGRAM) { + ret = uv_udp_init(loop, (uv_udp_t *)handle); + } else if (type == SOCK_STREAM) { + ret = uv_tcp_init_ex(loop, (uv_tcp_t *)handle, family); + uv_tcp_nodelay((uv_tcp_t *)handle, 1); + } + if (ret != 0) { + return ret; + } + struct session *s = session_new(handle, has_tls, has_http); + if (s == NULL) { + ret = -1; + } + return ret; +} + +static void io_deinit(uv_handle_t *handle) +{ + if (!handle || !handle->data) { + return; + } + if (handle->type != UV_POLL) { + session_free(handle->data); + } else { + #if ENABLE_XDP + xdp_handle_data_t *xhd = handle->data; + uv_idle_stop(&xhd->tx_waker); + uv_close((uv_handle_t *)&xhd->tx_waker, NULL); + session_free(xhd->session); + knot_xdp_deinit(xhd->socket); + free(xhd); + #else + kr_assert(false); + #endif + } +} + +void io_free(uv_handle_t *handle) +{ + io_deinit(handle); + free(handle); +} + +int io_start_read(uv_handle_t *handle) +{ + switch (handle->type) { + case UV_UDP: + return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv); + case UV_TCP: + return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv); + default: + kr_assert(false); + return kr_error(EINVAL); + } +} + +int io_stop_read(uv_handle_t *handle) +{ + if (handle->type == UV_UDP) { + return uv_udp_recv_stop((uv_udp_t *)handle); + } else { + return uv_read_stop((uv_stream_t *)handle); + } +} -- cgit v1.2.3