/* Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
 * SPDX-License-Identifier: GPL-3.0-or-later
 */

#include "daemon/io.h"

#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <libknot/errcode.h>
#include <string.h>
#include <sys/resource.h>

#if ENABLE_XDP
#include <libknot/xdp/eth.h>
#include <libknot/xdp/xdp.h>
#include <net/if.h>
#endif

#include "daemon/network.h"
#include "daemon/proxyv2.h"
#include "daemon/worker.h"
#include "daemon/tls.h"
#include "daemon/http.h"
#include "daemon/session.h"
#include "contrib/cleanup.h"
#include "lib/utils.h"
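
/* Ensure a socket buffer of at least bufsize_want bytes: query the current
 * size first and only enlarge it, never shrink what the kernel already set. */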
#define negotiate_bufsize(func, handle, bufsize_want) do { \
	int bufsize = 0; (func)((handle), &bufsize); \
	if (bufsize < (bufsize_want)) { \
		bufsize = (bufsize_want); \
		(func)((handle), &bufsize); \
	} \
} while (0)

static void check_bufsize(uv_handle_t* handle)
{
	return; /* TODO: resurrect after https://github.com/libuv/libuv/issues/419 */
	/* We want to buffer at least N waves in advance.
	 * This is magic, presuming we can pull in a whole recvmmsg width in one wave.
	 * Linux will double the wanted buffer size.
	 */
	const int bufsize_want = 2 * sizeof( ((struct worker_ctx *)NULL)->wire_buf ) ;
	negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want);
	negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want);
}

#undef negotiate_bufsize

static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
	/* UDP sessions use the worker buffer for wire data,
	 * TCP sessions use the session buffer for wire data
	 * (see session_set_handle()).
	 * TLS sessions use a buffer from the TLS context.
	 * The content of the worker buffer is
	 * guaranteed to be unchanged only for the duration of
	 * udp_read() and tcp_read().
	 */
	struct session *s = handle->data;
	if (!session_flags(s)->has_tls) {
		buf->base = (char *) session_wirebuf_get_free_start(s);
		buf->len = session_wirebuf_get_free_size(s);
	} else {
		struct tls_common_ctx *ctx = session_tls_get_common_ctx(s);
		buf->base = (char *) ctx->recv_buf;
		buf->len = sizeof(ctx->recv_buf);
	}
}

void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
		const struct sockaddr *comm_addr, unsigned flags)
{
	struct session *s = handle->data;
	if (session_flags(s)->closing || nread <= 0 || comm_addr->sa_family == AF_UNSPEC)
		return;

	if (session_flags(s)->outgoing) {
		const struct sockaddr *peer = session_get_peer(s);
		if (kr_fails_assert(peer->sa_family != AF_UNSPEC))
			return;
		if (kr_sockaddr_cmp(peer, comm_addr) != 0) {
			kr_log_debug(IO, "<= ignoring UDP from unexpected address '%s'\n",
					kr_straddr(comm_addr));
			return;
		}
	}

	const uint8_t *data = (const uint8_t *)buf->base;
	ssize_t data_len = nread;
	const struct sockaddr *src_addr = comm_addr;
	const struct sockaddr *dst_addr = NULL;
	struct proxy_result proxy;
	bool has_proxy = false;
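	/* A PROXYv2 header may precede the DNS payload when a proxy or load
	 * balancer in front of us forwards the client's original addresses;
	 * it is only honoured for peers explicitly allowed by configuration. */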
	if (!session_flags(s)->outgoing && proxy_header_present(data, data_len)) {
		if (!proxy_allowed(&the_worker->engine->net, comm_addr)) {
			kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n",
					kr_straddr(comm_addr));
			return;
		}

		ssize_t trimmed = proxy_process_header(&proxy, s, data, data_len);
		if (trimmed == KNOT_EMALF) {
			if (kr_log_is_debug(IO, NULL)) {
				kr_log_debug(IO, "<= ignoring malformed PROXYv2 UDP "
						"from address '%s'\n",
						kr_straddr(comm_addr));
			}
			return;
		} else if (trimmed < 0) {
			if (kr_log_is_debug(IO, NULL)) {
				kr_log_debug(IO, "<= error processing PROXYv2 UDP "
						"from address '%s', ignoring\n",
						kr_straddr(comm_addr));
			}
			return;
		}

		if (proxy.command == PROXY2_CMD_PROXY && proxy.family != AF_UNSPEC) {
			has_proxy = true;
			src_addr = &proxy.src_addr.ip;
			dst_addr = &proxy.dst_addr.ip;

			if (kr_log_is_debug(IO, NULL)) {
				kr_log_debug(IO, "<= UDP query from '%s'\n",
						kr_straddr(src_addr));
				kr_log_debug(IO, "<= proxied through '%s'\n",
						kr_straddr(comm_addr));
			}
		}
		data = session_wirebuf_get_free_start(s);
		data_len = nread - trimmed;
	}

	ssize_t consumed = session_wirebuf_consume(s, data, data_len);
	kr_assert(consumed == data_len);
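
	/* src_addr is the client's real address (taken from PROXYv2 if present),
	 * comm_addr is the peer we actually exchange packets with, and
	 * dst_addr is the original destination (only known from PROXYv2). */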
	struct io_comm_data comm = {
		.src_addr = src_addr,
		.comm_addr = comm_addr,
		.dst_addr = dst_addr,
		.proxy = (has_proxy) ? &proxy : NULL
	};
	session_wirebuf_process(s, &comm);
	session_wirebuf_discard(s);
	mp_flush(the_worker->pkt_pool.ctx);
}
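
/* Map an address family to the platform's "freebind" socket option
 * (IP_FREEBIND on Linux, IP_BINDANY/IPV6_BINDANY on FreeBSD), which allows
 * binding to addresses that are not configured on any interface yet. */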
static int family_to_freebind_option(sa_family_t sa_family, int *level, int *name)
{
#define LOG_NO_FB kr_log_error(NETWORK, "your system does not support 'freebind', " \
				"please remove it from your configuration\n")
	switch (sa_family) {
	case AF_INET:  // NOLINT(bugprone-branch-clone): the branches are only cloned for specific macro configs
		*level = IPPROTO_IP;
#if defined(IP_FREEBIND)
		*name = IP_FREEBIND;
#elif defined(IP_BINDANY)
		*name = IP_BINDANY;
#else
		LOG_NO_FB;
		return kr_error(ENOTSUP);
#endif
		break;
	case AF_INET6:
#if defined(IP_FREEBIND)
		*level = IPPROTO_IP;
		*name = IP_FREEBIND;
#elif defined(IPV6_BINDANY)
		*level = IPPROTO_IPV6;
		*name = IPV6_BINDANY;
#else
		LOG_NO_FB;
		return kr_error(ENOTSUP);
#endif
		break;
	default:
		return kr_error(ENOTSUP);
	}
	return kr_ok();
}

int io_bind(const struct sockaddr *addr, int type, const endpoint_flags_t *flags)
{
	const int fd = socket(addr->sa_family, type, 0);
	if (fd < 0) return kr_error(errno);

	int yes = 1;
	if (addr->sa_family == AF_INET || addr->sa_family == AF_INET6) {
		if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))) {
			close(fd);
			return kr_error(errno);
		}
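
		/* SO_REUSEPORT(-LB) lets multiple worker processes bind the same
		 * address, with the kernel distributing incoming packets among them. */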
#ifdef SO_REUSEPORT_LB
		if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, &yes, sizeof(yes))) {
			close(fd);
			return kr_error(errno);
		}
#elif defined(SO_REUSEPORT) && defined(__linux__) /* different meaning on (Free)BSD */
		if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) {
			close(fd);
			return kr_error(errno);
		}
#endif

#ifdef IPV6_V6ONLY
		if (addr->sa_family == AF_INET6
		    && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof(yes))) {
			close(fd);
			return kr_error(errno);
		}
#endif
		if (flags != NULL && flags->freebind) {
			int optlevel;
			int optname;
			int ret = family_to_freebind_option(addr->sa_family, &optlevel, &optname);
			if (ret) {
				close(fd);
				return kr_error(ret);
			}
			if (setsockopt(fd, optlevel, optname, &yes, sizeof(yes))) {
				close(fd);
				return kr_error(errno);
			}
		}

		/* Linux 3.15 has IP_PMTUDISC_OMIT which makes sockets
		 * ignore PMTU information and send packets with DF=0.
		 * This mitigates DNS fragmentation attacks by preventing
		 * forged PMTU information. FreeBSD already has the same
		 * semantics without setting the option.
		 * https://gitlab.nic.cz/knot/knot-dns/-/issues/640
		 */
#if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT)
		int omit = IP_PMTUDISC_OMIT;
		if (type == SOCK_DGRAM && addr->sa_family == AF_INET
		    && setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER, &omit, sizeof(omit))) {
			kr_log_error(IO,
				"failed to disable Path MTU discovery for %s UDP: %s\n",
				kr_straddr(addr), strerror(errno));
		}
#endif
	}

	if (bind(fd, addr, kr_sockaddr_len(addr))) {
		close(fd);
		return kr_error(errno);
	}

	return fd;
}

/// Optionally set a socket option and log an error on failure.
static void set_so(int fd, int so_option, int value, const char *descr)
{
	if (!value) return;
	if (setsockopt(fd, SOL_SOCKET, so_option, &value, sizeof(value))) {
		kr_log_error(IO, "failed to set %s to %d: %s\n",
				descr, value, strerror(errno));
		// we treat this as a non-critical failure
	}
}

int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd)
{
	if (!handle) {
		return kr_error(EINVAL);
	}
	int ret = uv_udp_init(loop, handle);
	if (ret) return ret;

	ret = uv_udp_open(handle, fd);
	if (ret) return ret;

	struct network *net = &the_worker->engine->net;
	set_so(fd, SO_SNDBUF, net->listen_udp_buflens.snd, "UDP send buffer size");
	set_so(fd, SO_RCVBUF, net->listen_udp_buflens.rcv, "UDP receive buffer size");

	uv_handle_t *h = (uv_handle_t *)handle;
	check_bufsize(h);
	/* Handle is already created, just create the context. */
	struct session *s = session_new(h, false, false);
	kr_require(s);
	session_flags(s)->outgoing = false;

	int socklen = sizeof(union kr_sockaddr);
	ret = uv_udp_getsockname(handle, session_get_sockname(s), &socklen);
	if (ret) {
		kr_log_error(IO, "ERROR: getsockname failed: %s\n", uv_strerror(ret));
		abort(); /* It might be nontrivial not to leak something here. */
	}

	return io_start_read(h);
}

void tcp_timeout_trigger(uv_timer_t *timer)
{
	struct session *s = timer->data;

	if (kr_fails_assert(!session_flags(s)->closing))
		return;

	if (!session_tasklist_is_empty(s)) {
		int finalized = session_tasklist_finalize_expired(s);
		the_worker->stats.timeout += finalized;
		/* session_tasklist_finalize_expired() may call worker_task_finalize().
		 * If the session is a source session and there were IO errors,
		 * worker_task_finalize() can finalize all tasks and close the session. */
		if (session_flags(s)->closing) {
			return;
		}
	}
	if (!session_tasklist_is_empty(s)) {
		uv_timer_stop(timer);
		session_timer_start(s, tcp_timeout_trigger,
				    KR_RESOLVE_TIME_LIMIT / 2,
				    KR_RESOLVE_TIME_LIMIT / 2);
	} else {
		/* Normally this should not happen,
		 * but better to check if there is anything in this list. */
		while (!session_waitinglist_is_empty(s)) {
			struct qr_task *t = session_waitinglist_pop(s, false);
			worker_task_finalize(t, KR_STATE_FAIL);
			worker_task_unref(t);
			the_worker->stats.timeout += 1;
			if (session_flags(s)->closing) {
				return;
			}
		}
		const struct network *net = &the_worker->engine->net;
		uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
		uint64_t last_activity = session_last_activity(s);
		uint64_t idle_time = kr_now() - last_activity;
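		/* Connection was active recently: re-arm the timer
		 * for just the remaining part of the idle timeout. */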
		if (idle_time < idle_in_timeout) {
			idle_in_timeout -= idle_time;
			uv_timer_stop(timer);
			session_timer_start(s, tcp_timeout_trigger,
					    idle_in_timeout, idle_in_timeout);
		} else {
			struct sockaddr *peer = session_get_peer(s);
			char *peer_str = kr_straddr(peer);
			kr_log_debug(IO, "=> closing connection to '%s'\n",
				       peer_str ? peer_str : "");
			if (session_flags(s)->outgoing) {
				worker_del_tcp_waiting(the_worker, peer);
				worker_del_tcp_connected(the_worker, peer);
			}
			session_close(s);
		}
	}
}

static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
	struct session *s = handle->data;
	if (kr_fails_assert(s && session_get_handle(s) == (uv_handle_t *)handle && handle->type == UV_TCP))
		return;

	if (session_flags(s)->closing) {
		return;
	}

	/* nread might be 0, which does not indicate an error or EOF.
	 * This is equivalent to EAGAIN or EWOULDBLOCK under read(2). */
	if (nread == 0) {
		return;
	}

	if (nread < 0 || !buf->base) {
		if (kr_log_is_debug(IO, NULL)) {
			struct sockaddr *peer = session_get_peer(s);
			char *peer_str = kr_straddr(peer);
			kr_log_debug(IO, "=> connection to '%s' closed by peer (%s)\n",
				       peer_str ? peer_str : "",
				       uv_strerror(nread));
		}

		session_tcp_penalize(s);
		worker_end_tcp(s);
		return;
	}

	const uint8_t *data = (const uint8_t *)buf->base;
	ssize_t data_len = nread;
	const struct sockaddr *src_addr = session_get_peer(s);
	const struct sockaddr *dst_addr = NULL;
	if (!session_flags(s)->outgoing && !session_flags(s)->no_proxy &&
			proxy_header_present(data, data_len)) {
		if (!proxy_allowed(&the_worker->engine->net, src_addr)) {
			if (kr_log_is_debug(IO, NULL)) {
				kr_log_debug(IO, "<= connection to '%s': PROXYv2 not allowed "
						"for this peer, close\n",
						kr_straddr(src_addr));
			}
			worker_end_tcp(s);
			return;
		}

		struct proxy_result *proxy = session_proxy_create(s);
		ssize_t trimmed = proxy_process_header(proxy, s, data, data_len);
		if (trimmed < 0) {
			if (kr_log_is_debug(IO, NULL)) {
				if (trimmed == KNOT_EMALF) {
					kr_log_debug(IO, "<= connection to '%s': "
							"malformed PROXYv2 header, close\n",
							kr_straddr(src_addr));
				} else {
					kr_log_debug(IO, "<= connection to '%s': "
							"error processing PROXYv2 header, close\n",
							kr_straddr(src_addr));
				}
			}
			worker_end_tcp(s);
			return;
		} else if (trimmed == 0) {
			return;
		}

		if (proxy->command != PROXY2_CMD_LOCAL && proxy->family != AF_UNSPEC) {
			src_addr = &proxy->src_addr.ip;
			dst_addr = &proxy->dst_addr.ip;

			if (kr_log_is_debug(IO, NULL)) {
				kr_log_debug(IO, "<= TCP stream from '%s'\n",
						kr_straddr(src_addr));
				kr_log_debug(IO, "<= proxied through '%s'\n",
						kr_straddr(session_get_peer(s)));
			}
		}

		data = session_wirebuf_get_free_start(s);
		data_len = nread - trimmed;
	}
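
	/* A PROXYv2 header may appear only at the very start of a connection,
	 * so all subsequent reads skip the check above. */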
	session_flags(s)->no_proxy = true;

	ssize_t consumed = 0;
	if (session_flags(s)->has_tls) {
		/* buf->base points to the start of the TLS receive buffer.
		 * Decode data into free space in the session wire buffer. */
		consumed = tls_process_input_data(s, data, data_len);
		if (consumed < 0) {
			if (kr_log_is_debug(IO, NULL)) {
				char *peer_str = kr_straddr(src_addr);
				kr_log_debug(IO, "=> connection to '%s': "
						"error processing TLS data, close\n",
						peer_str ? peer_str : "");
			}
			worker_end_tcp(s);
			return;
		} else if (consumed == 0) {
			return;
		}
		data = session_wirebuf_get_free_start(s);
		data_len = consumed;
	}
#if ENABLE_DOH2
	int streaming = 1;
	if (session_flags(s)->has_http) {
		streaming = http_process_input_data(s, data, data_len,
						    &consumed);
		if (streaming < 0) {
			if (kr_log_is_debug(IO, NULL)) {
				char *peer_str = kr_straddr(src_addr);
				kr_log_debug(IO, "=> connection to '%s': "
						"error processing HTTP data, close\n",
						peer_str ? peer_str : "");
			}
			worker_end_tcp(s);
			return;
		}
		if (consumed == 0) {
			return;
		}
		data = session_wirebuf_get_free_start(s);
		data_len = consumed;
	}
#endif

	/* data points to the start of the free space in the session wire buffer.
	 * Simply increase the internal counter. */
	consumed = session_wirebuf_consume(s, data, data_len);
	kr_assert(consumed == data_len);

	struct io_comm_data comm = {
		.src_addr = src_addr,
		.comm_addr = session_get_peer(s),
		.dst_addr = dst_addr,
		.proxy = session_proxy_get(s)
	};
	int ret = session_wirebuf_process(s, &comm);
	if (ret < 0) {
		/* An error has occurred, close the session. */
		worker_end_tcp(s);
	}
	session_wirebuf_compress(s);
	mp_flush(the_worker->pkt_pool.ctx);
#if ENABLE_DOH2
	if (session_flags(s)->has_http && streaming == 0 && ret == 0) {
		ret = http_send_status(s, HTTP_STATUS_BAD_REQUEST);
		if (ret) {
			/* An error has occurred, close the session. */
			worker_end_tcp(s);
		}
	}
#endif
}

#if ENABLE_DOH2
static ssize_t tls_send(const uint8_t *buf, const size_t len, struct session *session)
{
	struct tls_ctx *ctx = session_tls_get_server_ctx(session);
	ssize_t sent = 0;
	kr_require(ctx);

	sent = gnutls_record_send(ctx->c.tls_session, buf, len);
	if (sent < 0) {
		kr_log_debug(DOH, "gnutls_record_send failed: %s (%zd)\n",
			       gnutls_strerror_name(sent), sent);
		return kr_error(EIO);
	}
	return sent;
}
#endif
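
/* Shared accept path for plain TCP, TLS (DoT) and, with ENABLE_DOH2,
 * HTTPS (DoH) listeners; the tls/http flags select the session setup. */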
static void tcp_accept_internal(uv_stream_t *master, int status, bool tls, bool http)
{
	if (status != 0) {
		return;
	}

	struct worker_ctx *worker = the_worker;
	uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
	if (!client) {
		return;
	}
	int res = io_create(master->loop, (uv_handle_t *)client,
			    SOCK_STREAM, AF_UNSPEC, tls, http);
	if (res) {
		if (res == UV_EMFILE) {
			worker->too_many_open = true;
			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
		}
		/* Since res isn't OK, no struct session was allocated or borrowed.
		 * We must release the client handle only.
		 */
		free(client);
		return;
	}

	/* struct session was allocated or borrowed from the memory pool. */
	struct session *s = client->data;
	kr_require(session_flags(s)->outgoing == false);
	kr_require(session_flags(s)->has_tls == tls);

	if (uv_accept(master, (uv_stream_t *)client) != 0) {
		/* Close the session, close the underlying uv handles and
		 * deallocate (or return to the memory pool) the memory. */
		session_close(s);
		return;
	}

	/* Get the peer's and our address. We apparently get a specific sockname here
	 * even if we listened on a wildcard address. */
	struct sockaddr *sa = session_get_peer(s);
	int sa_len = sizeof(struct sockaddr_in6);
	int ret = uv_tcp_getpeername(client, sa, &sa_len);
	if (ret || sa->sa_family == AF_UNSPEC) {
		session_close(s);
		return;
	}
	sa = session_get_sockname(s);
	sa_len = sizeof(struct sockaddr_in6);
	ret = uv_tcp_getsockname(client, sa, &sa_len);
	if (ret || sa->sa_family == AF_UNSPEC) {
		session_close(s);
		return;
	}

	/* Set deadlines for the TCP connection and start reading.
	 * It will re-check every half of the request time limit whether the
	 * connection is idle and should be terminated; this is an educated guess. */

	const struct network *net = &worker->engine->net;
	uint64_t idle_in_timeout = net->tcp.in_idle_timeout;

	uint64_t timeout = KR_CONN_RTT_MAX / 2;
	if (tls) {
		timeout += TLS_MAX_HANDSHAKE_TIME;
		struct tls_ctx *ctx = session_tls_get_server_ctx(s);
		if (!ctx) {
			ctx = tls_new(worker);
			if (!ctx) {
				session_close(s);
				return;
			}
			ctx->c.session = s;
			ctx->c.handshake_state = TLS_HS_IN_PROGRESS;

			/* Configure ALPN: "dot" for DNS-over-TLS, "h2" for DNS-over-HTTPS. */
			gnutls_datum_t proto;
			if (!http) {
				proto.data = (unsigned char *)"dot";
				proto.size = 3;
			} else {
				proto.data = (unsigned char *)"h2";
				proto.size = 2;
			}
			unsigned int flags = 0;
#if GNUTLS_VERSION_NUMBER >= 0x030500
			/* Mandatory ALPN means the protocol must match if and
			 * only if the ALPN extension is used by the client. */
			flags |= GNUTLS_ALPN_MANDATORY;
#endif
			ret = gnutls_alpn_set_protocols(ctx->c.tls_session, &proto, 1, flags);
			if (ret != GNUTLS_E_SUCCESS) {
				session_close(s);
				return;
			}

			session_tls_set_server_ctx(s, ctx);
		}
	}
#if ENABLE_DOH2
	if (http) {
		struct http_ctx *ctx = session_http_get_server_ctx(s);
		if (!ctx) {
			if (!tls) { /* Plain HTTP is not supported. */
				session_close(s);
				return;
			}
			ctx = http_new(s, tls_send);
			if (!ctx) {
				session_close(s);
				return;
			}
			session_http_set_server_ctx(s, ctx);
		}
	}
#endif
	session_timer_start(s, tcp_timeout_trigger, timeout, idle_in_timeout);
	io_start_read((uv_handle_t *)client);
}

static void tcp_accept(uv_stream_t *master, int status)
{
	tcp_accept_internal(master, status, false, false);
}

static void tls_accept(uv_stream_t *master, int status)
{
	tcp_accept_internal(master, status, true, false);
}

#if ENABLE_DOH2
static void https_accept(uv_stream_t *master, int status)
{
	tcp_accept_internal(master, status, true, true);
}
#endif

int io_listen_tcp(uv_loop_t *loop, uv_tcp_t *handle, int fd, int tcp_backlog, bool has_tls, bool has_http)
{
	uv_connection_cb connection;

	if (!handle) {
		return kr_error(EINVAL);
	}
	int ret = uv_tcp_init(loop, handle);
	if (ret) return ret;

	if (has_tls && has_http) {
#if ENABLE_DOH2
		connection = https_accept;
#else
		kr_log_error(IO, "kresd was compiled without libnghttp2 support\n");
		return kr_error(ENOPROTOOPT);
#endif
	} else if (has_tls) {
		connection = tls_accept;
	} else if (has_http) {
		return kr_error(EPROTONOSUPPORT);
	} else {
		connection = tcp_accept;
	}

	ret = uv_tcp_open(handle, (uv_os_sock_t) fd);
	if (ret) return ret;

	int val; (void)val;
	/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
#ifdef TCP_DEFER_ACCEPT
	val = KR_CONN_RTT_MAX/1000;
	if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val))) {
		kr_log_error(IO, "listen TCP (defer_accept): %s\n", strerror(errno));
	}
#endif

	ret = uv_listen((uv_stream_t *)handle, tcp_backlog, connection);
	if (ret != 0) {
		return ret;
	}

	/* TCP_FASTOPEN enables 1-RTT connection resumption. */
#ifdef TCP_FASTOPEN
#ifdef __linux__
	val = 16; /* Accept queue length hint */
#else
	val = 1; /* On/off toggle */
#endif
	if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &val, sizeof(val))) {
		kr_log_error(IO, "listen TCP (fastopen): %s%s\n", strerror(errno),
			(errno != EPERM ? "" :
			 ". This may be caused by TCP Fast Open being disabled in the OS."));
	}
#endif

	/* These get inherited into the individual connections (on Linux at least). */
	struct network *net = &the_worker->engine->net;
	set_so(fd, SO_SNDBUF, net->listen_tcp_buflens.snd, "TCP send buffer size");
	set_so(fd, SO_RCVBUF, net->listen_tcp_buflens.rcv, "TCP receive buffer size");
#ifdef TCP_USER_TIMEOUT
	val = net->tcp.user_timeout;
	if (val && setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof(val))) {
		kr_log_error(IO, "listen TCP (user_timeout): %s\n", strerror(errno));
	}
	// TODO: also for upstream connections, at least this one option?
#endif

	handle->data = NULL;
	return 0;
}


enum io_stream_mode {
	io_mode_text = 0,
	io_mode_binary = 1,
};

struct io_stream_data {
	enum io_stream_mode mode;
	size_t blen; ///< length of `buf`
	char *buf;   ///< growing buffer residing on `pool` (mp_append_*)
	knot_mm_t *pool;
};

/**
 * TTY control: process input and free() the buffer.
 *
 * For parameters see http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb
 *
 * - This is just basic read-eval-print; libedit is supported through kresc.
 */
void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
	auto_free char *commands = buf ? buf->base : NULL;

	/* Set output streams */
	FILE *out = stdout;
	uv_os_fd_t stream_fd = -1;
	struct args *args = the_args;
	struct io_stream_data *data = (struct io_stream_data*) stream->data;
	if (nread < 0 || uv_fileno((uv_handle_t *)stream, &stream_fd)) {
		mp_delete(data->pool->ctx);
		uv_close((uv_handle_t *)stream, (uv_close_cb) free);
		return;
	}
	if (nread <= 0) {
		return;
	}
	if (stream_fd != STDIN_FILENO) {
		uv_os_fd_t dup_fd = dup(stream_fd);
		if (dup_fd >= 0) {
			out = fdopen(dup_fd, "w");
		}
	}

	/** The current single command and the remaining command(s). */
	char *cmd, *cmd_next = NULL;
	bool incomplete_cmd = false;

	if (!commands || nread <= 0) {
		goto finish;
	}

	/* Execute */
	if (commands[nread - 1] != '\n') {
		incomplete_cmd = true;
	}
	/* Ensure commands is 0-terminated */
	if (nread >= buf->len) { /* only equality should be possible */
		char *newbuf = realloc(commands, nread + 1);
		if (!newbuf)
			goto finish;
		commands = newbuf;
	}
	commands[nread] = '\0';

	char *boundary = "\n\0";
	cmd = strtok(commands, "\n");
	/* strtok() skips '\n', but we need to process a lone '\n' too. */
	if (commands[0] == '\n') {
		cmd_next = cmd;
		cmd = boundary;
	} else {
		cmd_next = strtok(NULL, "\n");
	}

	/** Pointer to the end of the buffer holding an incomplete command. */
	char *pbuf = data->buf + data->blen;
	lua_State *L = the_worker->engine->L;
	while (cmd != NULL) {
		/* The last command is incomplete - save it and execute it later. */
		if (incomplete_cmd && cmd_next == NULL) {
			pbuf = mp_append_string(data->pool->ctx, pbuf, cmd);
			mp_append_char(data->pool->ctx, pbuf, '\0');
			data->buf = mp_ptr(data->pool->ctx);
			data->blen = data->blen + strlen(cmd);

			/* There is a new incomplete command. */
			if (commands[nread - 1] == '\n')
				incomplete_cmd = false;
			goto next_iter;
		}

		/* Process an incomplete command saved from a previous call. */
		if (data->blen > 0) {
			if (commands[0] != '\n' && commands[0] != '\0') {
				pbuf = mp_append_string(data->pool->ctx, pbuf, cmd);
				mp_append_char(data->pool->ctx, pbuf, '\0');
				data->buf = mp_ptr(data->pool->ctx);
				cmd = data->buf;
			} else {
				cmd = data->buf;
			}
			data->blen = 0;
			pbuf = data->buf;
		}

		/* Pseudo-command for switching to "binary output". */
		if (strcmp(cmd, "__binary") == 0) {
			data->mode = io_mode_binary;
			goto next_iter;
		}

		const bool cmd_failed = engine_cmd(L, cmd, false);
		const char *message = NULL;
		size_t len_s;
		if (lua_gettop(L) > 0) {
			message = lua_tolstring(L, -1, &len_s);
		}

		/* Send back the output, either in "binary" or normal mode. */
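		/* Binary mode framing: a 4-byte big-endian length prefix,
		 * then the raw payload (empty payload on error). */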
		if (data->mode == io_mode_binary) {
			/* Leader expects a length field in all cases. */
			if (!message || len_s > UINT32_MAX) {
				kr_log_error(IO, "unrepresentable response on control socket, "
						"sending back empty block (command '%s')\n", cmd);
				len_s = 0;
			}
			uint32_t len_n = htonl(len_s);
			if (fwrite(&len_n, sizeof(len_n), 1, out) != 1)
				goto finish;
			if (len_s > 0) {
				if (fwrite(message, len_s, 1, out) != 1)
					goto finish;
			}
		} else {
			if (message) {
				if (fprintf(out, "%s", message) < 0)
					goto finish;
			}
			if (message || !args->quiet) {
				if (fprintf(out, "\n") < 0)
					goto finish;
			}
			if (!args->quiet) {
				if (fprintf(out, "> ") < 0)
					goto finish;
			}
		}

		/* Duplicate the command and its output to the logs. */
		if (cmd_failed) {
			kr_log_warning(CONTROL, "> %s\n", cmd);
			if (message)
				kr_log_warning(CONTROL, "%s\n", message);
		} else {
			kr_log_debug(CONTROL, "> %s\n", cmd);
			if (message)
				kr_log_debug(CONTROL, "%s\n", message);
		}
next_iter:
		lua_settop(L, 0); /* not required in some cases but harmless */
		cmd = cmd_next;
		cmd_next = strtok(NULL, "\n");
	}

finish:
	/* Close if redirected */
	if (stream_fd != STDIN_FILENO) {
		(void)fclose(out);
	}
}

void io_tty_alloc(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
	buf->len = suggested;
	buf->base = malloc(suggested);
}

struct io_stream_data *io_tty_alloc_data(void) {
	knot_mm_t *pool = mm_ctx_mempool2(MM_DEFAULT_BLKSIZE);
	if (!pool) {
		return NULL;
	}
	struct io_stream_data *data = mm_alloc(pool, sizeof(struct io_stream_data));

	data->buf = mp_start(pool->ctx, 512);
	data->mode = io_mode_text;
	data->blen = 0;
	data->pool = pool;

	return data;
}

void io_tty_accept(uv_stream_t *master, int status)
{
	/* We can't use any allocations after mp_start() and it's easier anyway. */
	uv_pipe_t *client = malloc(sizeof(*client));
	if (!client)
		return;

	struct io_stream_data *data = io_tty_alloc_data();
	if (!data) {
		free(client);
		return;
	}
	client->data = data;

	struct args *args = the_args;
	uv_pipe_init(master->loop, client, 0);
	if (uv_accept(master, (uv_stream_t *)client) != 0) {
		mp_delete(data->pool->ctx);
		return;
	}
	uv_read_start((uv_stream_t *)client, io_tty_alloc, io_tty_process_input);

	/* Write the command prompt. */
	if (!args->quiet) {
		uv_buf_t buf = { "> ", 2 };
		uv_try_write((uv_stream_t *)client, &buf, 1);
	}
}

int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd)
{
	if (!handle) {
		return kr_error(EINVAL);
	}
	int ret = uv_pipe_init(loop, handle, 0);
	if (ret) return ret;

	ret = uv_pipe_open(handle, fd);
	if (ret) return ret;

	ret = uv_listen((uv_stream_t *)handle, 16, io_tty_accept);
	if (ret) return ret;

	handle->data = NULL;

	return 0;
}

#if ENABLE_XDP
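/* Poll callback: drain a batch of packets from the XDP socket
 * and submit each one to the worker for processing. */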
static void xdp_rx(uv_poll_t* handle, int status, int events)
{
	const int XDP_RX_BATCH_SIZE = 64;
	if (status < 0) {
		kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status));
		return;
	}
	if (events != UV_READABLE) {
		kr_log_error(XDP, "poll unexpected events: %d\n", events);
		return;
	}

	xdp_handle_data_t *xhd = handle->data;
	kr_require(xhd && xhd->session && xhd->socket);
	uint32_t rcvd;
	knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE];
	int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd, NULL);

	if (kr_fails_assert(ret == KNOT_EOK)) {
		/* ATM other error codes can only be returned when called incorrectly */
		kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret));
		return;
	}
	kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd);
	kr_require(rcvd <= XDP_RX_BATCH_SIZE);
	for (int i = 0; i < rcvd; ++i) {
		const knot_xdp_msg_t *msg = &msgs[i];
		kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE);
		knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len,
						&the_worker->pkt_pool);
		if (kpkt == NULL) {
			ret = kr_error(ENOMEM);
		} else {
			struct io_comm_data comm = {
				.src_addr = (const struct sockaddr *)&msg->ip_from,
				.comm_addr = (const struct sockaddr *)&msg->ip_from,
				.dst_addr = (const struct sockaddr *)&msg->ip_to
			};
			ret = worker_submit(xhd->session, &comm,
					msg->eth_from, msg->eth_to, kpkt);
		}
		if (ret)
			kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret));
		mp_flush(the_worker->pkt_pool.ctx);
	}
	knot_xdp_recv_finish(xhd->socket, msgs, rcvd);
}
/// Warn if the XDP program is running in emulated mode (XDP_SKB)
static void xdp_warn_mode(const char *ifname)
{
	if (kr_fails_assert(ifname))
		return;

	const unsigned if_index = if_nametoindex(ifname);
	if (!if_index) {
		kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n",
				ifname, strerror(errno));
		return;
	}

	const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index);
	switch (mode) {
	case KNOT_XDP_MODE_FULL:
		return;
	case KNOT_XDP_MODE_EMUL:
		kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n",
				ifname);
		return;
	case KNOT_XDP_MODE_NONE: // avoid enum warnings from the compiler
		break;
	}
	kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n",
			ifname, (int)mode);
}
int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname)
{
	if (!ep || !ep->handle) {
		return kr_error(EINVAL);
	}

	// RLIMIT_MEMLOCK often needs raising when operating on BPF
	static int ret_limit = 1;
	if (ret_limit == 1) {
		struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY };
		ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit)
			? kr_error(errno) : 0;
	}
	if (ret_limit) return ret_limit;

	xdp_handle_data_t *xhd = malloc(sizeof(*xhd));
	if (!xhd) return kr_error(ENOMEM);

	xhd->socket = NULL; // needed for some reason

	// This call is a libknot version hell, unfortunately.
	int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue,
#if KNOT_VERSION_HEX < 0x030200
			ep->port ? ep->port : (KNOT_XDP_LISTEN_PORT_PASS | 0),
			KNOT_XDP_LOAD_BPF_MAYBE
#else
			KNOT_XDP_FILTER_UDP | (ep->port ? 0 : KNOT_XDP_FILTER_PASS),
			ep->port, 0/*quic_port*/,
			KNOT_XDP_LOAD_BPF_MAYBE,
			NULL/*xdp_config*/
#endif
			);

	if (!ret) xdp_warn_mode(ifname);

	if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker);
	if (ret || kr_fails_assert(xhd->socket)) {
		free(xhd);
		return ret == 0 ? kr_error(EINVAL) : kr_error(ret);
	}
	xhd->tx_waker.data = xhd->socket;

	ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful
	ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd);
	if (ret) {
		knot_xdp_deinit(xhd->socket);
		free(xhd);
		return kr_error(ret);
	}

	// beware: this sets poll_handle->data
	xhd->session = session_new(ep->handle, false, false);
	kr_require(!session_flags(xhd->session)->outgoing);
	session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there

	ep->handle->data = xhd;
	ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx);
	return ret;
}
#endif
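
/* Initialize a libuv handle of the requested socket type and attach
 * a fresh session to it (session_new() stores itself in handle->data). */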
int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, bool has_tls, bool has_http)
{
	int ret = -1;
	if (type == SOCK_DGRAM) {
		ret = uv_udp_init(loop, (uv_udp_t *)handle);
	} else if (type == SOCK_STREAM) {
		ret = uv_tcp_init_ex(loop, (uv_tcp_t *)handle, family);
		uv_tcp_nodelay((uv_tcp_t *)handle, 1);
	}
	if (ret != 0) {
		return ret;
	}
	struct session *s = session_new(handle, has_tls, has_http);
	if (s == NULL) {
		ret = -1;
	}
	return ret;
}
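
/* For UV_POLL handles (only used for XDP sockets) handle->data carries an
 * xdp_handle_data_t; every other handle type carries a session. */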
static void io_deinit(uv_handle_t *handle)
{
	if (!handle || !handle->data) {
		return;
	}
	if (handle->type != UV_POLL) {
		session_free(handle->data);
	} else {
#if ENABLE_XDP
		xdp_handle_data_t *xhd = handle->data;
		uv_idle_stop(&xhd->tx_waker);
		uv_close((uv_handle_t *)&xhd->tx_waker, NULL);
		session_free(xhd->session);
		knot_xdp_deinit(xhd->socket);
		free(xhd);
#else
		kr_assert(false);
#endif
	}
}

void io_free(uv_handle_t *handle)
{
	io_deinit(handle);
	free(handle);
}

int io_start_read(uv_handle_t *handle)
{
	switch (handle->type) {
	case UV_UDP:
		return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
	case UV_TCP:
		return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
	default:
		kr_assert(false);
		return kr_error(EINVAL);
	}
}

int io_stop_read(uv_handle_t *handle)
{
	if (handle->type == UV_UDP) {
		return uv_udp_recv_stop((uv_udp_t *)handle);
	} else {
		return uv_read_stop((uv_stream_t *)handle);
	}
}