summaryrefslogtreecommitdiffstats
path: root/daemon/io.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--daemon/io.c494
1 files changed, 494 insertions, 0 deletions
diff --git a/daemon/io.c b/daemon/io.c
new file mode 100644
index 0000000..f183d34
--- /dev/null
+++ b/daemon/io.c
@@ -0,0 +1,494 @@
+/* Copyright (C) 2014-2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <string.h>
+#include <libknot/errcode.h>
+#include <contrib/ucw/lib.h>
+#include <contrib/ucw/mempool.h>
+#include <assert.h>
+
+#include "daemon/io.h"
+#include "daemon/network.h"
+#include "daemon/worker.h"
+#include "daemon/tls.h"
+#include "daemon/session.h"
+
+#define negotiate_bufsize(func, handle, bufsize_want) do { \
+ int bufsize = 0; func(handle, &bufsize); \
+ if (bufsize < bufsize_want) { \
+ bufsize = bufsize_want; \
+ func(handle, &bufsize); \
+ } \
+} while (0)
+
+static void check_bufsize(uv_handle_t* handle)
+{
+ return; /* TODO: resurrect after https://github.com/libuv/libuv/issues/419 */
+ /* We want to buffer at least N waves in advance.
+ * This is magic presuming we can pull in a whole recvmmsg width in one wave.
+ * Linux will double this the bufsize wanted.
+ */
+ const int bufsize_want = 2 * sizeof( ((struct worker_ctx *)NULL)->wire_buf ) ;
+ negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want);
+ negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want);
+}
+
+#undef negotiate_bufsize
+
+static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
+{
+ /* UDP sessions use worker buffer for wire data,
+ * TCP sessions use session buffer for wire data
+ * (see session_set_handle()).
+ * TLS sessions use buffer from TLS context.
+ * The content of the worker buffer is
+ * guaranteed to be unchanged only for the duration of
+ * udp_read() and tcp_read().
+ */
+ struct session *s = handle->data;
+ if (!session_flags(s)->has_tls) {
+ buf->base = (char *) session_wirebuf_get_free_start(s);
+ buf->len = session_wirebuf_get_free_size(s);
+ } else {
+ struct tls_common_ctx *ctx = session_tls_get_common_ctx(s);
+ buf->base = (char *) ctx->recv_buf;
+ buf->len = sizeof(ctx->recv_buf);
+ }
+}
+
+void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
+ const struct sockaddr *addr, unsigned flags)
+{
+ uv_loop_t *loop = handle->loop;
+ struct worker_ctx *worker = loop->data;
+ struct session *s = handle->data;
+ if (session_flags(s)->closing) {
+ return;
+ }
+ if (nread <= 0) {
+ if (nread < 0) { /* Error response, notify resolver */
+ worker_submit(s, NULL);
+ } /* nread == 0 is for freeing buffers, we don't need to do this */
+ return;
+ }
+ if (addr->sa_family == AF_UNSPEC) {
+ return;
+ }
+ struct sockaddr *peer = session_get_peer(s);
+ if (session_flags(s)->outgoing) {
+ assert(peer->sa_family != AF_UNSPEC);
+ if (kr_sockaddr_cmp(peer, addr) != 0) {
+ return;
+ }
+ } else {
+ memcpy(peer, addr, kr_sockaddr_len(addr));
+ }
+ ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base,
+ nread);
+ assert(consumed == nread); (void)consumed;
+ session_wirebuf_process(s);
+ session_wirebuf_discard(s);
+ mp_flush(worker->pkt_pool.ctx);
+}
+
+static int udp_bind_finalize(uv_handle_t *handle)
+{
+ check_bufsize(handle);
+ /* Handle is already created, just create context. */
+ struct session *s = session_new(handle, false);
+ assert(s);
+ session_flags(s)->outgoing = false;
+ return io_start_read(handle);
+}
+
+int udp_bind(uv_udp_t *handle, struct sockaddr *addr)
+{
+ unsigned flags = UV_UDP_REUSEADDR;
+ if (addr->sa_family == AF_INET6) {
+ flags |= UV_UDP_IPV6ONLY;
+ }
+ int ret = uv_udp_bind(handle, addr, flags);
+ if (ret != 0) {
+ return ret;
+ }
+ return udp_bind_finalize((uv_handle_t *)handle);
+}
+
+int udp_bindfd(uv_udp_t *handle, int fd)
+{
+ if (!handle) {
+ return kr_error(EINVAL);
+ }
+
+ int ret = uv_udp_open(handle, (uv_os_sock_t) fd);
+ if (ret != 0) {
+ return ret;
+ }
+ return udp_bind_finalize((uv_handle_t *)handle);
+}
+
+void tcp_timeout_trigger(uv_timer_t *timer)
+{
+ struct session *s = timer->data;
+
+ assert(!session_flags(s)->closing);
+
+ struct worker_ctx *worker = timer->loop->data;
+
+ if (!session_tasklist_is_empty(s)) {
+ int finalized = session_tasklist_finalize_expired(s);
+ worker->stats.timeout += finalized;
+ /* session_tasklist_finalize_expired() may call worker_task_finalize().
+ * If session is a source session and there were IO errors,
+ * worker_task_finalize() can filnalize all tasks and close session. */
+ if (session_flags(s)->closing) {
+ return;
+ }
+
+ }
+ if (!session_tasklist_is_empty(s)) {
+ uv_timer_stop(timer);
+ session_timer_start(s, tcp_timeout_trigger,
+ KR_RESOLVE_TIME_LIMIT / 2,
+ KR_RESOLVE_TIME_LIMIT / 2);
+ } else {
+ /* Normally it should not happen,
+ * but better to check if there anything in this list. */
+ while (!session_waitinglist_is_empty(s)) {
+ struct qr_task *t = session_waitinglist_pop(s, false);
+ worker_task_finalize(t, KR_STATE_FAIL);
+ worker_task_unref(t);
+ worker->stats.timeout += 1;
+ if (session_flags(s)->closing) {
+ return;
+ }
+ }
+ const struct engine *engine = worker->engine;
+ const struct network *net = &engine->net;
+ uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
+ uint64_t last_activity = session_last_activity(s);
+ uint64_t idle_time = kr_now() - last_activity;
+ if (idle_time < idle_in_timeout) {
+ idle_in_timeout -= idle_time;
+ uv_timer_stop(timer);
+ session_timer_start(s, tcp_timeout_trigger,
+ idle_in_timeout, idle_in_timeout);
+ } else {
+ struct sockaddr *peer = session_get_peer(s);
+ char *peer_str = kr_straddr(peer);
+ kr_log_verbose("[io] => closing connection to '%s'\n",
+ peer_str ? peer_str : "");
+ if (session_flags(s)->outgoing) {
+ worker_del_tcp_waiting(worker, peer);
+ worker_del_tcp_connected(worker, peer);
+ }
+ session_close(s);
+ }
+ }
+}
+
+static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
+{
+ struct session *s = handle->data;
+ assert(s && session_get_handle(s) == (uv_handle_t *)handle &&
+ handle->type == UV_TCP);
+
+ if (session_flags(s)->closing) {
+ return;
+ }
+
+ /* nread might be 0, which does not indicate an error or EOF.
+ * This is equivalent to EAGAIN or EWOULDBLOCK under read(2). */
+ if (nread == 0) {
+ return;
+ }
+
+ if (nread < 0 || !buf->base) {
+ if (kr_verbose_status) {
+ struct sockaddr *peer = session_get_peer(s);
+ char *peer_str = kr_straddr(peer);
+ kr_log_verbose("[io] => connection to '%s' closed by peer (%s)\n",
+ peer_str ? peer_str : "",
+ uv_strerror(nread));
+ }
+ worker_end_tcp(s);
+ return;
+ }
+
+ ssize_t consumed = 0;
+ const uint8_t *data = (const uint8_t *)buf->base;
+ ssize_t data_len = nread;
+ if (session_flags(s)->has_tls) {
+ /* buf->base points to start of the tls receive buffer.
+ Decode data free space in session wire buffer. */
+ consumed = tls_process_input_data(s, (const uint8_t *)buf->base, nread);
+ if (consumed < 0) {
+ if (kr_verbose_status) {
+ struct sockaddr *peer = session_get_peer(s);
+ char *peer_str = kr_straddr(peer);
+ kr_log_verbose("[io] => connection to '%s': "
+ "error processing TLS data, close\n",
+ peer_str ? peer_str : "");
+ }
+ worker_end_tcp(s);
+ return;
+ } else if (consumed == 0) {
+ return;
+ }
+ data = session_wirebuf_get_free_start(s);
+ data_len = consumed;
+ }
+
+ /* data points to start of the free space in session wire buffer.
+ Simple increase internal counter. */
+ consumed = session_wirebuf_consume(s, data, data_len);
+ assert(consumed == data_len);
+
+ int ret = session_wirebuf_process(s);
+ if (ret < 0) {
+ /* An error has occurred, close the session. */
+ worker_end_tcp(s);
+ }
+ session_wirebuf_compress(s);
+ struct worker_ctx *worker = handle->loop->data;
+ mp_flush(worker->pkt_pool.ctx);
+}
+
+static void _tcp_accept(uv_stream_t *master, int status, bool tls)
+{
+ if (status != 0) {
+ return;
+ }
+
+ struct worker_ctx *worker = (struct worker_ctx *)master->loop->data;
+ uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
+ if (!client) {
+ return;
+ }
+ int res = io_create(master->loop, (uv_handle_t *)client,
+ SOCK_STREAM, AF_UNSPEC, tls);
+ if (res) {
+ if (res == UV_EMFILE) {
+ worker->too_many_open = true;
+ worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
+ }
+ /* Since res isn't OK struct session wasn't allocated \ borrowed.
+ * We must release client handle only.
+ */
+ free(client);
+ return;
+ }
+
+ /* struct session was allocated \ borrowed from memory pool. */
+ struct session *s = client->data;
+ assert(session_flags(s)->outgoing == false);
+ assert(session_flags(s)->has_tls == tls);
+
+ if (uv_accept(master, (uv_stream_t *)client) != 0) {
+ /* close session, close underlying uv handles and
+ * deallocate (or return to memory pool) memory. */
+ session_close(s);
+ return;
+ }
+
+ /* Set deadlines for TCP connection and start reading.
+ * It will re-check every half of a request time limit if the connection
+ * is idle and should be terminated, this is an educated guess. */
+ struct sockaddr *peer = session_get_peer(s);
+ int peer_len = sizeof(union inaddr);
+ int ret = uv_tcp_getpeername(client, peer, &peer_len);
+ if (ret || peer->sa_family == AF_UNSPEC) {
+ session_close(s);
+ return;
+ }
+
+ const struct engine *engine = worker->engine;
+ const struct network *net = &engine->net;
+ uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
+
+ uint64_t timeout = KR_CONN_RTT_MAX / 2;
+ if (tls) {
+ timeout += TLS_MAX_HANDSHAKE_TIME;
+ struct tls_ctx_t *ctx = session_tls_get_server_ctx(s);
+ if (!ctx) {
+ ctx = tls_new(worker);
+ if (!ctx) {
+ session_close(s);
+ return;
+ }
+ ctx->c.session = s;
+ ctx->c.handshake_state = TLS_HS_IN_PROGRESS;
+ session_tls_set_server_ctx(s, ctx);
+ }
+ }
+ session_timer_start(s, tcp_timeout_trigger, timeout, idle_in_timeout);
+ io_start_read((uv_handle_t *)client);
+}
+
+static void tcp_accept(uv_stream_t *master, int status)
+{
+ _tcp_accept(master, status, false);
+}
+
+static void tls_accept(uv_stream_t *master, int status)
+{
+ _tcp_accept(master, status, true);
+}
+
+static int set_tcp_option(uv_handle_t *handle, int option, int val)
+{
+ uv_os_fd_t fd = 0;
+ if (uv_fileno(handle, &fd) == 0) {
+ return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
+ }
+ return 0; /* N/A */
+}
+
+static int tcp_bind_finalize(uv_handle_t *handle)
+{
+ /* TCP_FASTOPEN enables 1 RTT connection resumptions. */
+#ifdef TCP_FASTOPEN
+# ifdef __linux__
+ (void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Accepts queue length hint */
+# else
+ (void) set_tcp_option(handle, TCP_FASTOPEN, 1); /* Accepts on/off */
+# endif
+#endif
+
+ handle->data = NULL;
+ return 0;
+}
+
+static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection, int tcp_backlog)
+{
+ unsigned flags = 0;
+ if (addr->sa_family == AF_INET6) {
+ flags |= UV_TCP_IPV6ONLY;
+ }
+
+ int ret = uv_tcp_bind(handle, addr, flags);
+ if (ret != 0) {
+ return ret;
+ }
+
+ /* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
+#ifdef TCP_DEFER_ACCEPT
+ if (set_tcp_option((uv_handle_t *)handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
+ kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
+ }
+#endif
+
+ ret = uv_listen((uv_stream_t *)handle, tcp_backlog, connection);
+ if (ret != 0) {
+ return ret;
+ }
+
+ return tcp_bind_finalize((uv_handle_t *)handle);
+}
+
+int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, int tcp_backlog)
+{
+ return _tcp_bind(handle, addr, tcp_accept, tcp_backlog);
+}
+
+int tcp_bind_tls(uv_tcp_t *handle, struct sockaddr *addr, int tcp_backlog)
+{
+ return _tcp_bind(handle, addr, tls_accept, tcp_backlog);
+}
+
+static int _tcp_bindfd(uv_tcp_t *handle, int fd, uv_connection_cb connection, int tcp_backlog)
+{
+ if (!handle) {
+ return kr_error(EINVAL);
+ }
+
+ int ret = uv_tcp_open(handle, (uv_os_sock_t) fd);
+ if (ret != 0) {
+ return ret;
+ }
+
+ ret = uv_listen((uv_stream_t *)handle, tcp_backlog, connection);
+ if (ret != 0) {
+ return ret;
+ }
+ return tcp_bind_finalize((uv_handle_t *)handle);
+}
+
+int tcp_bindfd(uv_tcp_t *handle, int fd, int tcp_backlog)
+{
+ return _tcp_bindfd(handle, fd, tcp_accept, tcp_backlog);
+}
+
+int tcp_bindfd_tls(uv_tcp_t *handle, int fd, int tcp_backlog)
+{
+ return _tcp_bindfd(handle, fd, tls_accept, tcp_backlog);
+}
+
+int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, bool has_tls)
+{
+ int ret = -1;
+ if (type == SOCK_DGRAM) {
+ ret = uv_udp_init(loop, (uv_udp_t *)handle);
+ } else if (type == SOCK_STREAM) {
+ ret = uv_tcp_init_ex(loop, (uv_tcp_t *)handle, family);
+ uv_tcp_nodelay((uv_tcp_t *)handle, 1);
+ }
+ if (ret != 0) {
+ return ret;
+ }
+ struct session *s = session_new(handle, has_tls);
+ if (s == NULL) {
+ ret = -1;
+ }
+ return ret;
+}
+
+void io_deinit(uv_handle_t *handle)
+{
+ if (!handle) {
+ return;
+ }
+ session_free(handle->data);
+ handle->data = NULL;
+}
+
+void io_free(uv_handle_t *handle)
+{
+ io_deinit(handle);
+ free(handle);
+}
+
+int io_start_read(uv_handle_t *handle)
+{
+ switch (handle->type) {
+ case UV_UDP:
+ return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
+ case UV_TCP:
+ return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
+ default:
+ assert(!EINVAL);
+ return kr_error(EINVAL);
+ }
+}
+
+int io_stop_read(uv_handle_t *handle)
+{
+ if (handle->type == UV_UDP) {
+ return uv_udp_recv_stop((uv_udp_t *)handle);
+ } else {
+ return uv_read_stop((uv_stream_t *)handle);
+ }
+}