/* Copyright (C) 2014-2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <libknot/errcode.h>
#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <assert.h>
#include "daemon/io.h"
#include "daemon/network.h"
#include "daemon/worker.h"
#include "daemon/tls.h"
#include "daemon/session.h"
#define negotiate_bufsize(func, handle, bufsize_want) do { \
int bufsize = 0; func(handle, &bufsize); \
if (bufsize < bufsize_want) { \
bufsize = bufsize_want; \
func(handle, &bufsize); \
} \
} while (0)
static void check_bufsize(uv_handle_t* handle)
{
return; /* TODO: resurrect after https://github.com/libuv/libuv/issues/419 */
	/* We want to buffer at least N waves in advance.
	 * This is magic presuming we can pull in a whole recvmmsg width in one wave.
	 * Note that Linux will double the buffer size we ask for.
	 */
	const int bufsize_want = 2 * sizeof(((struct worker_ctx *)NULL)->wire_buf);
negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want);
negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want);
}
#undef negotiate_bufsize
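/* Buffer-allocation callback handed to libuv before each read;
 * it selects the buffer that the incoming data will be written into. */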
static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
	/* UDP sessions use the worker buffer for wire data,
	 * TCP sessions use the session buffer for wire data
	 * (see session_set_handle()),
	 * and TLS sessions use the buffer from the TLS context.
	 * The content of the worker buffer is guaranteed to be
	 * unchanged only for the duration of udp_read() and tcp_read().
	 */
struct session *s = handle->data;
if (!session_flags(s)->has_tls) {
buf->base = (char *) session_wirebuf_get_free_start(s);
buf->len = session_wirebuf_get_free_size(s);
} else {
struct tls_common_ctx *ctx = session_tls_get_common_ctx(s);
buf->base = (char *) ctx->recv_buf;
buf->len = sizeof(ctx->recv_buf);
}
}
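/* libuv receive callback for UDP handles.  It ignores empty reads,
 * notifies the resolver on errors, and for outgoing sessions drops
 * datagrams that did not come from the expected peer address. */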
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags)
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
struct session *s = handle->data;
if (session_flags(s)->closing) {
return;
}
if (nread <= 0) {
if (nread < 0) { /* Error response, notify resolver */
worker_submit(s, NULL);
} /* nread == 0 is for freeing buffers, we don't need to do this */
return;
}
if (addr->sa_family == AF_UNSPEC) {
return;
}
struct sockaddr *peer = session_get_peer(s);
if (session_flags(s)->outgoing) {
assert(peer->sa_family != AF_UNSPEC);
if (kr_sockaddr_cmp(peer, addr) != 0) {
return;
}
} else {
memcpy(peer, addr, kr_sockaddr_len(addr));
}
ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base,
nread);
assert(consumed == nread); (void)consumed;
session_wirebuf_process(s);
session_wirebuf_discard(s);
mp_flush(worker->pkt_pool.ctx);
}
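/* Common tail of udp_bind() and udp_bindfd(): tune the socket buffers,
 * attach a fresh incoming session to the handle and start reading. */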
static int udp_bind_finalize(uv_handle_t *handle)
{
check_bufsize(handle);
/* Handle is already created, just create context. */
struct session *s = session_new(handle, false);
assert(s);
session_flags(s)->outgoing = false;
return io_start_read(handle);
}
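/* Bind a UDP handle to addr and start serving on it.
 * An illustrative call sequence (a sketch, not code from this file;
 * it assumes the handle was initialised with uv_udp_init() beforehand):
 *
 *	uv_udp_t handle;
 *	struct sockaddr_in sin;
 *	uv_udp_init(loop, &handle);
 *	uv_ip4_addr("127.0.0.1", 53, &sin);
 *	int err = udp_bind(&handle, (struct sockaddr *)&sin);
 */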
int udp_bind(uv_udp_t *handle, struct sockaddr *addr)
{
unsigned flags = UV_UDP_REUSEADDR;
if (addr->sa_family == AF_INET6) {
flags |= UV_UDP_IPV6ONLY;
}
int ret = uv_udp_bind(handle, addr, flags);
if (ret != 0) {
return ret;
}
return udp_bind_finalize((uv_handle_t *)handle);
}
int udp_bindfd(uv_udp_t *handle, int fd)
{
if (!handle) {
return kr_error(EINVAL);
}
int ret = uv_udp_open(handle, (uv_os_sock_t) fd);
if (ret != 0) {
return ret;
}
return udp_bind_finalize((uv_handle_t *)handle);
}
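/* Per-connection timer callback: finalize tasks that exceeded the
 * resolution time limit and, once the session has been idle for longer
 * than net.tcp.in_idle_timeout, close the connection. */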
void tcp_timeout_trigger(uv_timer_t *timer)
{
struct session *s = timer->data;
assert(!session_flags(s)->closing);
struct worker_ctx *worker = timer->loop->data;
if (!session_tasklist_is_empty(s)) {
int finalized = session_tasklist_finalize_expired(s);
worker->stats.timeout += finalized;
		/* session_tasklist_finalize_expired() may call worker_task_finalize().
		 * If the session is a source session and there were IO errors,
		 * worker_task_finalize() can finalize all tasks and close the session. */
if (session_flags(s)->closing) {
return;
}
}
if (!session_tasklist_is_empty(s)) {
uv_timer_stop(timer);
session_timer_start(s, tcp_timeout_trigger,
KR_RESOLVE_TIME_LIMIT / 2,
KR_RESOLVE_TIME_LIMIT / 2);
} else {
		/* Normally this should not happen,
		 * but it is better to check whether there is anything in this list. */
while (!session_waitinglist_is_empty(s)) {
struct qr_task *t = session_waitinglist_pop(s, false);
worker_task_finalize(t, KR_STATE_FAIL);
worker_task_unref(t);
worker->stats.timeout += 1;
if (session_flags(s)->closing) {
return;
}
}
const struct engine *engine = worker->engine;
const struct network *net = &engine->net;
uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
uint64_t last_activity = session_last_activity(s);
uint64_t idle_time = kr_now() - last_activity;
if (idle_time < idle_in_timeout) {
idle_in_timeout -= idle_time;
uv_timer_stop(timer);
session_timer_start(s, tcp_timeout_trigger,
idle_in_timeout, idle_in_timeout);
} else {
struct sockaddr *peer = session_get_peer(s);
char *peer_str = kr_straddr(peer);
kr_log_verbose("[io] => closing connection to '%s'\n",
peer_str ? peer_str : "");
if (session_flags(s)->outgoing) {
worker_del_tcp_waiting(worker, peer);
worker_del_tcp_connected(worker, peer);
}
session_close(s);
}
}
}
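/* libuv read callback for TCP (and TLS-over-TCP) streams.
 * Plain TCP payload is consumed directly, while TLS payload is first
 * decrypted into the session wire buffer by tls_process_input_data(). */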
static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
struct session *s = handle->data;
assert(s && session_get_handle(s) == (uv_handle_t *)handle &&
handle->type == UV_TCP);
if (session_flags(s)->closing) {
return;
}
/* nread might be 0, which does not indicate an error or EOF.
* This is equivalent to EAGAIN or EWOULDBLOCK under read(2). */
if (nread == 0) {
return;
}
if (nread < 0 || !buf->base) {
if (kr_verbose_status) {
struct sockaddr *peer = session_get_peer(s);
char *peer_str = kr_straddr(peer);
kr_log_verbose("[io] => connection to '%s' closed by peer (%s)\n",
peer_str ? peer_str : "",
uv_strerror(nread));
}
worker_end_tcp(s);
return;
}
ssize_t consumed = 0;
const uint8_t *data = (const uint8_t *)buf->base;
ssize_t data_len = nread;
if (session_flags(s)->has_tls) {
		/* buf->base points to the start of the TLS receive buffer;
		   decode its contents into free space in the session wire buffer. */
consumed = tls_process_input_data(s, (const uint8_t *)buf->base, nread);
if (consumed < 0) {
if (kr_verbose_status) {
struct sockaddr *peer = session_get_peer(s);
char *peer_str = kr_straddr(peer);
kr_log_verbose("[io] => connection to '%s': "
"error processing TLS data, close\n",
peer_str ? peer_str : "");
}
worker_end_tcp(s);
return;
} else if (consumed == 0) {
return;
}
data = session_wirebuf_get_free_start(s);
data_len = consumed;
}
	/* data points to the start of the free space in the session wire buffer;
	   simply advance the internal counter. */
consumed = session_wirebuf_consume(s, data, data_len);
assert(consumed == data_len);
int ret = session_wirebuf_process(s);
if (ret < 0) {
/* An error has occurred, close the session. */
worker_end_tcp(s);
}
session_wirebuf_compress(s);
struct worker_ctx *worker = handle->loop->data;
mp_flush(worker->pkt_pool.ctx);
}
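/* Accept an incoming connection on the listening handle 'master':
 * create a client handle and session for it, optionally prepare a
 * server-side TLS context, and arm the idle/handshake timer. */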
static void _tcp_accept(uv_stream_t *master, int status, bool tls)
{
if (status != 0) {
return;
}
struct worker_ctx *worker = (struct worker_ctx *)master->loop->data;
uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
if (!client) {
return;
}
int res = io_create(master->loop, (uv_handle_t *)client,
SOCK_STREAM, AF_UNSPEC, tls);
if (res) {
if (res == UV_EMFILE) {
worker->too_many_open = true;
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
}
		/* Since res isn't OK, no struct session was allocated or borrowed,
		 * so we must release only the client handle.
		 */
free(client);
return;
}
	/* struct session was allocated / borrowed from the memory pool. */
struct session *s = client->data;
assert(session_flags(s)->outgoing == false);
assert(session_flags(s)->has_tls == tls);
if (uv_accept(master, (uv_stream_t *)client) != 0) {
/* close session, close underlying uv handles and
* deallocate (or return to memory pool) memory. */
session_close(s);
return;
}
	/* Set deadlines for the TCP connection and start reading.
	 * Every half of the request time limit it re-checks whether the
	 * connection is idle and should be terminated; this is an educated guess. */
struct sockaddr *peer = session_get_peer(s);
int peer_len = sizeof(union inaddr);
int ret = uv_tcp_getpeername(client, peer, &peer_len);
if (ret || peer->sa_family == AF_UNSPEC) {
session_close(s);
return;
}
const struct engine *engine = worker->engine;
const struct network *net = &engine->net;
uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
uint64_t timeout = KR_CONN_RTT_MAX / 2;
if (tls) {
timeout += TLS_MAX_HANDSHAKE_TIME;
struct tls_ctx_t *ctx = session_tls_get_server_ctx(s);
if (!ctx) {
ctx = tls_new(worker);
if (!ctx) {
session_close(s);
return;
}
ctx->c.session = s;
ctx->c.handshake_state = TLS_HS_IN_PROGRESS;
session_tls_set_server_ctx(s, ctx);
}
}
session_timer_start(s, tcp_timeout_trigger, timeout, idle_in_timeout);
io_start_read((uv_handle_t *)client);
}
static void tcp_accept(uv_stream_t *master, int status)
{
_tcp_accept(master, status, false);
}
static void tls_accept(uv_stream_t *master, int status)
{
_tcp_accept(master, status, true);
}
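/* Best-effort setsockopt() wrapper on the IPPROTO_TCP level;
 * reports success when the underlying file descriptor is unavailable. */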
static int set_tcp_option(uv_handle_t *handle, int option, int val)
{
uv_os_fd_t fd = 0;
if (uv_fileno(handle, &fd) == 0) {
return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
}
return 0; /* N/A */
}
static int tcp_bind_finalize(uv_handle_t *handle)
{
	/* TCP Fast Open saves one RTT on repeated connections: clients holding
	 * a TFO cookie may carry payload data already in the SYN packet. */
#ifdef TCP_FASTOPEN
# ifdef __linux__
	(void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Accept-queue length hint */
# else
	(void) set_tcp_option(handle, TCP_FASTOPEN, 1);  /* Boolean on/off switch */
# endif
#endif
handle->data = NULL;
return 0;
}
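/* Common implementation behind tcp_bind() and tcp_bind_tls():
 * bind the handle, request TCP_DEFER_ACCEPT where available,
 * start listening and apply the final socket options. */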
static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection, int tcp_backlog)
{
unsigned flags = 0;
if (addr->sa_family == AF_INET6) {
flags |= UV_TCP_IPV6ONLY;
}
int ret = uv_tcp_bind(handle, addr, flags);
if (ret != 0) {
return ret;
}
/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
#ifdef TCP_DEFER_ACCEPT
if (set_tcp_option((uv_handle_t *)handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
}
#endif
ret = uv_listen((uv_stream_t *)handle, tcp_backlog, connection);
if (ret != 0) {
return ret;
}
return tcp_bind_finalize((uv_handle_t *)handle);
}
int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, int tcp_backlog)
{
return _tcp_bind(handle, addr, tcp_accept, tcp_backlog);
}
int tcp_bind_tls(uv_tcp_t *handle, struct sockaddr *addr, int tcp_backlog)
{
return _tcp_bind(handle, addr, tls_accept, tcp_backlog);
}
static int _tcp_bindfd(uv_tcp_t *handle, int fd, uv_connection_cb connection, int tcp_backlog)
{
if (!handle) {
return kr_error(EINVAL);
}
int ret = uv_tcp_open(handle, (uv_os_sock_t) fd);
if (ret != 0) {
return ret;
}
ret = uv_listen((uv_stream_t *)handle, tcp_backlog, connection);
if (ret != 0) {
return ret;
}
return tcp_bind_finalize((uv_handle_t *)handle);
}
int tcp_bindfd(uv_tcp_t *handle, int fd, int tcp_backlog)
{
return _tcp_bindfd(handle, fd, tcp_accept, tcp_backlog);
}
int tcp_bindfd_tls(uv_tcp_t *handle, int fd, int tcp_backlog)
{
return _tcp_bindfd(handle, fd, tls_accept, tcp_backlog);
}
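/* Initialise a libuv handle of the given socket type on the loop and
 * attach a new session to it.  Illustrative usage for a TCP client
 * handle (a sketch, not code from this file):
 *
 *	uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
 *	if (io_create(loop, (uv_handle_t *)client, SOCK_STREAM, AF_INET, false) != 0) {
 *		free(client);  /+ no session was attached, free the bare handle +/
 *	}
 */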
int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, bool has_tls)
{
int ret = -1;
if (type == SOCK_DGRAM) {
ret = uv_udp_init(loop, (uv_udp_t *)handle);
} else if (type == SOCK_STREAM) {
ret = uv_tcp_init_ex(loop, (uv_tcp_t *)handle, family);
uv_tcp_nodelay((uv_tcp_t *)handle, 1);
}
if (ret != 0) {
return ret;
}
struct session *s = session_new(handle, has_tls);
if (s == NULL) {
ret = -1;
}
return ret;
}
void io_deinit(uv_handle_t *handle)
{
if (!handle) {
return;
}
session_free(handle->data);
handle->data = NULL;
}
void io_free(uv_handle_t *handle)
{
io_deinit(handle);
free(handle);
}
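/* Start reading on a UDP or TCP handle with the callbacks defined above;
 * any other handle type is a programming error. */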
int io_start_read(uv_handle_t *handle)
{
switch (handle->type) {
case UV_UDP:
return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
case UV_TCP:
return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
default:
assert(!EINVAL);
return kr_error(EINVAL);
}
}
int io_stop_read(uv_handle_t *handle)
{
if (handle->type == UV_UDP) {
return uv_udp_recv_stop((uv_udp_t *)handle);
} else {
return uv_read_stop((uv_stream_t *)handle);
}
}