/* Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
 * SPDX-License-Identifier: GPL-3.0-or-later
 */

#include "kresconfig.h"
#include "daemon/worker.h"

#include <uv.h>
#include <lua.h>
#include <lauxlib.h>
#include <libknot/packet/pkt.h>
#include <libknot/descriptor.h>
#include <contrib/cleanup.h>
#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
#include <malloc.h>
#endif
#include <sys/types.h>
#include <unistd.h>
#include <gnutls/gnutls.h>

#if ENABLE_XDP
#include <libknot/xdp/xdp.h>
#endif

#include "daemon/bindings/api.h"
#include "daemon/engine.h"
#include "daemon/io.h"
#include "daemon/proxyv2.h"
#include "daemon/session.h"
#include "daemon/tls.h"
#include "daemon/http.h"
#include "daemon/udp_queue.h"
#include "lib/layer.h"
#include "lib/utils.h"


/* Magic defaults for the worker. */
#ifndef MAX_PIPELINED
#define MAX_PIPELINED 100
#endif

#define VERBOSE_MSG(qry, ...) kr_log_q(qry, WORKER, __VA_ARGS__)

/** Client request state. */
struct request_ctx
{
	struct kr_request req;

	struct worker_ctx *worker;
	struct qr_task *task;
	struct {
		/** NULL if the request didn't come over network. */
		struct session *session;
		/** Requestor's address; separate because of UDP session "sharing". */
		union kr_sockaddr addr;
		/** Request communication address; if not from a proxy, same as addr. */
		union kr_sockaddr comm_addr;
		/** Local address. For AF_XDP we couldn't use session's,
		 * as the address might be different every time. */
		union kr_sockaddr dst_addr;
		/** MAC addresses - ours [0] and router's [1], in case of AF_XDP socket. */
		uint8_t eth_addrs[2][6];
	} source;
};

/** Query resolution task. */
struct qr_task
{
	struct request_ctx *ctx;
	knot_pkt_t *pktbuf;
	qr_tasklist_t waiting;
	struct session *pending[MAX_PENDING];
	uint16_t pending_count;
	uint16_t timeouts;
	uint16_t iter_count;
	uint32_t refs;
	bool finished : 1;
	bool leading : 1;
	uint64_t creation_time;
	uint64_t send_time;
	uint64_t recv_time;
	struct kr_transport *transport;
};


/* Convenience macros */
#define qr_task_ref(task) \
	do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
	do { \
		if (task) \
			kr_require((task)->refs > 0); \
		if ((task) && --(task)->refs == 0) \
			qr_task_free((task)); \
	} while (0)

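/* Explanatory note: a task's lifetime is governed by these refcounting macros.
 * qr_task_create() takes the primary reference (held in request_ctx::task),
 * qr_task_send() holds one around each pending outgoing I/O, and
 * subreq_enqueue() holds one while a follower waits on a leader task;
 * qr_task_free() runs only when the last qr_task_unref() drops refs to zero. */
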
/* Forward decls */
static void qr_task_free(struct qr_task *task);
static int qr_task_step(struct qr_task *task,
			const struct sockaddr *packet_source,
			knot_pkt_t *packet);
static int qr_task_send(struct qr_task *task, struct session *session,
			const struct sockaddr *addr, knot_pkt_t *pkt);
static int qr_task_finalize(struct qr_task *task, int state);
static void qr_task_complete(struct qr_task *task);
struct session* worker_find_tcp_connected(struct worker_ctx *worker,
					  const struct sockaddr *addr);
static int worker_add_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr *addr,
				  struct session *session);
struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
					const struct sockaddr *addr);
static void on_tcp_connect_timeout(uv_timer_t *timer);
static void on_udp_timeout(uv_timer_t *timer);
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt);


struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */
struct worker_ctx *the_worker = NULL;

/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
|
|
* socktype is SOCK_* */
|
|
static uv_handle_t *ioreq_spawn(struct worker_ctx *worker,
|
|
int socktype, sa_family_t family, bool has_tls,
|
|
bool has_http)
|
|
{
|
|
bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM)
|
|
&& (family == AF_INET || family == AF_INET6);
|
|
if (kr_fails_assert(precond)) {
|
|
kr_log_debug(WORKER, "ioreq_spawn: pre-condition failed\n");
|
|
return NULL;
|
|
}
|
|
|
|
/* Create connection for iterative query */
|
|
uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
|
|
? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
|
|
if (!handle) {
|
|
return NULL;
|
|
}
|
|
int ret = io_create(worker->loop, handle, socktype, family, has_tls, has_http);
|
|
if (ret) {
|
|
if (ret == UV_EMFILE) {
|
|
worker->too_many_open = true;
|
|
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
|
|
}
|
|
free(handle);
|
|
return NULL;
|
|
}
|
|
|
|
/* Bind to outgoing address, according to IP v4/v6. */
|
|
union kr_sockaddr *addr;
|
|
if (family == AF_INET) {
|
|
addr = (union kr_sockaddr *)&worker->out_addr4;
|
|
} else {
|
|
addr = (union kr_sockaddr *)&worker->out_addr6;
|
|
}
|
|
if (addr->ip.sa_family != AF_UNSPEC) {
|
|
if (kr_fails_assert(addr->ip.sa_family == family)) {
|
|
io_free(handle);
|
|
return NULL;
|
|
}
|
|
if (socktype == SOCK_DGRAM) {
|
|
uv_udp_t *udp = (uv_udp_t *)handle;
|
|
ret = uv_udp_bind(udp, &addr->ip, 0);
|
|
} else if (socktype == SOCK_STREAM){
|
|
uv_tcp_t *tcp = (uv_tcp_t *)handle;
|
|
ret = uv_tcp_bind(tcp, &addr->ip, 0);
|
|
}
|
|
}
|
|
|
|
if (ret != 0) {
|
|
io_free(handle);
|
|
return NULL;
|
|
}
|
|
|
|
/* Set current handle as a subrequest type. */
|
|
struct session *session = handle->data;
|
|
session_flags(session)->outgoing = true;
|
|
/* Connect or issue query datagram */
|
|
return handle;
|
|
}
|
|
|
|
static void ioreq_kill_pending(struct qr_task *task)
{
	for (uint16_t i = 0; i < task->pending_count; ++i) {
		session_kill_ioreq(task->pending[i], task);
	}
	task->pending_count = 0;
}

/** Get a mempool. */
static inline struct mempool *pool_borrow(struct worker_ctx *worker)
{
	/* The implementation used to have an extra caching layer,
	 * but it didn't work well.  Now it's very simple. */
	return mp_new((size_t)16 * 1024);
}
/** Return a mempool. */
static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
{
	mp_delete(mp);
}

/** Create a key for an outgoing subrequest: qname, qclass, qtype.
 * @param key Destination buffer for the key; its size MUST be SUBREQ_KEY_LEN or larger.
 * @return key length if successful or an error
 */
static const size_t SUBREQ_KEY_LEN = KR_RRKEY_LEN;
static int subreq_key(char *dst, knot_pkt_t *pkt)
{
	kr_require(pkt);
	return kr_rrkey(dst, knot_pkt_qclass(pkt), knot_pkt_qname(pkt),
			knot_pkt_qtype(pkt), knot_pkt_qtype(pkt));
}

#if ENABLE_XDP
|
|
static uint8_t *alloc_wire_cb(struct kr_request *req, uint16_t *maxlen)
|
|
{
|
|
if (kr_fails_assert(maxlen))
|
|
return NULL;
|
|
struct request_ctx *ctx = (struct request_ctx *)req;
|
|
/* We know it's an AF_XDP socket; otherwise this CB isn't assigned. */
|
|
uv_handle_t *handle = session_get_handle(ctx->source.session);
|
|
if (kr_fails_assert(handle->type == UV_POLL))
|
|
return NULL;
|
|
xdp_handle_data_t *xhd = handle->data;
|
|
knot_xdp_msg_t out;
|
|
bool ipv6 = ctx->source.comm_addr.ip.sa_family == AF_INET6;
|
|
int ret = knot_xdp_send_alloc(xhd->socket, ipv6 ? KNOT_XDP_MSG_IPV6 : 0, &out);
|
|
if (ret != KNOT_EOK) {
|
|
kr_assert(ret == KNOT_ENOMEM);
|
|
*maxlen = 0;
|
|
return NULL;
|
|
}
|
|
*maxlen = MIN(*maxlen, out.payload.iov_len);
|
|
return out.payload.iov_base;
|
|
}
|
|
static void free_wire(const struct request_ctx *ctx)
|
|
{
|
|
if (kr_fails_assert(ctx->req.alloc_wire_cb == alloc_wire_cb))
|
|
return;
|
|
knot_pkt_t *ans = ctx->req.answer;
|
|
if (unlikely(ans == NULL)) /* dropped */
|
|
return;
|
|
if (likely(ans->wire == NULL)) /* sent most likely */
|
|
return;
|
|
/* We know it's an AF_XDP socket; otherwise alloc_wire_cb isn't assigned. */
|
|
uv_handle_t *handle = session_get_handle(ctx->source.session);
|
|
if (kr_fails_assert(handle->type == UV_POLL))
|
|
return;
|
|
xdp_handle_data_t *xhd = handle->data;
|
|
/* Freeing is done by sending an empty packet (the API won't really send it). */
|
|
knot_xdp_msg_t out;
|
|
out.payload.iov_base = ans->wire;
|
|
out.payload.iov_len = 0;
|
|
uint32_t sent = 0;
|
|
#if KNOT_VERSION_HEX >= 0x030100
|
|
int ret = 0;
|
|
knot_xdp_send_free(xhd->socket, &out, 1);
|
|
#else
|
|
int ret = knot_xdp_send(xhd->socket, &out, 1, &sent);
|
|
#endif
|
|
kr_assert(ret == KNOT_EOK && sent == 0);
|
|
kr_log_debug(XDP, "freed unsent buffer, ret = %d\n", ret);
|
|
}
|
|
#endif
|
|
/* Helper functions for transport selection */
|
|
static inline bool is_tls_capable(struct sockaddr *address) {
|
|
tls_client_param_t *tls_entry = tls_client_param_get(the_worker->engine->net.tls_client_params, address);
|
|
return tls_entry;
|
|
}
|
|
|
|
static inline bool is_tcp_connected(struct sockaddr *address) {
|
|
return worker_find_tcp_connected(the_worker, address);
|
|
}
|
|
|
|
static inline bool is_tcp_waiting(struct sockaddr *address) {
|
|
return worker_find_tcp_waiting(the_worker, address);
|
|
}
|
|
|
|
/** Create and initialize a request_ctx (on a fresh mempool).
|
|
*
|
|
* session and addr point to the source of the request, and they are NULL
|
|
* in case the request didn't come from network.
|
|
*/
|
|
static struct request_ctx *request_create(struct worker_ctx *worker,
|
|
struct session *session,
|
|
struct io_comm_data *comm,
|
|
const uint8_t *eth_from,
|
|
const uint8_t *eth_to,
|
|
uint32_t uid)
|
|
{
|
|
knot_mm_t pool = {
|
|
.ctx = pool_borrow(worker),
|
|
.alloc = (knot_mm_alloc_t) mp_alloc
|
|
};
|
|
|
|
/* Create request context */
|
|
struct request_ctx *ctx = mm_calloc(&pool, 1, sizeof(*ctx));
|
|
if (!ctx) {
|
|
pool_release(worker, pool.ctx);
|
|
return NULL;
|
|
}
|
|
|
|
/* TODO Relocate pool to struct request */
|
|
ctx->worker = worker;
|
|
if (session && kr_fails_assert(session_flags(session)->outgoing == false)) {
|
|
pool_release(worker, pool.ctx);
|
|
return NULL;
|
|
}
|
|
ctx->source.session = session;
|
|
if (kr_fails_assert(!!eth_to == !!eth_from)) {
|
|
pool_release(worker, pool.ctx);
|
|
return NULL;
|
|
}
|
|
const bool is_xdp = eth_to != NULL;
|
|
if (is_xdp) {
|
|
#if ENABLE_XDP
|
|
if (kr_fails_assert(session)) {
|
|
pool_release(worker, pool.ctx);
|
|
return NULL;
|
|
}
|
|
memcpy(&ctx->source.eth_addrs[0], eth_to, sizeof(ctx->source.eth_addrs[0]));
|
|
memcpy(&ctx->source.eth_addrs[1], eth_from, sizeof(ctx->source.eth_addrs[1]));
|
|
ctx->req.alloc_wire_cb = alloc_wire_cb;
|
|
#else
|
|
kr_assert(!EINVAL);
|
|
pool_release(worker, pool.ctx);
|
|
return NULL;
|
|
#endif
|
|
}
|
|
|
|
struct kr_request *req = &ctx->req;
|
|
req->pool = pool;
|
|
req->vars_ref = LUA_NOREF;
|
|
req->uid = uid;
|
|
req->qsource.comm_flags.xdp = is_xdp;
|
|
kr_request_set_extended_error(req, KNOT_EDNS_EDE_NONE, NULL);
|
|
array_init(req->qsource.headers);
|
|
if (session) {
|
|
kr_require(comm);
|
|
|
|
const struct sockaddr *src_addr = comm->src_addr;
|
|
const struct sockaddr *comm_addr = comm->comm_addr;
|
|
const struct sockaddr *dst_addr = comm->dst_addr;
|
|
const struct proxy_result *proxy = comm->proxy;
|
|
|
|
req->qsource.comm_flags.tcp = session_get_handle(session)->type == UV_TCP;
|
|
req->qsource.comm_flags.tls = session_flags(session)->has_tls;
|
|
req->qsource.comm_flags.http = session_flags(session)->has_http;
|
|
|
|
req->qsource.flags = req->qsource.comm_flags;
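		/* Note: when the query arrived through a PROXYv2 proxy, `flags`
		 * below get overridden to describe the client's connection as
		 * reported by the proxy, while `comm_flags` keep describing the
		 * connection between the proxy and this resolver. */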
|
|
if (proxy) {
|
|
req->qsource.flags.tcp = proxy->protocol == SOCK_STREAM;
|
|
req->qsource.flags.tls = proxy->has_tls;
|
|
}
|
|
|
|
req->qsource.stream_id = -1;
|
|
#if ENABLE_DOH2
|
|
if (req->qsource.comm_flags.http) {
|
|
struct http_ctx *http_ctx = session_http_get_server_ctx(session);
|
|
struct http_stream stream = queue_head(http_ctx->streams);
|
|
req->qsource.stream_id = stream.id;
|
|
if (stream.headers) {
|
|
req->qsource.headers = *stream.headers;
|
|
free(stream.headers);
|
|
stream.headers = NULL;
|
|
}
|
|
}
|
|
#endif
|
|
/* We need to store a copy of peer address. */
|
|
memcpy(&ctx->source.addr.ip, src_addr, kr_sockaddr_len(src_addr));
|
|
req->qsource.addr = &ctx->source.addr.ip;
|
|
|
|
if (!comm_addr)
|
|
comm_addr = src_addr;
|
|
memcpy(&ctx->source.comm_addr.ip, comm_addr, kr_sockaddr_len(comm_addr));
|
|
req->qsource.comm_addr = &ctx->source.comm_addr.ip;
|
|
|
|
if (!dst_addr) /* We wouldn't have to copy in this case, but for consistency. */
|
|
dst_addr = session_get_sockname(session);
|
|
memcpy(&ctx->source.dst_addr.ip, dst_addr, kr_sockaddr_len(dst_addr));
|
|
req->qsource.dst_addr = &ctx->source.dst_addr.ip;
|
|
}
|
|
|
|
req->selection_context.is_tls_capable = is_tls_capable;
|
|
req->selection_context.is_tcp_connected = is_tcp_connected;
|
|
req->selection_context.is_tcp_waiting = is_tcp_waiting;
|
|
array_init(req->selection_context.forwarding_targets);
|
|
array_reserve_mm(req->selection_context.forwarding_targets, 1, kr_memreserve, &req->pool);
|
|
|
|
worker->stats.rconcurrent += 1;
|
|
|
|
return ctx;
|
|
}
|
|
|
|
/** More initialization, related to the particular incoming query/packet. */
|
|
static int request_start(struct request_ctx *ctx, knot_pkt_t *query)
|
|
{
|
|
if (kr_fails_assert(query && ctx))
|
|
return kr_error(EINVAL);
|
|
|
|
struct kr_request *req = &ctx->req;
|
|
req->qsource.size = query->size;
|
|
if (knot_pkt_has_tsig(query)) {
|
|
req->qsource.size += query->tsig_wire.len;
|
|
}
|
|
|
|
knot_pkt_t *pkt = knot_pkt_new(NULL, req->qsource.size, &req->pool);
|
|
if (!pkt) {
|
|
return kr_error(ENOMEM);
|
|
}
|
|
|
|
int ret = knot_pkt_copy(pkt, query);
|
|
if (ret != KNOT_EOK && ret != KNOT_ETRAIL) {
|
|
return kr_error(ENOMEM);
|
|
}
|
|
req->qsource.packet = pkt;
|
|
|
|
/* Start resolution */
|
|
struct worker_ctx *worker = ctx->worker;
|
|
struct engine *engine = worker->engine;
|
|
kr_resolve_begin(req, &engine->resolver);
|
|
worker->stats.queries += 1;
|
|
return kr_ok();
|
|
}
|
|
|
|
static void request_free(struct request_ctx *ctx)
|
|
{
|
|
struct worker_ctx *worker = ctx->worker;
|
|
/* Dereference any Lua vars table if exists */
|
|
if (ctx->req.vars_ref != LUA_NOREF) {
|
|
lua_State *L = worker->engine->L;
|
|
/* Get worker variables table */
|
|
lua_rawgeti(L, LUA_REGISTRYINDEX, worker->vars_table_ref);
|
|
/* Get next free element (position 0) and store it under current reference (forming a list) */
|
|
lua_rawgeti(L, -1, 0);
|
|
lua_rawseti(L, -2, ctx->req.vars_ref);
|
|
/* Set current reference as the next free element */
|
|
lua_pushinteger(L, ctx->req.vars_ref);
|
|
lua_rawseti(L, -2, 0);
|
|
lua_pop(L, 1);
|
|
ctx->req.vars_ref = LUA_NOREF;
|
|
}
|
|
/* Free HTTP/2 headers for DoH requests. */
|
|
for(int i = 0; i < ctx->req.qsource.headers.len; i++) {
|
|
free(ctx->req.qsource.headers.at[i].name);
|
|
free(ctx->req.qsource.headers.at[i].value);
|
|
}
|
|
array_clear(ctx->req.qsource.headers);
|
|
|
|
/* Make sure to free XDP buffer in case it wasn't sent. */
|
|
if (ctx->req.alloc_wire_cb) {
|
|
#if ENABLE_XDP
|
|
free_wire(ctx);
|
|
#else
|
|
kr_assert(!EINVAL);
|
|
#endif
|
|
}
|
|
/* Return mempool to ring or free it if it's full */
|
|
pool_release(worker, ctx->req.pool.ctx);
|
|
/* @note The 'task' is invalidated from now on. */
|
|
worker->stats.rconcurrent -= 1;
|
|
}
|
|
|
|
static struct qr_task *qr_task_create(struct request_ctx *ctx)
|
|
{
|
|
/* Choose (initial) pktbuf size. As it is now, pktbuf can be used
|
|
* for UDP answers from upstream *and* from cache
|
|
* and for sending queries upstream */
|
|
uint16_t pktbuf_max = KR_EDNS_PAYLOAD;
|
|
const knot_rrset_t *opt_our = ctx->worker->engine->resolver.upstream_opt_rr;
|
|
if (opt_our) {
|
|
pktbuf_max = MAX(pktbuf_max, knot_edns_get_payload(opt_our));
|
|
}
|
|
|
|
/* Create resolution task */
|
|
struct qr_task *task = mm_calloc(&ctx->req.pool, 1, sizeof(*task));
|
|
if (!task) {
|
|
return NULL;
|
|
}
|
|
|
|
/* Create packet buffers for answer and subrequests */
|
|
knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
|
|
if (!pktbuf) {
|
|
mm_free(&ctx->req.pool, task);
|
|
return NULL;
|
|
}
|
|
pktbuf->size = 0;
|
|
|
|
task->ctx = ctx;
|
|
task->pktbuf = pktbuf;
|
|
array_init(task->waiting);
|
|
task->refs = 0;
|
|
kr_assert(ctx->task == NULL);
|
|
ctx->task = task;
|
|
/* Make the primary reference to task. */
|
|
qr_task_ref(task);
|
|
task->creation_time = kr_now();
|
|
ctx->worker->stats.concurrent += 1;
|
|
return task;
|
|
}
|
|
|
|
/* This is called when the task refcount drops to zero; free its memory. */
static void qr_task_free(struct qr_task *task)
|
|
{
|
|
struct request_ctx *ctx = task->ctx;
|
|
|
|
if (kr_fails_assert(ctx))
|
|
return;
|
|
|
|
struct worker_ctx *worker = ctx->worker;
|
|
|
|
if (ctx->task == NULL) {
|
|
request_free(ctx);
|
|
}
|
|
|
|
/* Update stats */
|
|
worker->stats.concurrent -= 1;
|
|
}
|
|
|
|
/** Register a new qr_task within a session. */
static int qr_task_register(struct qr_task *task, struct session *session)
|
|
{
|
|
if (kr_fails_assert(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP))
|
|
return kr_error(EINVAL);
|
|
|
|
session_tasklist_add(session, task);
|
|
|
|
struct request_ctx *ctx = task->ctx;
|
|
if (kr_fails_assert(ctx && (ctx->source.session == NULL || ctx->source.session == session)))
|
|
return kr_error(EINVAL);
|
|
ctx->source.session = session;
|
|
	/* Soft-limit on parallel queries.  There is no "slow down" RCODE
	 * that we could use to signal the client, but we can stop reading,
	 * and in effect shrink the TCP window size.  To get more precise throttling,
	 * we would need to copy the remainder of the unread buffer and reassemble
	 * it when resuming reading.  This is NYI. */
if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max &&
|
|
!session_flags(session)->throttled && !session_flags(session)->closing) {
|
|
session_stop_read(session);
|
|
session_flags(session)->throttled = true;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void qr_task_complete(struct qr_task *task)
|
|
{
|
|
struct request_ctx *ctx = task->ctx;
|
|
|
|
/* Kill pending I/O requests */
|
|
ioreq_kill_pending(task);
|
|
kr_require(task->waiting.len == 0);
|
|
kr_require(task->leading == false);
|
|
|
|
struct session *s = ctx->source.session;
|
|
if (s) {
|
|
kr_require(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
|
|
ctx->source.session = NULL;
|
|
session_tasklist_del(s, task);
|
|
}
|
|
|
|
/* Release primary reference to task. */
|
|
if (ctx->task == task) {
|
|
ctx->task = NULL;
|
|
qr_task_unref(task);
|
|
}
|
|
}
|
|
|
|
/* This is called when we send subrequest / answer */
|
|
int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
|
|
{
|
|
if (task->finished) {
|
|
kr_require(task->leading == false);
|
|
qr_task_complete(task);
|
|
}
|
|
|
|
if (!handle || kr_fails_assert(handle->data))
|
|
return status;
|
|
struct session* s = handle->data;
|
|
|
|
if (handle->type == UV_UDP && session_flags(s)->outgoing) {
|
|
// This should ensure that we are only dealing with our question to upstream
|
|
if (kr_fails_assert(!knot_wire_get_qr(task->pktbuf->wire)))
|
|
return status;
|
|
// start the timer
|
|
struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
|
|
if (kr_fails_assert(qry && task->transport))
|
|
return status;
|
|
size_t timeout = task->transport->timeout;
|
|
int ret = session_timer_start(s, on_udp_timeout, timeout, 0);
|
|
/* Start next step with timeout, fatal if can't start a timer. */
|
|
if (ret != 0) {
|
|
subreq_finalize(task, &task->transport->address.ip, task->pktbuf);
|
|
qr_task_finalize(task, KR_STATE_FAIL);
|
|
}
|
|
}
|
|
|
|
if (handle->type == UV_TCP) {
|
|
if (status != 0) { // session probably not usable anymore; typically: ECONNRESET
|
|
const struct kr_request *req = &task->ctx->req;
|
|
if (kr_log_is_debug(WORKER, req)) {
|
|
const char *peer_str = NULL;
|
|
if (!session_flags(s)->outgoing) {
|
|
peer_str = "hidden"; // avoid logging downstream IPs
|
|
} else if (task->transport) {
|
|
peer_str = kr_straddr(&task->transport->address.ip);
|
|
}
|
|
if (!peer_str)
|
|
peer_str = "unknown"; // probably shouldn't happen
|
|
kr_log_req(req, 0, 0, WORKER,
|
|
"=> disconnected from '%s': %s\n",
|
|
peer_str, uv_strerror(status));
|
|
}
|
|
worker_end_tcp(s);
|
|
return status;
|
|
}
|
|
|
|
if (session_flags(s)->outgoing || session_flags(s)->closing)
|
|
return status;
|
|
|
|
struct worker_ctx *worker = task->ctx->worker;
|
|
if (session_flags(s)->throttled &&
|
|
session_tasklist_get_len(s) < worker->tcp_pipeline_max/2) {
|
|
/* Start reading again if the session is throttled and
|
|
* the number of outgoing requests is below watermark. */
|
|
session_start_read(s);
|
|
session_flags(s)->throttled = false;
|
|
}
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
static void on_write(uv_write_t *req, int status)
|
|
{
|
|
struct qr_task *task = req->data;
|
|
uv_handle_t *h = (uv_handle_t *)req->handle;
|
|
qr_task_on_send(task, h, status);
|
|
qr_task_unref(task);
|
|
free(req);
|
|
}
|
|
|
|
static int qr_task_send(struct qr_task *task, struct session *session,
|
|
const struct sockaddr *addr, knot_pkt_t *pkt)
|
|
{
|
|
if (!session)
|
|
return qr_task_on_send(task, NULL, kr_error(EIO));
|
|
|
|
int ret = 0;
|
|
struct request_ctx *ctx = task->ctx;
|
|
|
|
uv_handle_t *handle = session_get_handle(session);
|
|
if (kr_fails_assert(handle && handle->data == session))
|
|
return qr_task_on_send(task, NULL, kr_error(EINVAL));
|
|
const bool is_stream = handle->type == UV_TCP;
|
|
kr_require(is_stream || handle->type == UV_UDP);
|
|
|
|
if (addr == NULL)
|
|
addr = session_get_peer(session);
|
|
|
|
if (pkt == NULL)
|
|
pkt = worker_task_get_pktbuf(task);
|
|
|
|
if (session_flags(session)->outgoing && handle->type == UV_TCP) {
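		/* Multiple queries share one outgoing TCP connection, so the 16-bit
		 * DNS message ID must be unique per connection; the loop below
		 * linearly probes for an ID not used by any other task on it. */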
|
|
size_t try_limit = session_tasklist_get_len(session) + 1;
|
|
uint16_t msg_id = knot_wire_get_id(pkt->wire);
|
|
size_t try_count = 0;
|
|
while (session_tasklist_find_msgid(session, msg_id) &&
|
|
try_count <= try_limit) {
|
|
++msg_id;
|
|
++try_count;
|
|
}
|
|
if (try_count > try_limit)
|
|
return kr_error(ENOENT);
|
|
worker_task_pkt_set_msgid(task, msg_id);
|
|
}
|
|
|
|
struct worker_ctx *worker = ctx->worker;
|
|
/* Note time for upstream RTT */
|
|
task->send_time = kr_now();
|
|
task->recv_time = 0; // task structure is being reused so we have to zero this out here
|
|
/* Send using given protocol */
|
|
if (kr_fails_assert(!session_flags(session)->closing))
|
|
return qr_task_on_send(task, NULL, kr_error(EIO));
|
|
|
|
/* Pending ioreq on current task */
|
|
qr_task_ref(task);
|
|
|
|
if (session_flags(session)->has_http) {
|
|
#if ENABLE_DOH2
|
|
uv_write_t *write_req = malloc(sizeof(*write_req));
|
|
if (!write_req)
|
|
return qr_task_on_send(task, handle, kr_error(ENOMEM));
|
|
write_req->data = task;
|
|
ret = http_write(write_req, handle, pkt, ctx->req.qsource.stream_id, &on_write);
|
|
if (ret != 0)
|
|
free(write_req);
|
|
#else
|
|
ret = kr_error(ENOPROTOOPT);
|
|
#endif
|
|
} else if (session_flags(session)->has_tls) {
|
|
ret = tls_write(handle, pkt);
|
|
} else if (handle->type == UV_UDP) {
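		/* uv_udp_try_send() returns the number of bytes written on success;
		 * the checks below treat anything short of the full datagram
		 * (including UV_EAGAIN) as a transient failure mapped to ENOBUFS. */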
|
|
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
|
|
ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr);
|
|
if (ret >= 0 && ret == pkt->size) {
|
|
ret = 0; // this is the success
|
|
} else if (ret == UV_EAGAIN || ret >= 0) {
|
|
ret = kr_error(ENOBUFS);
|
|
}
|
|
} else if (handle->type == UV_TCP) {
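		/* DNS over TCP prefixes each message with a two-byte length field in
		 * network byte order (RFC 1035 section 4.2.2); size_buf below encodes
		 * pkt->size that way, e.g. a 300-byte message gets { 0x01, 0x2C }. */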
|
|
uint8_t size_buf[2] = { pkt->size / 256, pkt->size % 256 };
|
|
uv_buf_t buf[2] = {
|
|
{ (char *)&size_buf, 2 },
|
|
{ (char *)pkt->wire, pkt->size },
|
|
};
|
|
ret = uv_try_write((uv_stream_t *)handle, buf, 2);
|
|
if (ret >= 0 && ret == pkt->size + 2) {
|
|
ret = 0; // this is the success
|
|
} else if (ret == UV_EAGAIN || ret >= 0) {
|
|
ret = kr_error(ENOBUFS);
|
|
}
|
|
} else {
|
|
kr_assert(false);
|
|
ret = kr_error(EINVAL);
|
|
}
|
|
|
|
// FIXME: Hack to avoid a SERVFAIL when IPv6 is unavailable on the host
|
|
// machine. We uncovered this while changing the above writes/sends to
|
|
// try_writes/try_sends. We originally were not getting the
|
|
// `ENETUNREACH` from libuv and were just timeouting the request,
|
|
// falling back to IPv4 eventually.
|
|
//
|
|
// Note that this does not happen in Knot Resolver 6 with the rewritten
|
|
// I/O, so there is probably a bug somewhere in here, but we were not
|
|
// able to track it down.
|
|
if (ret == UV_ENETUNREACH) {
|
|
ret = 0;
|
|
}
|
|
|
|
if (ret == 0) {
|
|
if (!session_flags(session)->has_http) { // instead of completion callback
|
|
qr_task_on_send(task, handle, ret);
|
|
qr_task_unref(task);
|
|
}
|
|
session_touch(session);
|
|
if (session_flags(session)->outgoing) {
|
|
session_tasklist_add(session, task);
|
|
}
|
|
if (worker->too_many_open &&
|
|
worker->stats.rconcurrent <
|
|
worker->rconcurrent_highwatermark - 10) {
|
|
worker->too_many_open = false;
|
|
}
|
|
} else {
|
|
qr_task_unref(task);
|
|
if (ret == UV_EMFILE) {
|
|
worker->too_many_open = true;
|
|
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
|
|
ret = kr_error(UV_EMFILE);
|
|
}
|
|
|
|
if (session_flags(session)->has_http)
|
|
worker->stats.err_http += 1;
|
|
else if (session_flags(session)->has_tls)
|
|
worker->stats.err_tls += 1;
|
|
else if (handle->type == UV_UDP)
|
|
worker->stats.err_udp += 1;
|
|
else
|
|
worker->stats.err_tcp += 1;
|
|
}
|
|
|
|
/* Update outgoing query statistics */
|
|
if (session_flags(session)->outgoing && addr) {
|
|
if (session_flags(session)->has_tls)
|
|
worker->stats.tls += 1;
|
|
else if (handle->type == UV_UDP)
|
|
worker->stats.udp += 1;
|
|
else
|
|
worker->stats.tcp += 1;
|
|
|
|
if (addr->sa_family == AF_INET6)
|
|
worker->stats.ipv6 += 1;
|
|
else if (addr->sa_family == AF_INET)
|
|
worker->stats.ipv4 += 1;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
static struct kr_query *task_get_last_pending_query(struct qr_task *task)
|
|
{
|
|
if (!task || task->ctx->req.rplan.pending.len == 0) {
|
|
return NULL;
|
|
}
|
|
|
|
return array_tail(task->ctx->req.rplan.pending);
|
|
}
|
|
|
|
static int session_tls_hs_cb(struct session *session, int status)
|
|
{
|
|
if (kr_fails_assert(session_flags(session)->outgoing))
|
|
return kr_error(EINVAL);
|
|
struct sockaddr *peer = session_get_peer(session);
|
|
int deletion_res = worker_del_tcp_waiting(the_worker, peer);
|
|
int ret = kr_ok();
|
|
|
|
if (status) {
|
|
struct qr_task *task = session_waitinglist_get(session);
|
|
if (task) {
|
|
// TLS handshake failed, report it to server selection
|
|
struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
|
|
qry->server_selection.error(qry, task->transport, KR_SELECTION_TLS_HANDSHAKE_FAILED);
|
|
}
|
|
#ifndef NDEBUG
|
|
else {
|
|
			/* The task isn't in the list of tasks waiting for
			 * a connection to the upstream, so this MUST be an
			 * unsuccessful re-handshake.  Check it. */
kr_require(deletion_res != 0);
|
|
struct kr_sockaddr_key_storage key;
|
|
ssize_t keylen = kr_sockaddr_key(&key, peer);
|
|
if (keylen < 0)
|
|
return keylen;
|
|
trie_val_t *val;
|
|
kr_require((val = trie_get_try(the_worker->tcp_connected, key.bytes, keylen)) && *val);
|
|
}
|
|
#endif
|
|
return ret;
|
|
}
|
|
|
|
/* handshake was completed successfully */
|
|
struct tls_client_ctx *tls_client_ctx = session_tls_get_client_ctx(session);
|
|
tls_client_param_t *tls_params = tls_client_ctx->params;
|
|
gnutls_session_t tls_session = tls_client_ctx->c.tls_session;
|
|
if (gnutls_session_is_resumed(tls_session) != 0) {
|
|
kr_log_debug(TLSCLIENT, "TLS session has resumed\n");
|
|
} else {
|
|
kr_log_debug(TLSCLIENT, "TLS session has not resumed\n");
|
|
/* session wasn't resumed, delete old session data ... */
|
|
if (tls_params->session_data.data != NULL) {
|
|
gnutls_free(tls_params->session_data.data);
|
|
tls_params->session_data.data = NULL;
|
|
tls_params->session_data.size = 0;
|
|
}
|
|
/* ... and get the new session data */
|
|
gnutls_datum_t tls_session_data = { NULL, 0 };
|
|
ret = gnutls_session_get_data2(tls_session, &tls_session_data);
|
|
if (ret == 0) {
|
|
tls_params->session_data = tls_session_data;
|
|
}
|
|
}
|
|
|
|
struct session *s = worker_find_tcp_connected(the_worker, peer);
|
|
ret = kr_ok();
|
|
if (deletion_res == kr_ok()) {
|
|
/* peer was in the waiting list, add to the connected list. */
|
|
if (s) {
|
|
/* Something went wrong,
|
|
* peer already is in the connected list. */
|
|
ret = kr_error(EINVAL);
|
|
} else {
|
|
ret = worker_add_tcp_connected(the_worker, peer, session);
|
|
}
|
|
} else {
|
|
		/* The peer wasn't in the waiting list.  This is either
		 * 1) a successful re-handshake, in which case the peer
		 *    must already be in the connected list, or
		 * 2) a successful handshake on a session whose TCP connection
		 *    succeeded but was then timed out by on_tcp_connect_timeout();
		 *    in that case the peer isn't in the connected list. */
if (!s || s != session) {
|
|
ret = kr_error(EINVAL);
|
|
}
|
|
}
|
|
if (ret == kr_ok()) {
|
|
while (!session_waitinglist_is_empty(session)) {
|
|
struct qr_task *t = session_waitinglist_get(session);
|
|
ret = qr_task_send(t, session, NULL, NULL);
|
|
if (ret != 0) {
|
|
break;
|
|
}
|
|
session_waitinglist_pop(session, true);
|
|
}
|
|
} else {
|
|
ret = kr_error(EINVAL);
|
|
}
|
|
|
|
if (ret != kr_ok()) {
|
|
/* Something went wrong.
|
|
* Either addition to the list of connected sessions
|
|
* or write to upstream failed. */
|
|
worker_del_tcp_connected(the_worker, peer);
|
|
session_waitinglist_finalize(session, KR_STATE_FAIL);
|
|
session_tasklist_finalize(session, KR_STATE_FAIL);
|
|
session_close(session);
|
|
} else {
|
|
session_timer_stop(session);
|
|
session_timer_start(session, tcp_timeout_trigger,
|
|
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
|
|
}
|
|
return kr_ok();
|
|
}
|
|
|
|
static int send_waiting(struct session *session)
|
|
{
|
|
int ret = 0;
|
|
while (!session_waitinglist_is_empty(session)) {
|
|
struct qr_task *t = session_waitinglist_get(session);
|
|
ret = qr_task_send(t, session, NULL, NULL);
|
|
if (ret != 0) {
|
|
struct worker_ctx *worker = t->ctx->worker;
|
|
struct sockaddr *peer = session_get_peer(session);
|
|
session_waitinglist_finalize(session, KR_STATE_FAIL);
|
|
session_tasklist_finalize(session, KR_STATE_FAIL);
|
|
worker_del_tcp_connected(worker, peer);
|
|
session_close(session);
|
|
break;
|
|
}
|
|
// Let's be a bit defensive and check that nothing's changed before _pop()
|
|
// and recover if it has, as qr_task_send() is rather complex.
|
|
// TODO: fully analyze why the waitinglist could get empty here.
|
|
if (session_waitinglist_get(session) == t) {
|
|
session_waitinglist_pop(session, true);
|
|
} else { // a normal assertion could kr_error_log() too much in some rarer cases
|
|
VERBOSE_MSG(NULL, "soft assertion: waitinglist mismatch in send_waiting()\n");
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
static void on_connect(uv_connect_t *req, int status)
|
|
{
|
|
struct worker_ctx *worker = the_worker;
|
|
kr_require(worker);
|
|
uv_stream_t *handle = req->handle;
|
|
struct session *session = handle->data;
|
|
struct sockaddr *peer = session_get_peer(session);
|
|
free(req);
|
|
|
|
if (kr_fails_assert(session_flags(session)->outgoing))
|
|
return;
|
|
|
|
if (session_flags(session)->closing) {
|
|
worker_del_tcp_waiting(worker, peer);
|
|
kr_assert(session_is_empty(session));
|
|
return;
|
|
}
|
|
|
|
const bool log_debug = kr_log_is_debug(WORKER, NULL);
|
|
|
|
	/* Check whether the connection is in the waiting list.
	 * If not, this is most likely a timed-out connection which was
	 * already removed from the waiting list by the
	 * on_tcp_connect_timeout() callback. */
struct session *s = worker_find_tcp_waiting(worker, peer);
|
|
if (!s || s != session) {
|
|
		/* The session isn't on the waiting list;
		 * it has already timed out. */
if (log_debug) {
|
|
const char *peer_str = kr_straddr(peer);
|
|
kr_log_debug(WORKER, "=> connected to '%s', but session "
|
|
"is already timed out, close\n",
|
|
peer_str ? peer_str : "");
|
|
}
|
|
kr_assert(session_tasklist_is_empty(session));
|
|
session_waitinglist_retry(session, false);
|
|
session_close(session);
|
|
return;
|
|
}
|
|
|
|
s = worker_find_tcp_connected(worker, peer);
|
|
if (s) {
|
|
		/* The session is already in the connected list.
		 * Something went wrong; this can happen due to races when kresd
		 * has tried to reconnect to the upstream after an unsuccessful attempt. */
if (log_debug) {
|
|
const char *peer_str = kr_straddr(peer);
|
|
kr_log_debug(WORKER, "=> connected to '%s', but peer "
|
|
"is already connected, close\n",
|
|
peer_str ? peer_str : "");
|
|
}
|
|
kr_assert(session_tasklist_is_empty(session));
|
|
session_waitinglist_retry(session, false);
|
|
session_close(session);
|
|
return;
|
|
}
|
|
|
|
if (status != 0) {
|
|
if (log_debug) {
|
|
const char *peer_str = kr_straddr(peer);
|
|
kr_log_debug(WORKER, "=> connection to '%s' failed (%s), flagged as 'bad'\n",
|
|
peer_str ? peer_str : "", uv_strerror(status));
|
|
}
|
|
worker_del_tcp_waiting(worker, peer);
|
|
struct qr_task *task = session_waitinglist_get(session);
|
|
if (task && status != UV_ETIMEDOUT) {
|
|
/* Penalize upstream.
|
|
* In case of UV_ETIMEDOUT upstream has been
|
|
* already penalized in on_tcp_connect_timeout() */
|
|
struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
|
|
qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_FAILED);
|
|
}
|
|
kr_assert(session_tasklist_is_empty(session));
|
|
session_waitinglist_retry(session, false);
|
|
session_close(session);
|
|
return;
|
|
}
|
|
|
|
if (!session_flags(session)->has_tls) {
|
|
		/* With TLS the session is still waiting for the handshake;
		 * otherwise remove it from the waiting list now. */
if (worker_del_tcp_waiting(worker, peer) != 0) {
|
|
			/* The session isn't in the list of waiting queries;
			 * something went wrong. */
session_waitinglist_finalize(session, KR_STATE_FAIL);
|
|
kr_assert(session_tasklist_is_empty(session));
|
|
session_close(session);
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (log_debug) {
|
|
const char *peer_str = kr_straddr(peer);
|
|
kr_log_debug(WORKER, "=> connected to '%s'\n", peer_str ? peer_str : "");
|
|
}
|
|
|
|
session_flags(session)->connected = true;
|
|
session_start_read(session);
|
|
|
|
int ret = kr_ok();
|
|
if (session_flags(session)->has_tls) {
|
|
struct tls_client_ctx *tls_ctx = session_tls_get_client_ctx(session);
|
|
ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
|
|
if (ret == kr_error(EAGAIN)) {
|
|
session_timer_stop(session);
|
|
session_timer_start(session, tcp_timeout_trigger,
|
|
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
|
|
return;
|
|
}
|
|
} else {
|
|
worker_add_tcp_connected(worker, peer, session);
|
|
}
|
|
|
|
ret = send_waiting(session);
|
|
if (ret != 0) {
|
|
return;
|
|
}
|
|
|
|
session_timer_stop(session);
|
|
session_timer_start(session, tcp_timeout_trigger,
|
|
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
|
|
}
|
|
|
|
static void on_tcp_connect_timeout(uv_timer_t *timer)
|
|
{
|
|
struct session *session = timer->data;
|
|
|
|
uv_timer_stop(timer);
|
|
struct worker_ctx *worker = the_worker;
|
|
kr_require(worker);
|
|
|
|
kr_assert(session_tasklist_is_empty(session));
|
|
|
|
struct sockaddr *peer = session_get_peer(session);
|
|
worker_del_tcp_waiting(worker, peer);
|
|
|
|
struct qr_task *task = session_waitinglist_get(session);
|
|
if (!task) {
|
|
/* Normally shouldn't happen. */
|
|
const char *peer_str = kr_straddr(peer);
|
|
VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n",
|
|
peer_str ? peer_str : "");
|
|
return;
|
|
}
|
|
|
|
struct kr_query *qry = task_get_last_pending_query(task);
|
|
if (kr_log_is_debug_qry(WORKER, qry)) {
|
|
const char *peer_str = kr_straddr(peer);
|
|
VERBOSE_MSG(qry, "=> connection to '%s' failed (internal timeout)\n",
|
|
peer_str ? peer_str : "");
|
|
}
|
|
|
|
qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT);
|
|
|
|
worker->stats.timeout += session_waitinglist_get_len(session);
|
|
session_waitinglist_retry(session, true);
|
|
kr_assert(session_tasklist_is_empty(session));
|
|
	/* uv_cancel() doesn't support uv_connect_t requests, so we can't
	 * cancel it.  The connection may still succeed for this request,
	 * so the connection callback (on_connect()) must check whether the
	 * connection is in the list of waiting connections.  If it isn't,
	 * this is most likely a timed-out connection, even if it succeeded. */
}
|
|
|
|
/* This is called when I/O times out. */
static void on_udp_timeout(uv_timer_t *timer)
|
|
{
|
|
struct session *session = timer->data;
|
|
kr_assert(session_get_handle(session)->data == session);
|
|
kr_assert(session_tasklist_get_len(session) == 1);
|
|
kr_assert(session_waitinglist_is_empty(session));
|
|
|
|
uv_timer_stop(timer);
|
|
|
|
struct qr_task *task = session_tasklist_get_first(session);
|
|
if (!task)
|
|
return;
|
|
struct worker_ctx *worker = task->ctx->worker;
|
|
|
|
if (task->leading && task->pending_count > 0) {
|
|
struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
|
|
qry->server_selection.error(qry, task->transport, KR_SELECTION_QUERY_TIMEOUT);
|
|
}
|
|
|
|
task->timeouts += 1;
|
|
worker->stats.timeout += 1;
|
|
qr_task_step(task, NULL, NULL);
|
|
}
|
|
|
|
static uv_handle_t *transmit(struct qr_task *task)
|
|
{
|
|
uv_handle_t *ret = NULL;
|
|
|
|
if (task) {
|
|
struct kr_transport* transport = task->transport;
|
|
|
|
struct sockaddr_in6 *choice = (struct sockaddr_in6 *)&transport->address;
|
|
|
|
if (!choice) {
|
|
return ret;
|
|
}
|
|
if (task->pending_count >= MAX_PENDING) {
|
|
return ret;
|
|
}
|
|
/* Checkout answer before sending it */
|
|
struct request_ctx *ctx = task->ctx;
|
|
if (kr_resolve_checkout(&ctx->req, NULL, transport, task->pktbuf) != 0) {
|
|
return ret;
|
|
}
|
|
ret = ioreq_spawn(ctx->worker, SOCK_DGRAM, choice->sin6_family, false, false);
|
|
if (!ret) {
|
|
return ret;
|
|
}
|
|
struct sockaddr *addr = (struct sockaddr *)choice;
|
|
struct session *session = ret->data;
|
|
struct sockaddr *peer = session_get_peer(session);
|
|
kr_assert(peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
|
|
kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
|
|
memcpy(peer, addr, kr_sockaddr_len(addr));
|
|
if (qr_task_send(task, session, (struct sockaddr *)choice,
|
|
task->pktbuf) != 0) {
|
|
session_close(session);
|
|
ret = NULL;
|
|
} else {
|
|
task->pending[task->pending_count] = session;
|
|
task->pending_count += 1;
|
|
session_start_read(session); /* Start reading answer */
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
|
|
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
|
|
{
|
|
if (!task || task->finished) {
|
|
return;
|
|
}
|
|
/* Close pending timer */
|
|
ioreq_kill_pending(task);
|
|
/* Clear from outgoing table. */
|
|
if (!task->leading)
|
|
return;
|
|
char key[SUBREQ_KEY_LEN];
|
|
const int klen = subreq_key(key, task->pktbuf);
|
|
if (klen > 0) {
|
|
void *val_deleted;
|
|
int ret = trie_del(task->ctx->worker->subreq_out, key, klen, &val_deleted);
|
|
kr_assert(ret == KNOT_EOK && val_deleted == task);
|
|
}
|
|
/* Notify waiting tasks. */
|
|
struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
|
|
for (size_t i = task->waiting.len; i > 0; i--) {
|
|
struct qr_task *follower = task->waiting.at[i - 1];
|
|
/* Reuse MSGID and 0x20 secret */
|
|
if (follower->ctx->req.rplan.pending.len > 0) {
|
|
struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
|
|
qry->id = leader_qry->id;
|
|
qry->secret = leader_qry->secret;
|
|
|
|
// Note that this transport may not be present in `leader_qry`'s server selection
|
|
follower->transport = task->transport;
|
|
if(follower->transport) {
|
|
follower->transport->deduplicated = true;
|
|
}
|
|
leader_qry->secret = 0; /* Next will be already decoded */
|
|
}
|
|
qr_task_step(follower, packet_source, pkt);
|
|
qr_task_unref(follower);
|
|
}
|
|
task->waiting.len = 0;
|
|
task->leading = false;
|
|
}
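
/* Explanatory note on subrequest deduplication: outgoing queries are keyed by
 * (qname, qclass, qtype) via subreq_key().  The first task to ask becomes the
 * "leader" (subreq_lead() below), identical queries arriving later only
 * enqueue on it (subreq_enqueue()), and subreq_finalize() above then replays
 * the upstream result to all followers. */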
|
|
|
|
static void subreq_lead(struct qr_task *task)
|
|
{
|
|
if (kr_fails_assert(task))
|
|
return;
|
|
char key[SUBREQ_KEY_LEN];
|
|
const int klen = subreq_key(key, task->pktbuf);
|
|
if (klen < 0)
|
|
return;
|
|
struct qr_task **tvp = (struct qr_task **)
|
|
trie_get_ins(task->ctx->worker->subreq_out, key, klen);
|
|
if (unlikely(!tvp))
|
|
return; /*ENOMEM*/
|
|
if (kr_fails_assert(*tvp == NULL))
|
|
return;
|
|
*tvp = task;
|
|
task->leading = true;
|
|
}
|
|
|
|
static bool subreq_enqueue(struct qr_task *task)
|
|
{
|
|
if (kr_fails_assert(task))
|
|
return false;
|
|
char key[SUBREQ_KEY_LEN];
|
|
const int klen = subreq_key(key, task->pktbuf);
|
|
if (klen < 0)
|
|
return false;
|
|
struct qr_task **leader = (struct qr_task **)
|
|
trie_get_try(task->ctx->worker->subreq_out, key, klen);
|
|
if (!leader /*ENOMEM*/ || !*leader)
|
|
return false;
|
|
/* Enqueue itself to leader for this subrequest. */
|
|
int ret = array_push_mm((*leader)->waiting, task,
|
|
kr_memreserve, &(*leader)->ctx->req.pool);
|
|
if (unlikely(ret < 0)) /*ENOMEM*/
|
|
return false;
|
|
qr_task_ref(task);
|
|
return true;
|
|
}
|
|
|
|
#if ENABLE_XDP
|
|
static void xdp_tx_waker(uv_idle_t *handle)
|
|
{
|
|
int ret = knot_xdp_send_finish(handle->data);
|
|
if (ret != KNOT_EAGAIN && ret != KNOT_EOK)
|
|
kr_log_error(XDP, "check: ret = %d, %s\n", ret, knot_strerror(ret));
|
|
/* Apparently some drivers need many explicit wake-up calls
|
|
* even if we push no additional packets (in case they accumulated a lot) */
|
|
if (ret != KNOT_EAGAIN)
|
|
uv_idle_stop(handle);
|
|
knot_xdp_send_prepare(handle->data);
|
|
/* LATER(opt.): it _might_ be better for performance to do these two steps
|
|
* at different points in time */
|
|
}
|
|
#endif
|
|
/** Send an answer packet over XDP. */
|
|
static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle)
|
|
{
|
|
#if ENABLE_XDP
|
|
struct request_ctx *ctx = task->ctx;
|
|
xdp_handle_data_t *xhd = src_handle->data;
|
|
if (kr_fails_assert(xhd && xhd->socket && xhd->session == ctx->source.session))
|
|
return qr_task_on_send(task, src_handle, kr_error(EINVAL));
|
|
|
|
knot_xdp_msg_t msg;
|
|
/* We don't have a nice way of preserving the _msg_t from frame allocation,
|
|
* so we manually redo all other parts of knot_xdp_send_alloc() */
|
|
memset(&msg, 0, sizeof(msg));
|
|
bool ipv6 = ctx->source.addr.ip.sa_family == AF_INET6;
|
|
msg.flags = ipv6 ? KNOT_XDP_MSG_IPV6 : 0;
|
|
memcpy(msg.eth_from, &ctx->source.eth_addrs[0], 6);
|
|
memcpy(msg.eth_to, &ctx->source.eth_addrs[1], 6);
|
|
|
|
const struct sockaddr *ip_from = &ctx->source.dst_addr.ip;
|
|
const struct sockaddr *ip_to = &ctx->source.comm_addr.ip;
|
|
memcpy(&msg.ip_from, ip_from, kr_sockaddr_len(ip_from));
|
|
memcpy(&msg.ip_to, ip_to, kr_sockaddr_len(ip_to));
|
|
msg.payload.iov_base = ctx->req.answer->wire;
|
|
msg.payload.iov_len = ctx->req.answer->size;
|
|
|
|
uint32_t sent;
|
|
int ret = knot_xdp_send(xhd->socket, &msg, 1, &sent);
|
|
ctx->req.answer->wire = NULL; /* it's been freed */
|
|
|
|
uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
|
|
kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
|
|
|
|
return qr_task_on_send(task, src_handle, ret);
|
|
#else
|
|
kr_assert(!EINVAL);
|
|
return kr_error(EINVAL);
|
|
#endif
|
|
}
|
|
|
|
static int qr_task_finalize(struct qr_task *task, int state)
|
|
{
|
|
kr_require(task && task->leading == false);
|
|
if (task->finished) {
|
|
return kr_ok();
|
|
}
|
|
struct request_ctx *ctx = task->ctx;
|
|
struct session *source_session = ctx->source.session;
|
|
kr_resolve_finish(&ctx->req, state);
|
|
|
|
task->finished = true;
|
|
if (source_session == NULL) {
|
|
(void) qr_task_on_send(task, NULL, kr_error(EIO));
|
|
return state == KR_STATE_DONE ? kr_ok() : kr_error(EIO);
|
|
}
|
|
|
|
/* meant to be dropped */
|
|
if (unlikely(ctx->req.answer == NULL || ctx->req.options.NO_ANSWER)) {
|
|
/* For NO_ANSWER, a well-behaved layer should set the state to FAIL */
|
|
kr_assert(!ctx->req.options.NO_ANSWER || (ctx->req.state & KR_STATE_FAIL));
|
|
|
|
(void) qr_task_on_send(task, NULL, kr_ok());
|
|
return kr_ok();
|
|
}
|
|
|
|
if (session_flags(source_session)->closing ||
|
|
ctx->source.addr.ip.sa_family == AF_UNSPEC)
|
|
return kr_error(EINVAL);
|
|
|
|
/* Reference task as the callback handler can close it */
|
|
qr_task_ref(task);
|
|
|
|
/* Send back answer */
|
|
int ret;
|
|
const uv_handle_t *src_handle = session_get_handle(source_session);
|
|
if (kr_fails_assert(src_handle->type == UV_UDP || src_handle->type == UV_TCP
|
|
|| src_handle->type == UV_POLL)) {
|
|
ret = kr_error(EINVAL);
|
|
} else if (src_handle->type == UV_POLL) {
|
|
ret = xdp_push(task, src_handle);
|
|
} else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) {
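		/* UDP answers are not written out one by one here; udp_queue_push()
		 * batches them per file descriptor and the udp_queue module (see
		 * daemon/udp_queue.h) later flushes the batch, typically via sendmmsg(). */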
|
|
int fd;
|
|
ret = uv_fileno(src_handle, &fd);
|
|
if (ret == 0)
|
|
udp_queue_push(fd, &ctx->req, task);
|
|
else
|
|
kr_assert(false);
|
|
} else {
|
|
ret = qr_task_send(task, source_session, &ctx->source.comm_addr.ip, ctx->req.answer);
|
|
}
|
|
|
|
if (ret != kr_ok()) {
|
|
(void) qr_task_on_send(task, NULL, kr_error(EIO));
|
|
/* Since source session is erroneous detach all tasks. */
|
|
while (!session_tasklist_is_empty(source_session)) {
|
|
struct qr_task *t = session_tasklist_del_first(source_session, false);
|
|
struct request_ctx *c = t->ctx;
|
|
kr_assert(c->source.session == source_session);
|
|
c->source.session = NULL;
|
|
/* Don't finalize them as there can be other tasks
|
|
* waiting for answer to this particular task.
|
|
* (ie. task->leading is true) */
|
|
worker_task_unref(t);
|
|
}
|
|
session_close(source_session);
|
|
}
|
|
|
|
qr_task_unref(task);
|
|
|
|
if (ret != kr_ok() || state != KR_STATE_DONE)
|
|
return kr_error(EIO);
|
|
return kr_ok();
|
|
}
|
|
|
|
static int udp_task_step(struct qr_task *task,
|
|
const struct sockaddr *packet_source, knot_pkt_t *packet)
|
|
{
|
|
/* If there is already outgoing query, enqueue to it. */
|
|
if (subreq_enqueue(task)) {
|
|
return kr_ok(); /* Will be notified when outgoing query finishes. */
|
|
}
|
|
/* Start transmitting */
|
|
uv_handle_t *handle = transmit(task);
|
|
if (handle == NULL) {
|
|
subreq_finalize(task, packet_source, packet);
|
|
return qr_task_finalize(task, KR_STATE_FAIL);
|
|
}
|
|
|
|
/* Announce and start subrequest.
|
|
* @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
|
|
*/
|
|
subreq_lead(task);
|
|
|
|
return kr_ok();
|
|
}
|
|
|
|
static int tcp_task_waiting_connection(struct session *session, struct qr_task *task)
|
|
{
|
|
if (kr_fails_assert(session_flags(session)->outgoing && !session_flags(session)->closing))
|
|
return kr_error(EINVAL);
|
|
/* Add task to the end of list of waiting tasks.
|
|
* It will be notified in on_connect() or qr_task_on_send(). */
|
|
int ret = session_waitinglist_push(session, task);
|
|
if (ret < 0) {
|
|
return kr_error(EINVAL);
|
|
}
|
|
return kr_ok();
|
|
}
|
|
|
|
static int tcp_task_existing_connection(struct session *session, struct qr_task *task)
|
|
{
|
|
if (kr_fails_assert(session_flags(session)->outgoing && !session_flags(session)->closing))
|
|
return kr_error(EINVAL);
|
|
struct request_ctx *ctx = task->ctx;
|
|
struct worker_ctx *worker = ctx->worker;
|
|
|
|
	/* If there are any unsent queries, send them first. */
int ret = send_waiting(session);
|
|
if (ret != 0) {
|
|
return kr_error(EINVAL);
|
|
}
|
|
|
|
/* No unsent queries at that point. */
|
|
if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) {
|
|
		/* Too many outstanding queries; answer with SERVFAIL. */
return kr_error(EINVAL);
|
|
}
|
|
|
|
/* Send query to upstream. */
|
|
ret = qr_task_send(task, session, NULL, NULL);
|
|
if (ret != 0) {
|
|
/* Error, finalize task with SERVFAIL and
|
|
* close connection to upstream. */
|
|
session_tasklist_finalize(session, KR_STATE_FAIL);
|
|
worker_del_tcp_connected(worker, session_get_peer(session));
|
|
session_close(session);
|
|
return kr_error(EINVAL);
|
|
}
|
|
|
|
return kr_ok();
|
|
}
|
|
|
|
static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr *addr)
|
|
{
|
|
struct request_ctx *ctx = task->ctx;
|
|
struct worker_ctx *worker = ctx->worker;
|
|
|
|
/* Check if there must be TLS */
|
|
struct tls_client_ctx *tls_ctx = NULL;
|
|
struct network *net = &worker->engine->net;
|
|
tls_client_param_t *entry = tls_client_param_get(net->tls_client_params, addr);
|
|
if (entry) {
|
|
/* Address is configured to be used with TLS.
|
|
* We need to allocate auxiliary data structure. */
|
|
tls_ctx = tls_client_ctx_new(entry, worker);
|
|
if (!tls_ctx) {
|
|
return kr_error(EINVAL);
|
|
}
|
|
}
|
|
|
|
uv_connect_t *conn = malloc(sizeof(uv_connect_t));
|
|
if (!conn) {
|
|
tls_client_ctx_free(tls_ctx);
|
|
return kr_error(EINVAL);
|
|
}
|
|
bool has_http = false;
|
|
bool has_tls = (tls_ctx != NULL);
|
|
uv_handle_t *client = ioreq_spawn(worker, SOCK_STREAM, addr->sa_family, has_tls, has_http);
|
|
if (!client) {
|
|
tls_client_ctx_free(tls_ctx);
|
|
free(conn);
|
|
return kr_error(EINVAL);
|
|
}
|
|
struct session *session = client->data;
|
|
if (kr_fails_assert(session_flags(session)->has_tls == has_tls)) {
|
|
tls_client_ctx_free(tls_ctx);
|
|
free(conn);
|
|
return kr_error(EINVAL);
|
|
}
|
|
if (has_tls) {
|
|
tls_client_ctx_set_session(tls_ctx, session);
|
|
session_tls_set_client_ctx(session, tls_ctx);
|
|
}
|
|
|
|
/* Add address to the waiting list.
|
|
* Now it "is waiting to be connected to." */
|
|
int ret = worker_add_tcp_waiting(worker, addr, session);
|
|
if (ret < 0) {
|
|
free(conn);
|
|
session_close(session);
|
|
return kr_error(EINVAL);
|
|
}
|
|
|
|
conn->data = session;
|
|
/* Store peer address for the session. */
|
|
struct sockaddr *peer = session_get_peer(session);
|
|
memcpy(peer, addr, kr_sockaddr_len(addr));
|
|
|
|
/* Start watchdog to catch eventual connection timeout. */
|
|
ret = session_timer_start(session, on_tcp_connect_timeout,
|
|
KR_CONN_RTT_MAX, 0);
|
|
if (ret != 0) {
|
|
worker_del_tcp_waiting(worker, addr);
|
|
free(conn);
|
|
session_close(session);
|
|
return kr_error(EINVAL);
|
|
}
|
|
|
|
struct kr_query *qry = task_get_last_pending_query(task);
|
|
if (kr_log_is_debug_qry(WORKER, qry)) {
|
|
const char *peer_str = kr_straddr(peer);
|
|
VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str ? peer_str : "");
|
|
}
|
|
|
|
/* Start connection process to upstream. */
|
|
ret = uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect);
|
|
if (ret != 0) {
|
|
session_timer_stop(session);
|
|
worker_del_tcp_waiting(worker, addr);
|
|
free(conn);
|
|
session_close(session);
|
|
qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_FAILED);
|
|
return kr_error(EAGAIN);
|
|
}
|
|
|
|
/* Add task to the end of list of waiting tasks.
|
|
* Will be notified either in on_connect() or in qr_task_on_send(). */
|
|
ret = session_waitinglist_push(session, task);
|
|
if (ret < 0) {
|
|
session_timer_stop(session);
|
|
worker_del_tcp_waiting(worker, addr);
|
|
free(conn);
|
|
session_close(session);
|
|
return kr_error(EINVAL);
|
|
}
|
|
|
|
return kr_ok();
|
|
}
|
|
|
|
static int tcp_task_step(struct qr_task *task,
|
|
const struct sockaddr *packet_source, knot_pkt_t *packet)
|
|
{
|
|
if (kr_fails_assert(task->pending_count == 0)) {
|
|
subreq_finalize(task, packet_source, packet);
|
|
return qr_task_finalize(task, KR_STATE_FAIL);
|
|
}
|
|
|
|
/* target */
|
|
const struct sockaddr *addr = &task->transport->address.ip;
|
|
if (addr->sa_family == AF_UNSPEC) {
|
|
		/* The target isn't defined.  Finalize the task with SERVFAIL.
		 * Although task->pending_count is zero, there can still be followers,
		 * so we need to call subreq_finalize() to handle them properly. */
subreq_finalize(task, packet_source, packet);
|
|
return qr_task_finalize(task, KR_STATE_FAIL);
|
|
}
|
|
/* Checkout task before connecting */
|
|
struct request_ctx *ctx = task->ctx;
|
|
if (kr_resolve_checkout(&ctx->req, NULL, task->transport, task->pktbuf) != 0) {
|
|
subreq_finalize(task, packet_source, packet);
|
|
return qr_task_finalize(task, KR_STATE_FAIL);
|
|
}
|
|
int ret;
|
|
struct session* session = NULL;
|
|
if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
|
|
		/* The connection is in the list of waiting connections,
		 * meaning a connection attempt is already in progress. */
ret = tcp_task_waiting_connection(session, task);
|
|
} else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
|
|
/* Connection has been already established. */
|
|
ret = tcp_task_existing_connection(session, task);
|
|
} else {
|
|
/* Make connection. */
|
|
ret = tcp_task_make_connection(task, addr);
|
|
}
|
|
|
|
if (ret != kr_ok()) {
|
|
subreq_finalize(task, addr, packet);
|
|
if (ret == kr_error(EAGAIN)) {
|
|
ret = qr_task_step(task, addr, NULL);
|
|
} else {
|
|
ret = qr_task_finalize(task, KR_STATE_FAIL);
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static int qr_task_step(struct qr_task *task,
|
|
const struct sockaddr *packet_source, knot_pkt_t *packet)
|
|
{
|
|
/* No more steps after we're finished. */
|
|
if (!task || task->finished) {
|
|
return kr_error(ESTALE);
|
|
}
|
|
|
|
/* Close pending I/O requests */
|
|
subreq_finalize(task, packet_source, packet);
|
|
if ((kr_now() - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
|
|
struct kr_request *req = worker_task_request(task);
|
|
if (!kr_fails_assert(req))
|
|
kr_query_inform_timeout(req, req->current_query);
|
|
return qr_task_finalize(task, KR_STATE_FAIL);
|
|
}
|
|
|
|
/* Consume input and produce next query */
|
|
struct request_ctx *ctx = task->ctx;
|
|
if (kr_fails_assert(ctx))
|
|
return qr_task_finalize(task, KR_STATE_FAIL);
|
|
struct kr_request *req = &ctx->req;
|
|
struct worker_ctx *worker = ctx->worker;
|
|
|
|
if (worker->too_many_open) {
|
|
		/* We are near the file-descriptor limit (see ioreq_spawn()): don't
		 * spawn more sockets; fail the request unless concurrency has already
		 * dropped well below the recorded high-water mark. */
struct kr_rplan *rplan = &req->rplan;
|
|
if (worker->stats.rconcurrent <
|
|
worker->rconcurrent_highwatermark - 10) {
|
|
worker->too_many_open = false;
|
|
} else {
|
|
if (packet && kr_rplan_empty(rplan)) {
|
|
/* new query; TODO - make this detection more obvious */
|
|
kr_resolve_consume(req, &task->transport, packet);
|
|
}
|
|
return qr_task_finalize(task, KR_STATE_FAIL);
|
|
}
|
|
}
|
|
|
|
// Report network RTT back to server selection
|
|
if (packet && task->send_time && task->recv_time) {
|
|
struct kr_query *qry = array_tail(req->rplan.pending);
|
|
qry->server_selection.update_rtt(qry, task->transport, task->recv_time - task->send_time);
|
|
}
|
|
|
|
int state = kr_resolve_consume(req, &task->transport, packet);
|
|
|
|
task->transport = NULL;
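	/* Drive the resolver state machine: kr_resolve_produce() keeps yielding
	 * follow-up queries while it returns KR_STATE_PRODUCE, until the plan is
	 * finished or a query needs to be transmitted over the chosen transport. */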
|
|
while (state == KR_STATE_PRODUCE) {
|
|
state = kr_resolve_produce(req, &task->transport, task->pktbuf);
|
|
if (unlikely(++task->iter_count > KR_ITER_LIMIT ||
|
|
task->timeouts >= KR_TIMEOUT_LIMIT)) {
|
|
|
|
struct kr_rplan *rplan = &req->rplan;
|
|
struct kr_query *last = kr_rplan_last(rplan);
|
|
if (task->iter_count > KR_ITER_LIMIT) {
|
|
char *msg = "cancelling query due to exceeded iteration count limit";
|
|
VERBOSE_MSG(last, "%s of %d\n", msg, KR_ITER_LIMIT);
|
|
kr_request_set_extended_error(req, KNOT_EDNS_EDE_OTHER,
|
|
"OGHD: exceeded iteration count limit");
|
|
}
|
|
if (task->timeouts >= KR_TIMEOUT_LIMIT) {
|
|
char *msg = "cancelling query due to exceeded timeout retries limit";
|
|
VERBOSE_MSG(last, "%s of %d\n", msg, KR_TIMEOUT_LIMIT);
|
|
kr_request_set_extended_error(req, KNOT_EDNS_EDE_NREACH_AUTH, "QLPL");
|
|
}
|
|
|
|
return qr_task_finalize(task, KR_STATE_FAIL);
|
|
}
|
|
}
|
|
|
|
/* We're done, no more iterations needed */
|
|
if (state & (KR_STATE_DONE|KR_STATE_FAIL)) {
|
|
return qr_task_finalize(task, state);
|
|
} else if (!task->transport || !task->transport->protocol) {
|
|
return qr_task_step(task, NULL, NULL);
|
|
}
|
|
|
|
switch (task->transport->protocol)
|
|
{
|
|
case KR_TRANSPORT_UDP:
|
|
return udp_task_step(task, packet_source, packet);
|
|
case KR_TRANSPORT_TCP: // fall through
|
|
case KR_TRANSPORT_TLS:
|
|
return tcp_task_step(task, packet_source, packet);
|
|
default:
|
|
kr_assert(!EINVAL);
|
|
return kr_error(EINVAL);
|
|
}
|
|
}
|
|
|
|
int worker_submit(struct session *session, struct io_comm_data *comm,
		  const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt)
{
	if (!session || !pkt)
		return kr_error(EINVAL);

	uv_handle_t *handle = session_get_handle(session);
	if (!handle || !handle->loop->data)
		return kr_error(EINVAL);

	const bool is_query = pkt->size > KNOT_WIRE_OFFSET_FLAGS1
				&& knot_wire_get_qr(pkt->wire) == 0;
	const bool is_outgoing = session_flags(session)->outgoing;

	int ret = 0;
	if (is_query == is_outgoing)
		ret = KNOT_ENOENT;

	// For responses from upstream, try to find associated task and query.
	// In case of errors, at least try to guess.
	struct qr_task *task = NULL;
	bool task_matched_id = false;
	if (is_outgoing && pkt->size >= 2) {
		const uint16_t id = knot_wire_get_id(pkt->wire);
		task = session_tasklist_del_msgid(session, id);
		task_matched_id = task != NULL;
		if (task_matched_id) // Note receive time for RTT calculation
			task->recv_time = kr_now();
		if (!task_matched_id) {
			ret = KNOT_ENOENT;
			VERBOSE_MSG(NULL, "=> DNS message with mismatching ID %d\n",
					(int)id);
		}
	}
	if (!task && is_outgoing && handle->type == UV_TCP) {
		// Source address of the reply got somewhat validated,
		// so we try to at least guess which query, for error reporting.
		task = session_tasklist_get_first(session);
	}
	struct kr_query *qry = NULL;
	if (task)
		qry = array_tail(task->ctx->req.rplan.pending);

	// Parse the packet, unless it's useless anyway.
	if (ret == 0) {
		ret = knot_pkt_parse(pkt, 0);
		if (ret == KNOT_ETRAIL && is_outgoing
				&& !kr_fails_assert(pkt->parsed < pkt->size)) {
			// We deal with this later, so that RCODE takes priority.
			ret = 0;
		}
		if (ret && kr_log_is_debug_qry(WORKER, qry)) {
			VERBOSE_MSG(qry, "=> DNS message failed to parse, %s\n",
					knot_strerror(ret));
		}
	}

	struct http_ctx *http_ctx = NULL;
#if ENABLE_DOH2
	http_ctx = session_http_get_server_ctx(session);

	/* Badly formed query when using DoH leads to a Bad Request */
	if (http_ctx && !is_outgoing && ret) {
		http_send_status(session, HTTP_STATUS_BAD_REQUEST);
		return kr_error(ret);
	}
#endif

	if (!is_outgoing && http_ctx && queue_len(http_ctx->streams) <= 0)
		return kr_error(ENOENT);

	const struct sockaddr *addr = comm ? comm->src_addr : NULL;

	/* Ignore badly formed queries. */
	if (ret) {
		if (is_outgoing && qry) // unusable response from somewhat validated IP
			qry->server_selection.error(qry, task->transport, KR_SELECTION_MALFORMED);
		if (!is_outgoing)
			the_worker->stats.dropped += 1;
		if (task_matched_id) // notify task that answer won't be coming anymore
			qr_task_step(task, addr, NULL);
		return kr_error(EILSEQ);
	}

	/* Start new task on listening sockets,
	 * or resume if this is subrequest */
	if (!is_outgoing) { /* request from a client */
		struct request_ctx *ctx =
			request_create(the_worker, session, comm, eth_from,
					eth_to, knot_wire_get_id(pkt->wire));
		if (http_ctx)
			queue_pop(http_ctx->streams);
		if (!ctx)
			return kr_error(ENOMEM);

		ret = request_start(ctx, pkt);
		if (ret != 0) {
			request_free(ctx);
			return kr_error(ENOMEM);
		}

		task = qr_task_create(ctx);
		if (!task) {
			request_free(ctx);
			return kr_error(ENOMEM);
		}

		if (handle->type == UV_TCP && qr_task_register(task, session)) {
			return kr_error(ENOMEM);
		}
	} else { /* response from upstream */
		if (task == NULL) {
			return kr_error(ENOENT);
		}
		if (kr_fails_assert(!session_flags(session)->closing))
			return kr_error(EINVAL);
	}
	if (kr_fails_assert(!uv_is_closing(session_get_handle(session))))
		return kr_error(EINVAL);

	/* Packet was successfully parsed.
	 * Task was created (found). */
	session_touch(session);

	/* Consume input and produce next message */
	return qr_task_step(task, addr, pkt);
}

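/* The helpers below keep TCP sessions in tries keyed by the binary form of
 * the peer address (kr_sockaddr_key()): one trie for established connections
 * (tcp_connected) and one for connections still being set up (tcp_waiting). */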
static int trie_add_tcp_session(trie_t *trie, const struct sockaddr *addr,
				struct session *session)
{
	if (kr_fails_assert(trie && addr))
		return kr_error(EINVAL);
	struct kr_sockaddr_key_storage key;
	ssize_t keylen = kr_sockaddr_key(&key, addr);
	if (keylen < 0)
		return keylen;
	trie_val_t *val = trie_get_ins(trie, key.bytes, keylen);
	if (kr_fails_assert(*val == NULL))
		return kr_error(EINVAL);
	*val = session;
	return kr_ok();
}

static int trie_del_tcp_session(trie_t *trie, const struct sockaddr *addr)
{
	if (kr_fails_assert(trie && addr))
		return kr_error(EINVAL);
	struct kr_sockaddr_key_storage key;
	ssize_t keylen = kr_sockaddr_key(&key, addr);
	if (keylen < 0)
		return keylen;
	int ret = trie_del(trie, key.bytes, keylen, NULL);
	return ret ? kr_error(ENOENT) : kr_ok();
}

static struct session *trie_find_tcp_session(trie_t *trie,
					     const struct sockaddr *addr)
{
	if (kr_fails_assert(trie && addr))
		return NULL;
	struct kr_sockaddr_key_storage key;
	ssize_t keylen = kr_sockaddr_key(&key, addr);
	if (keylen < 0)
		return NULL;
	trie_val_t *val = trie_get_try(trie, key.bytes, keylen);
	return val ? *val : NULL;
}

int worker_add_tcp_connected(struct worker_ctx *worker,
			     const struct sockaddr* addr,
			     struct session *session)
{
	return trie_add_tcp_session(worker->tcp_connected, addr, session);
}

int worker_del_tcp_connected(struct worker_ctx *worker,
			     const struct sockaddr* addr)
{
	return trie_del_tcp_session(worker->tcp_connected, addr);
}

struct session* worker_find_tcp_connected(struct worker_ctx *worker,
					  const struct sockaddr* addr)
{
	return trie_find_tcp_session(worker->tcp_connected, addr);
}

static int worker_add_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr* addr,
				  struct session *session)
{
	return trie_add_tcp_session(worker->tcp_waiting, addr, session);
}

int worker_del_tcp_waiting(struct worker_ctx *worker,
			   const struct sockaddr* addr)
{
	return trie_del_tcp_session(worker->tcp_waiting, addr);
}

struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
					const struct sockaddr* addr)
{
	return trie_find_tcp_session(worker->tcp_waiting, addr);
}

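/** Drop a TCP(-like) session: stop its timer, remove it from the connected
 * and waiting tries, let every task bound to it either get stepped again so
 * it can pick another transport (outgoing sessions) or lose its source
 * session (client sessions), and finally close the session. */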
int worker_end_tcp(struct session *session)
{
	if (!session)
		return kr_error(EINVAL);

	session_timer_stop(session);

	struct sockaddr *peer = session_get_peer(session);

	worker_del_tcp_waiting(the_worker, peer);
	worker_del_tcp_connected(the_worker, peer);
	session_flags(session)->connected = false;

	struct tls_client_ctx *tls_client_ctx = session_tls_get_client_ctx(session);
	if (tls_client_ctx) {
		/* Avoid gnutls_bye() call */
		tls_set_hs_state(&tls_client_ctx->c, TLS_HS_NOT_STARTED);
	}

	struct tls_ctx *tls_ctx = session_tls_get_server_ctx(session);
	if (tls_ctx) {
		/* Avoid gnutls_bye() call */
		tls_set_hs_state(&tls_ctx->c, TLS_HS_NOT_STARTED);
	}

	while (!session_waitinglist_is_empty(session)) {
		struct qr_task *task = session_waitinglist_pop(session, false);
		kr_assert(task->refs > 1);
		session_tasklist_del(session, task);
		if (session_flags(session)->outgoing) {
			if (task->ctx->req.options.FORWARD) {
				/* We are in TCP_FORWARD mode.
				 * To prevent failing at kr_resolve_consume()
				 * qry.flags.TCP must be cleared.
				 * TODO - refactoring is needed. */
				struct kr_request *req = &task->ctx->req;
				struct kr_rplan *rplan = &req->rplan;
				struct kr_query *qry = array_tail(rplan->pending);
				qry->flags.TCP = false;
			}
			qr_task_step(task, NULL, NULL);
		} else {
			kr_assert(task->ctx->source.session == session);
			task->ctx->source.session = NULL;
		}
		worker_task_unref(task);
	}
	while (!session_tasklist_is_empty(session)) {
		struct qr_task *task = session_tasklist_del_first(session, false);
		if (session_flags(session)->outgoing) {
			if (task->ctx->req.options.FORWARD) {
				struct kr_request *req = &task->ctx->req;
				struct kr_rplan *rplan = &req->rplan;
				struct kr_query *qry = array_tail(rplan->pending);
				qry->flags.TCP = false;
			}
			qr_task_step(task, NULL, NULL);
		} else {
			kr_assert(task->ctx->source.session == session);
			task->ctx->source.session = NULL;
		}
		worker_task_unref(task);
	}
	session_close(session);
	return kr_ok();
}

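/** Build a query packet for an internally started resolution: sets the
 * QNAME/QTYPE/QCLASS question, the RD and AD flags, and attaches a copy of
 * the resolver's downstream OPT record (DO/CD according to the given flags). */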
knot_pkt_t *worker_resolve_mk_pkt_dname(knot_dname_t *qname, uint16_t qtype, uint16_t qclass,
					const struct kr_qflags *options)
{
	knot_pkt_t *pkt = knot_pkt_new(NULL, KNOT_EDNS_MAX_UDP_PAYLOAD, NULL);
	if (!pkt)
		return NULL;
	knot_pkt_put_question(pkt, qname, qclass, qtype);
	knot_wire_set_rd(pkt->wire);
	knot_wire_set_ad(pkt->wire);

	/* Add OPT RR, including wire format so modules can see both representations.
	 * knot_pkt_put() copies the outside; we need to duplicate the inside manually. */
	knot_rrset_t *opt = knot_rrset_copy(the_worker->engine->resolver.downstream_opt_rr, NULL);
	if (!opt) {
		knot_pkt_free(pkt);
		return NULL;
	}
	if (options->DNSSEC_WANT) {
		knot_edns_set_do(opt);
	}
	knot_pkt_begin(pkt, KNOT_ADDITIONAL);
	int ret = knot_pkt_put(pkt, KNOT_COMPR_HINT_NONE, opt, KNOT_PF_FREE);
	if (ret == KNOT_EOK) {
		free(opt); /* inside is owned by pkt now */
	} else {
		knot_rrset_free(opt, NULL);
		knot_pkt_free(pkt);
		return NULL;
	}

	if (options->DNSSEC_CD) {
		knot_wire_set_cd(pkt->wire);
	}

	return pkt;
}

knot_pkt_t *worker_resolve_mk_pkt(const char *qname_str, uint16_t qtype, uint16_t qclass,
				  const struct kr_qflags *options)
{
	uint8_t qname[KNOT_DNAME_MAXLEN];
	if (!knot_dname_from_str(qname, qname_str, sizeof(qname)))
		return NULL;
	return worker_resolve_mk_pkt_dname(qname, qtype, qclass, options);
}

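/* worker_resolve_start() and worker_resolve_exec() below drive resolutions
 * that are started internally (no client session), e.g. from the bindings.
 * A rough, illustrative sketch of the calling sequence (error handling
 * omitted; the answer is delivered through the request's layers and finish
 * hooks, not returned here):
 *
 *	struct kr_qflags options;
 *	memset(&options, 0, sizeof(options));
 *	options.DNSSEC_WANT = true;
 *	knot_pkt_t *pkt = worker_resolve_mk_pkt("example.com.", KNOT_RRTYPE_AAAA,
 *						KNOT_CLASS_IN, &options);
 *	struct qr_task *task = worker_resolve_start(pkt, options);
 *	if (task)
 *		worker_resolve_exec(task, pkt);
 *	knot_pkt_free(pkt);
 */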
struct qr_task *worker_resolve_start(knot_pkt_t *query, struct kr_qflags options)
{
	struct worker_ctx *worker = the_worker;
	if (kr_fails_assert(worker && query))
		return NULL;

	struct request_ctx *ctx = request_create(worker, NULL, NULL, NULL, NULL,
						 worker->next_request_uid);
	if (!ctx)
		return NULL;

	/* Create task */
	struct qr_task *task = qr_task_create(ctx);
	if (!task) {
		request_free(ctx);
		return NULL;
	}

	/* Start task */
	int ret = request_start(ctx, query);
	if (ret != 0) {
		/* task is attached to request context,
		 * so dereference (and deallocate) it first */
		ctx->task = NULL;
		qr_task_unref(task);
		request_free(ctx);
		return NULL;
	}

	worker->next_request_uid += 1;
	if (worker->next_request_uid == 0)
		worker->next_request_uid = UINT16_MAX + 1;

	/* Set options late, as qr_task_start() -> kr_resolve_begin() rewrite it. */
	kr_qflags_set(&task->ctx->req.options, options);
	return task;
}

int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query)
{
	if (!task)
		return kr_error(EINVAL);
	return qr_task_step(task, NULL, query);
}

int worker_task_numrefs(const struct qr_task *task)
{
	return task->refs;
}

struct kr_request *worker_task_request(struct qr_task *task)
{
	if (!task || !task->ctx)
		return NULL;

	return &task->ctx->req;
}

int worker_task_finalize(struct qr_task *task, int state)
{
	return qr_task_finalize(task, state);
}

int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
		     knot_pkt_t *packet)
{
	return qr_task_step(task, packet_source, packet);
}

void worker_task_complete(struct qr_task *task)
{
	qr_task_complete(task);
}

void worker_task_ref(struct qr_task *task)
{
	qr_task_ref(task);
}

void worker_task_unref(struct qr_task *task)
{
	qr_task_unref(task);
}

void worker_task_timeout_inc(struct qr_task *task)
{
	task->timeouts += 1;
}

knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task)
{
	return task->pktbuf;
}

struct request_ctx *worker_task_get_request(struct qr_task *task)
{
	return task->ctx;
}

struct kr_transport *worker_task_get_transport(struct qr_task *task)
{
	return task->transport;
}

struct session *worker_request_get_source_session(const struct kr_request *req)
{
	static_assert(offsetof(struct request_ctx, req) == 0,
			"Bad struct request_ctx definition.");
	return ((struct request_ctx *)req)->source.session;
}

uint16_t worker_task_pkt_get_msgid(struct qr_task *task)
{
	knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
	uint16_t msg_id = knot_wire_get_id(pktbuf->wire);
	return msg_id;
}

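/** Set the message ID both in the task's outgoing packet buffer and in the
 * last pending query, so the two stay in sync. */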
void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
{
	knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
	knot_wire_set_id(pktbuf->wire, msgid);
	struct kr_query *q = task_get_last_pending_query(task);
	q->id = msgid;
}

uint64_t worker_task_creation_time(struct qr_task *task)
{
	return task->creation_time;
}

void worker_task_subreq_finalize(struct qr_task *task)
{
	subreq_finalize(task, NULL, NULL);
}

bool worker_task_finished(struct qr_task *task)
{
	return task->finished;
}

/** Reserve worker buffers. We assume worker's been zeroed. */
static int worker_reserve(struct worker_ctx *worker)
{
	worker->tcp_connected = trie_create(NULL);
	worker->tcp_waiting = trie_create(NULL);
	worker->subreq_out = trie_create(NULL);

	mm_ctx_mempool(&worker->pkt_pool, 4 * sizeof(knot_pkt_t));

	return kr_ok();
}

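/** Tear down the global worker: free the session and subrequest tries, the
 * stored DoH request-header names and the packet mempool, then unset the_worker. */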
void worker_deinit(void)
{
	struct worker_ctx *worker = the_worker;
	if (kr_fails_assert(worker))
		return;
	trie_free(worker->tcp_connected);
	trie_free(worker->tcp_waiting);
	trie_free(worker->subreq_out);
	worker->subreq_out = NULL;

	for (int i = 0; i < worker->doh_qry_headers.len; i++)
		free((void *)worker->doh_qry_headers.at[i]);
	array_clear(worker->doh_qry_headers);

	mp_delete(worker->pkt_pool.ctx);
	worker->pkt_pool.ctx = NULL;

	the_worker = NULL;
}

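/** Initialize the single global worker: register Lua bindings, attach to the
 * default libuv loop, set up the per-request Lua vars table and worker
 * buffers, and publish worker.id/pid/count/cwd to the Lua environment. */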
int worker_init(struct engine *engine, int worker_count)
{
	if (kr_fails_assert(engine && engine->L && the_worker == NULL))
		return kr_error(EINVAL);
	kr_bindings_register(engine->L);

	/* Create main worker. */
	struct worker_ctx *worker = &the_worker_value;
	memset(worker, 0, sizeof(*worker));
	worker->engine = engine;

	uv_loop_t *loop = uv_default_loop();
	worker->loop = loop;

	worker->count = worker_count;

	/* Register table for worker per-request variables */
	lua_newtable(engine->L);
	lua_setfield(engine->L, -2, "vars");
	lua_getfield(engine->L, -1, "vars");
	worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX);
	lua_pop(engine->L, 1);

	worker->tcp_pipeline_max = MAX_PIPELINED;
	worker->out_addr4.sin_family = AF_UNSPEC;
	worker->out_addr6.sin6_family = AF_UNSPEC;

	array_init(worker->doh_qry_headers);

	int ret = worker_reserve(worker);
	if (ret) return ret;
	worker->next_request_uid = UINT16_MAX + 1;

	/* Set some worker.* fields in Lua */
	lua_getglobal(engine->L, "worker");
	pid_t pid = getpid();

	auto_free char *pid_str = NULL;
	const char *inst_name = getenv("SYSTEMD_INSTANCE");
	if (inst_name) {
		lua_pushstring(engine->L, inst_name);
	} else {
		ret = asprintf(&pid_str, "%ld", (long)pid);
		kr_assert(ret > 0);
		lua_pushstring(engine->L, pid_str);
	}
	lua_setfield(engine->L, -2, "id");

	lua_pushnumber(engine->L, pid);
	lua_setfield(engine->L, -2, "pid");
	lua_pushnumber(engine->L, worker_count);
	lua_setfield(engine->L, -2, "count");

	char cwd[PATH_MAX];
	get_workdir(cwd, sizeof(cwd));
	lua_pushstring(engine->L, cwd);
	lua_setfield(engine->L, -2, "cwd");

	the_worker = worker;
	loop->data = the_worker;
	/* ^^^^ Now this shouldn't be used anymore, but it's hard to be 100% sure. */
	return kr_ok();
}

#undef VERBOSE_MSG