diff options
Diffstat (limited to '')
-rw-r--r-- | src/utils/kxdpgun/main.c | 578 |
1 files changed, 243 insertions, 335 deletions
diff --git a/src/utils/kxdpgun/main.c b/src/utils/kxdpgun/main.c index 523f64f..4e3aa31 100644 --- a/src/utils/kxdpgun/main.c +++ b/src/utils/kxdpgun/main.c @@ -14,12 +14,15 @@ along with this program. If not, see <https://www.gnu.org/licenses/>. */ +#include <arpa/inet.h> #include <assert.h> #include <errno.h> #include <getopt.h> #include <ifaddrs.h> #include <inttypes.h> #include <net/if.h> +#include <netdb.h> +#include <netinet/in.h> #include <poll.h> #include <pthread.h> #include <signal.h> @@ -28,16 +31,11 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#include <time.h> -#include <unistd.h> -#include <netdb.h> - -#include <arpa/inet.h> -#include <netinet/in.h> -#include <net/if.h> #include <sys/ioctl.h> -#include <sys/socket.h> #include <sys/resource.h> +#include <sys/socket.h> +#include <time.h> +#include <unistd.h> #include "libknot/libknot.h" #include "libknot/xdp.h" @@ -46,101 +44,28 @@ #include <gnutls/gnutls.h> #include "libknot/quic/quic.h" #endif // ENABLE_QUIC -#include "contrib/macros.h" -#include "contrib/mempattern.h" -#include "contrib/openbsd/strlcat.h" +#include "contrib/atomic.h" #include "contrib/openbsd/strlcpy.h" #include "contrib/os.h" #include "contrib/sockaddr.h" #include "contrib/toeplitz.h" -#include "contrib/ucw/mempool.h" #include "utils/common/msg.h" #include "utils/common/params.h" #include "utils/kxdpgun/ip_route.h" #include "utils/kxdpgun/load_queries.h" - -#define PROGRAM_NAME "kxdpgun" -#define SPACE " " - -enum { - KXDPGUN_WAIT, - KXDPGUN_START, - KXDPGUN_STOP, -}; +#include "utils/kxdpgun/main.h" +#include "utils/kxdpgun/stats.h" volatile int xdp_trigger = KXDPGUN_WAIT; -volatile unsigned stats_trigger = 0; +volatile knot_atomic_uint64_t stats_trigger = 0; +volatile knot_atomic_bool stats_switch = STATS_SUM; unsigned global_cpu_aff_start = 0; unsigned global_cpu_aff_step = 1; -#define REMOTE_PORT_DEFAULT 53 -#define REMOTE_PORT_DOQ_DEFAULT 853 -#define LOCAL_PORT_MIN 2000 -#define LOCAL_PORT_MAX 65535 -#define QUIC_THREAD_PORTS 100 - -#define RCODE_MAX (0x0F + 1) - -typedef struct { - size_t collected; - uint64_t duration; - uint64_t qry_sent; - uint64_t synack_recv; - uint64_t ans_recv; - uint64_t finack_recv; - uint64_t rst_recv; - uint64_t size_recv; - uint64_t wire_recv; - uint64_t rcodes_recv[RCODE_MAX]; - pthread_mutex_t mutex; -} kxdpgun_stats_t; - static kxdpgun_stats_t global_stats = { 0 }; -typedef enum { - KXDPGUN_IGNORE_NONE = 0, - KXDPGUN_IGNORE_QUERY = (1 << 0), - KXDPGUN_IGNORE_LASTBYTE = (1 << 1), - KXDPGUN_IGNORE_CLOSE = (1 << 2), - KXDPGUN_REUSE_CONN = (1 << 3), -} xdp_gun_ignore_t; - -typedef struct { - union { - struct sockaddr_in local_ip4; - struct sockaddr_in6 local_ip; - struct sockaddr_storage local_ip_ss; - }; - union { - struct sockaddr_in target_ip4; - struct sockaddr_in6 target_ip; - struct sockaddr_storage target_ip_ss; - }; - char dev[IFNAMSIZ]; - uint64_t qps, duration; - unsigned at_once; - uint16_t msgid; - uint16_t edns_size; - uint16_t vlan_tci; - uint8_t local_mac[6], target_mac[6]; - uint8_t local_ip_range; - bool ipv6; - bool tcp; - bool quic; - bool quic_full_handshake; - const char *qlog_dir; - const char *sending_mode; - xdp_gun_ignore_t ignore1; - knot_tcp_ignore_t ignore2; - uint16_t target_port; - knot_xdp_filter_flag_t flags; - unsigned n_threads, thread_id; - knot_eth_rss_conf_t *rss_conf; - knot_xdp_config_t xdp_config; -} xdp_gun_ctx_t; - const static xdp_gun_ctx_t ctx_defaults = { .dev[0] = '\0', .edns_size = 1232, @@ -150,7 +75,9 @@ const static xdp_gun_ctx_t ctx_defaults = { .sending_mode = "", .target_port = 0, .flags = KNOT_XDP_FILTER_UDP | KNOT_XDP_FILTER_PASS, - .xdp_config = { .extra_frames = true }, + .xdp_config = { .ring_size = 2048 }, + .jw = NULL, + .stats_period = 0, }; static void sigterm_handler(int signo) @@ -163,103 +90,8 @@ static void sigusr_handler(int signo) { assert(signo == SIGUSR1); if (global_stats.collected == 0) { - stats_trigger++; - } -} - -static void clear_stats(kxdpgun_stats_t *st) -{ - pthread_mutex_lock(&st->mutex); - st->duration = 0; - st->qry_sent = 0; - st->synack_recv = 0; - st->ans_recv = 0; - st->finack_recv = 0; - st->rst_recv = 0; - st->size_recv = 0; - st->wire_recv = 0; - st->collected = 0; - memset(st->rcodes_recv, 0, sizeof(st->rcodes_recv)); - pthread_mutex_unlock(&st->mutex); -} - -static size_t collect_stats(kxdpgun_stats_t *into, const kxdpgun_stats_t *what) -{ - pthread_mutex_lock(&into->mutex); - into->duration = MAX(into->duration, what->duration); - into->qry_sent += what->qry_sent; - into->synack_recv += what->synack_recv; - into->ans_recv += what->ans_recv; - into->finack_recv += what->finack_recv; - into->rst_recv += what->rst_recv; - into->size_recv += what->size_recv; - into->wire_recv += what->wire_recv; - for (int i = 0; i < RCODE_MAX; i++) { - into->rcodes_recv[i] += what->rcodes_recv[i]; + ATOMIC_ADD(stats_trigger, 1); } - size_t res = ++into->collected; - pthread_mutex_unlock(&into->mutex); - return res; -} - -static void print_stats(kxdpgun_stats_t *st, bool tcp, bool quic, bool recv, uint64_t qps) -{ - pthread_mutex_lock(&st->mutex); - -#define ps(counter) ((typeof(counter))((counter) * 1000 / ((float)st->duration / 1000))) -#define pct(counter) ((counter) * 100.0 / st->qry_sent) - - const char *name = tcp ? "SYNs: " : quic ? "initials:" : "queries: "; - printf("total %s %"PRIu64" (%"PRIu64" pps) (%f%%)\n", name, st->qry_sent, - ps(st->qry_sent), 100.0 * st->qry_sent / (st->duration / 1000000.0 * qps)); - if (st->qry_sent > 0 && recv) { - if (tcp || quic) { - name = tcp ? "established:" : "handshakes: "; - printf("total %s %"PRIu64" (%"PRIu64" pps) (%f%%)\n", name, - st->synack_recv, ps(st->synack_recv), pct(st->synack_recv)); - } - printf("total replies: %"PRIu64" (%"PRIu64" pps) (%f%%)\n", - st->ans_recv, ps(st->ans_recv), pct(st->ans_recv)); - if (tcp) { - printf("total closed: %"PRIu64" (%"PRIu64" pps) (%f%%)\n", - st->finack_recv, ps(st->finack_recv), pct(st->finack_recv)); - } - if (st->rst_recv > 0) { - printf("total reset: %"PRIu64" (%"PRIu64" pps) (%f%%)\n", - st->rst_recv, ps(st->rst_recv), pct(st->rst_recv)); - } - printf("average DNS reply size: %"PRIu64" B\n", - st->ans_recv > 0 ? st->size_recv / st->ans_recv : 0); - printf("average Ethernet reply rate: %"PRIu64" bps (%.2f Mbps)\n", - ps(st->wire_recv * 8), ps((float)st->wire_recv * 8 / (1000 * 1000))); - - for (int i = 0; i < RCODE_MAX; i++) { - if (st->rcodes_recv[i] > 0) { - const knot_lookup_t *rcode = knot_lookup_by_id(knot_rcode_names, i); - const char *rcname = rcode == NULL ? "unknown" : rcode->name; - int space = MAX(9 - strlen(rcname), 0); - printf("responded %s: %.*s%"PRIu64"\n", - rcname, space, " ", st->rcodes_recv[i]); - } - } - } - printf("duration: %"PRIu64" s\n", (st->duration / (1000 * 1000))); - - pthread_mutex_unlock(&st->mutex); -} - -inline static void timer_start(struct timespec *timesp) -{ - clock_gettime(CLOCK_MONOTONIC, timesp); -} - -inline static uint64_t timer_end(struct timespec *timesp) -{ - struct timespec end; - clock_gettime(CLOCK_MONOTONIC, &end); - uint64_t res = (end.tv_sec - timesp->tv_sec) * (uint64_t)1000000; - res += ((int64_t)end.tv_nsec - timesp->tv_nsec) / 1000; - return res; } static unsigned addr_bits(bool ipv6) @@ -267,7 +99,8 @@ static unsigned addr_bits(bool ipv6) return ipv6 ? 128 : 32; } -static void shuffle_sockaddr4(struct sockaddr_in *dst, struct sockaddr_in *src, uint64_t increment) +static void shuffle_sockaddr4(struct sockaddr_in *dst, struct sockaddr_in *src, + uint64_t increment) { memcpy(&dst->sin_addr, &src->sin_addr, sizeof(dst->sin_addr)); if (increment > 0) { @@ -275,7 +108,8 @@ static void shuffle_sockaddr4(struct sockaddr_in *dst, struct sockaddr_in *src, } } -static void shuffle_sockaddr6(struct sockaddr_in6 *dst, struct sockaddr_in6 *src, uint64_t increment) +static void shuffle_sockaddr6(struct sockaddr_in6 *dst, struct sockaddr_in6 *src, + uint64_t increment) { memcpy(&dst->sin6_addr, &src->sin6_addr, sizeof(dst->sin6_addr)); if (increment > 0) { @@ -293,7 +127,8 @@ static void shuffle_sockaddr(struct sockaddr_in6 *dst, struct sockaddr_in6 *src, if (src->sin6_family == AF_INET6) { shuffle_sockaddr6(dst, src, increment); } else { - shuffle_sockaddr4((struct sockaddr_in *)dst, (struct sockaddr_in *)src, increment); + shuffle_sockaddr4((struct sockaddr_in *)dst, (struct sockaddr_in *)src, + increment); } } @@ -311,7 +146,8 @@ static void next_payload(struct pkt_payload **payload, int increment) } } -static void put_dns_payload(struct iovec *put_into, bool zero_copy, xdp_gun_ctx_t *ctx, struct pkt_payload **payl) +static void put_dns_payload(struct iovec *put_into, bool zero_copy, xdp_gun_ctx_t *ctx, + struct pkt_payload **payl) { if (zero_copy) { put_into->iov_base = (*payl)->payload; @@ -472,19 +308,41 @@ static void quic_free_cb(knot_quic_reply_t *rpl) } #endif // ENABLE_QUIC +static uint64_t timestamp_ns(void) +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return ((uint64_t)ts.tv_sec * 1000000000) + ts.tv_nsec; +} + +static void timer_start(struct timespec *out) +{ + clock_gettime(CLOCK_MONOTONIC, out); +} + +static uint64_t timer_end_ns(const struct timespec *start) +{ + struct timespec end; + clock_gettime(CLOCK_MONOTONIC, &end); + uint64_t res = (end.tv_sec - start->tv_sec) * (uint64_t)1000000000; + res += end.tv_nsec - start->tv_nsec; + return res; +} + void *xdp_gun_thread(void *_ctx) { xdp_gun_ctx_t *ctx = _ctx; struct knot_xdp_socket *xsk = NULL; - struct timespec timer; knot_xdp_msg_t pkts[ctx->at_once]; - uint64_t errors = 0, lost = 0, duration = 0; - kxdpgun_stats_t local_stats = { 0 }; + uint64_t duration_us = 0; + struct timespec timer; + kxdpgun_stats_t local_stats = { 0 }; // cumulative stats of past periods excluding the current + kxdpgun_stats_t periodic_stats = { 0 }; // stats for the current period (see -S option) unsigned stats_triggered = 0; knot_tcp_table_t *tcp_table = NULL; #ifdef ENABLE_QUIC knot_quic_table_t *quic_table = NULL; - struct knot_quic_creds *quic_creds = NULL; + struct knot_creds *quic_creds = NULL; list_t quic_sessions; init_list(&quic_sessions); #endif // ENABLE_QUIC @@ -501,12 +359,13 @@ void *xdp_gun_thread(void *_ctx) } if (ctx->quic) { #ifdef ENABLE_QUIC - quic_creds = knot_quic_init_creds_peer(NULL, NULL, 0); + quic_creds = knot_creds_init_peer(NULL, NULL, 0); if (quic_creds == NULL) { ERR2("failed to initialize QUIC context"); goto cleanup; } - quic_table = knot_quic_table_new(ctx->qps * 100, SIZE_MAX, SIZE_MAX, 1232, quic_creds); + quic_table = knot_quic_table_new(ctx->qps * 100, SIZE_MAX, SIZE_MAX, + 1232, quic_creds); if (quic_table == NULL) { ERR2("failed to allocate QUIC connection table"); goto cleanup; @@ -517,12 +376,12 @@ void *xdp_gun_thread(void *_ctx) #endif // ENABLE_QUIC } - knot_xdp_load_bpf_t mode = (ctx->thread_id == 0 ? - KNOT_XDP_LOAD_BPF_ALWAYS : KNOT_XDP_LOAD_BPF_NEVER); + knot_xdp_load_bpf_t mode = + (ctx->thread_id == 0 ? KNOT_XDP_LOAD_BPF_ALWAYS : KNOT_XDP_LOAD_BPF_NEVER); /* * This mutex prevents libbpf from logging: * 'libbpf: can't get link by id (5535): Resource temporarily unavailable' - */ + */ pthread_mutex_lock(&global_stats.mutex); int ret = knot_xdp_init(&xsk, ctx->dev, ctx->thread_id, ctx->flags, LOCAL_PORT_MIN, LOCAL_PORT_MIN, mode, &ctx->xdp_config); @@ -534,13 +393,7 @@ void *xdp_gun_thread(void *_ctx) } if (ctx->thread_id == 0) { - INFO2("using interface %s, XDP threads %u, IPv%c/%s%s%s, %s mode", - ctx->dev, ctx->n_threads, (ctx->ipv6 ? '6' : '4'), - (ctx->tcp ? "TCP" : ctx->quic ? "QUIC" : "UDP"), - (ctx->sending_mode[0] != '\0' ? " mode " : ""), - (ctx->sending_mode[0] != '\0' ? ctx->sending_mode : ""), - (knot_eth_xdp_mode(if_nametoindex(ctx->dev)) == KNOT_XDP_MODE_FULL ? - "native" : "emulated")); + STATS_HDR(ctx); } struct pollfd pfd = { knot_xdp_socket_fd(xsk), POLLIN, 0 }; @@ -576,7 +429,7 @@ void *xdp_gun_thread(void *_ctx) ctx->target_ip.sin6_port = htobe16(ctx->target_port); knot_sweep_stats_t sweep_stats = { 0 }; - uint16_t local_ports[QUIC_THREAD_PORTS]; + uint16_t local_ports[QUIC_THREAD_PORTS] = { 0 }; uint16_t port = LOCAL_PORT_MIN; for (int i = 0; ctx->quic && i < QUIC_THREAD_PORTS; ++i) { local_ports[i] = adjust_port(ctx, port); @@ -586,24 +439,25 @@ void *xdp_gun_thread(void *_ctx) size_t local_ports_it = 0; #endif // ENABLE_QUIC + local_stats.since = periodic_stats.since = timestamp_ns(); timer_start(&timer); + ctx->stats_start_us = local_stats.since / 1000; - while (duration < ctx->duration + extra_wait) { - + while (duration_us < ctx->duration + extra_wait) { // sending part - if (duration < ctx->duration) { + if (duration_us < ctx->duration) { while (1) { knot_xdp_send_prepare(xsk); unsigned alloced = alloc_pkts(pkts, xsk, ctx, tick); if (alloced < ctx->at_once) { - lost += ctx->at_once - alloced; + periodic_stats.lost += ctx->at_once - alloced; if (alloced == 0) { break; } } if (ctx->tcp) { - for (int i = 0; i < alloced; i++) { + for (uint32_t i = 0; i < alloced; i++) { pkts[i].payload.iov_len = 0; if (!EMPTY_LIST(reuse_conns)) { @@ -621,7 +475,7 @@ void *xdp_gun_thread(void *_ctx) } if (ret == KNOT_EOK) { pkts[i].flags &= ~KNOT_XDP_MSG_SYN; // skip sending respective packet - local_stats.qry_sent++; + periodic_stats.qry_sent++; } free(rl); } @@ -670,14 +524,14 @@ void *xdp_gun_thread(void *_ctx) (ctx->ignore1 & KXDPGUN_IGNORE_LASTBYTE) ? KNOT_QUIC_SEND_IGNORE_LASTBYTE : 0); } if (ret == KNOT_EOK) { - local_stats.qry_sent++; + periodic_stats.qry_sent++; } } (void)knot_xdp_send_finish(xsk); #endif // ENABLE_QUIC break; } else { - for (int i = 0; i < alloced; i++) { + for (uint32_t i = 0; i < alloced; i++) { put_dns_payload(&pkts[i].payload, false, ctx, &payload_ptr); } @@ -685,9 +539,9 @@ void *xdp_gun_thread(void *_ctx) uint32_t really_sent = 0; if (knot_xdp_send(xsk, pkts, alloced, &really_sent) != KNOT_EOK) { - lost += alloced; + periodic_stats.lost += alloced; } - local_stats.qry_sent += really_sent; + periodic_stats.qry_sent += really_sent; (void)knot_xdp_send_finish(xsk); break; @@ -699,7 +553,7 @@ void *xdp_gun_thread(void *_ctx) while (1) { ret = poll(&pfd, 1, 0); if (ret < 0) { - errors++; + periodic_stats.errors++; break; } if (!pfd.revents) { @@ -714,18 +568,19 @@ void *xdp_gun_thread(void *_ctx) } if (ctx->tcp) { knot_tcp_relay_t relays[recvd]; - ret = knot_tcp_recv(relays, pkts, recvd, tcp_table, NULL, ctx->ignore2); - if (ret != KNOT_EOK) { - errors++; - break; - } for (size_t i = 0; i < recvd; i++) { knot_tcp_relay_t *rl = &relays[i]; + ret = knot_tcp_recv(rl, &pkts[i], tcp_table, NULL, ctx->ignore2); + if (ret != KNOT_EOK) { + periodic_stats.errors++; + continue; + } + struct iovec payl; switch (rl->action) { case XDP_TCP_ESTABLISH: - local_stats.synack_recv++; + periodic_stats.synack_recv++; if (ctx->ignore1 & KXDPGUN_IGNORE_QUERY) { break; } @@ -734,20 +589,20 @@ void *xdp_gun_thread(void *_ctx) (ctx->ignore1 & KXDPGUN_IGNORE_LASTBYTE), payl.iov_base, payl.iov_len); if (ret != KNOT_EOK) { - errors++; + periodic_stats.errors++; } break; case XDP_TCP_CLOSE: - local_stats.finack_recv++; + periodic_stats.finack_recv++; break; case XDP_TCP_RESET: - local_stats.rst_recv++; + periodic_stats.rst_recv++; break; default: break; } for (size_t j = 0; rl->inbf != NULL && j < rl->inbf->n_inbufs; j++) { - if (check_dns_payload(&rl->inbf->inbufs[j], ctx, &local_stats)) { + if (check_dns_payload(&rl->inbf->inbufs[j], ctx, &periodic_stats)) { if (!(ctx->ignore1 & KXDPGUN_IGNORE_CLOSE)) { rl->answer = XDP_TCP_CLOSE; } else if ((ctx->ignore1 & KXDPGUN_REUSE_CONN)) { @@ -763,7 +618,7 @@ void *xdp_gun_thread(void *_ctx) ret = knot_tcp_send(xsk, relays, recvd, ctx->at_once); if (ret != KNOT_EOK) { - errors++; + periodic_stats.errors++; } (void)knot_xdp_send_finish(xsk); @@ -781,11 +636,11 @@ void *xdp_gun_thread(void *_ctx) ret = knot_quic_handle(quic_table, &quic_reply, 5000000000L, &conn); if (ret == KNOT_ECONN) { - local_stats.rst_recv++; + periodic_stats.rst_recv++; knot_quic_cleanup(&conn, 1); continue; } else if (ret != 0) { - errors++; + periodic_stats.errors++; knot_quic_cleanup(&conn, 1); break; } @@ -805,7 +660,7 @@ void *xdp_gun_thread(void *_ctx) if ((conn->flags & KNOT_QUIC_CONN_HANDSHAKE_DONE) && conn->streams_count == -1) { conn->streams_count = 1; - local_stats.synack_recv++; + periodic_stats.synack_recv++; if ((ctx->ignore1 & KXDPGUN_IGNORE_QUERY)) { knot_quic_table_rem(conn, quic_table); knot_quic_cleanup(&conn, 1); @@ -822,14 +677,14 @@ void *xdp_gun_thread(void *_ctx) if ((ctx->ignore2 & XDP_TCP_IGNORE_ESTABLISH)) { knot_quic_table_rem(conn, quic_table); knot_quic_cleanup(&conn, 1); - local_stats.synack_recv++; + periodic_stats.synack_recv++; continue; } int64_t s0id; knot_quic_stream_t *stream0 = knot_quic_stream_get_process(conn, &s0id); if (stream0 != NULL && stream0->inbufs != NULL && stream0->inbufs->n_inbufs > 0) { - check_dns_payload(&stream0->inbufs->inbufs[0], ctx, &local_stats); + check_dns_payload(&stream0->inbufs->inbufs[0], ctx, &periodic_stats); stream0->inbufs->n_inbufs = 0; // signal that data have been read out if ((ctx->ignore2 & XDP_TCP_IGNORE_DATA_ACK)) { @@ -837,7 +692,9 @@ void *xdp_gun_thread(void *_ctx) knot_quic_cleanup(&conn, 1); continue; } else if ((ctx->ignore1 & KXDPGUN_REUSE_CONN)) { - if (conn->streams_count > 1) { // keep the number of outstanding streams below MAX_STREAMS_PER_CONN, while preserving at least one at all times + /* keep the number of outstanding streams below MAX_STREAMS_PER_CONN, + * while preserving at least one at all times */ + if (conn->streams_count > 1) { knot_quic_conn_stream_free(conn, conn->streams_first * 4); } ptrlist_add(&reuse_conns, conn, NULL); @@ -846,30 +703,31 @@ void *xdp_gun_thread(void *_ctx) ret = knot_quic_send(quic_table, conn, &quic_reply, 4, (ctx->ignore1 & KXDPGUN_IGNORE_LASTBYTE) ? KNOT_QUIC_SEND_IGNORE_LASTBYTE : 0); if (ret != KNOT_EOK) { - errors++; + periodic_stats.errors++; } - if (!(ctx->ignore1 & KXDPGUN_IGNORE_CLOSE) && (conn->flags & KNOT_QUIC_CONN_SESSION_TAKEN) && - stream0 != NULL && stream0->inbufs != NULL && stream0->inbufs->n_inbufs == 0) { + if (!(ctx->ignore1 & KXDPGUN_IGNORE_CLOSE) + && (conn->flags & KNOT_QUIC_CONN_SESSION_TAKEN) + && stream0 != NULL && stream0->inbufs != NULL + && stream0->inbufs->n_inbufs == 0) { assert(!(ctx->ignore2 & XDP_TCP_IGNORE_DATA_ACK)); quic_reply.handle_ret = KNOT_QUIC_HANDLE_RET_CLOSE; ret = knot_quic_send(quic_table, conn, &quic_reply, 1, 0); knot_quic_table_rem(conn, quic_table); knot_quic_cleanup(&conn, 1); if (ret != KNOT_EOK) { - errors++; + periodic_stats.errors++; } } } (void)knot_xdp_send_finish(xsk); #endif // ENABLE_QUIC } else { - for (int i = 0; i < recvd; i++) { - (void)check_dns_payload(&pkts[i].payload, ctx, - &local_stats); + for (uint32_t i = 0; i < recvd; i++) { + check_dns_payload(&pkts[i].payload, ctx, &periodic_stats); } } - local_stats.wire_recv += wire; + periodic_stats.wire_recv += wire; knot_xdp_recv_finish(xsk, pkts, recvd); pfd.revents = 0; } @@ -882,47 +740,59 @@ void *xdp_gun_thread(void *_ctx) #endif // ENABLE_QUIC // speed and signal part - uint64_t dura_exp = (local_stats.qry_sent * 1000000) / ctx->qps; - duration = timer_end(&timer); - if (xdp_trigger == KXDPGUN_STOP && ctx->duration > duration) { - ctx->duration = duration; + uint64_t duration_ns = timer_end_ns(&timer); + duration_us = duration_ns / 1000; + uint64_t dura_exp = ((local_stats.qry_sent + periodic_stats.qry_sent) * 1000000) / ctx->qps; + if (ctx->thread_id == 0 && ctx->stats_period != 0 && global_stats.collected == 0 + && (duration_ns - (periodic_stats.since - local_stats.since)) >= ctx->stats_period) { + ATOMIC_SET(stats_switch, STATS_PERIODIC); + ATOMIC_ADD(stats_trigger, 1); } - if (stats_trigger > stats_triggered) { - assert(stats_trigger == stats_triggered + 1); - stats_triggered++; - local_stats.duration = duration; - size_t collected = collect_stats(&global_stats, &local_stats); + if (xdp_trigger == KXDPGUN_STOP && ctx->duration > duration_us) { + ctx->duration = duration_us; + } + uint64_t tmp_stats_trigger = ATOMIC_GET(stats_trigger); + if (duration_us < ctx->duration && tmp_stats_trigger > stats_triggered) { + bool tmp_stats_switch = ATOMIC_GET(stats_switch); + stats_triggered = tmp_stats_trigger; + + local_stats.until = periodic_stats.until = local_stats.since + duration_ns; + kxdpgun_stats_t cumulative_stats = periodic_stats; + if (tmp_stats_switch == STATS_PERIODIC) { + collect_periodic_stats(&local_stats, &periodic_stats); + clear_stats(&periodic_stats); + periodic_stats.since = local_stats.since + duration_ns; + } else { + collect_periodic_stats(&cumulative_stats, &local_stats); + cumulative_stats.since = local_stats.since; + } + + size_t collected = collect_stats(&global_stats, &cumulative_stats); + assert(collected <= ctx->n_threads); if (collected == ctx->n_threads) { - print_stats(&global_stats, ctx->tcp, ctx->quic, - !(ctx->flags & KNOT_XDP_FILTER_DROP), - ctx->qps * ctx->n_threads); + STATS_FMT(ctx, &global_stats, tmp_stats_switch); + if (!JSON_MODE(*ctx)) { + puts(STATS_SECTION_SEP); + } clear_stats(&global_stats); + ATOMIC_SET(stats_switch, STATS_SUM); } } - if (dura_exp > duration) { - usleep(dura_exp - duration); + if (dura_exp > duration_us) { + usleep(dura_exp - duration_us); } - if (duration > ctx->duration) { + if (duration_us > ctx->duration) { usleep(1000); } tick++; } + periodic_stats.until = local_stats.since + timer_end_ns(&timer) - extra_wait * 1000; + collect_periodic_stats(&local_stats, &periodic_stats); + + STATS_THRD(ctx, &local_stats); - char recv_str[40] = "", lost_str[40] = "", err_str[40] = ""; - if (!(ctx->flags & KNOT_XDP_FILTER_DROP)) { - (void)snprintf(recv_str, sizeof(recv_str), ", received %"PRIu64, local_stats.ans_recv); - } - if (lost > 0) { - (void)snprintf(lost_str, sizeof(lost_str), ", lost %"PRIu64, lost); - } - if (errors > 0) { - (void)snprintf(err_str, sizeof(err_str), ", errors %"PRIu64, errors); - } - INFO2("thread#%02u: sent %"PRIu64"%s%s%s", - ctx->thread_id, local_stats.qry_sent, recv_str, lost_str, err_str); - local_stats.duration = ctx->duration; collect_stats(&global_stats, &local_stats); cleanup: @@ -943,7 +813,7 @@ cleanup: WALK_LIST_DELSAFE(n, nxt, quic_sessions) { knot_quic_session_load(NULL, n); } - knot_quic_free_creds(quic_creds); + knot_creds_free(quic_creds); #endif // ENABLE_QUIC return NULL; @@ -1118,30 +988,33 @@ static void print_help(void) printf("Usage: %s [options] -i <queries_file> <dest_ip>\n" "\n" "Options:\n" - " -t, --duration <sec> "SPACE"Duration of traffic generation.\n" - " "SPACE" (default is %"PRIu64" seconds)\n" - " -T, --tcp[=debug_mode] "SPACE"Send queries over TCP.\n" - " -U, --quic[=debug_mode] "SPACE"Send queries over QUIC.\n" - " -Q, --qps <qps> "SPACE"Number of queries-per-second (approximately) to be sent.\n" - " "SPACE" (default is %"PRIu64" qps)\n" - " -b, --batch <size> "SPACE"Send queries in a batch of defined size.\n" - " "SPACE" (default is %d for UDP, %d for TCP)\n" - " -r, --drop "SPACE"Drop incoming responses (disables response statistics).\n" - " -p, --port <port> "SPACE"Remote destination port.\n" - " "SPACE" (default is %d for UDP/TCP, %u for QUIC)\n" - " -F, --affinity <spec> "SPACE"CPU affinity in the format [<cpu_start>][s<cpu_step>].\n" - " "SPACE" (default is %s)\n" - " -i, --infile <file> "SPACE"Path to a file with query templates.\n" - " -I, --interface <ifname> "SPACE"Override auto-detected interface for outgoing communication.\n" - " -l, --local <ip[/prefix]>"SPACE"Override auto-detected source IP address or subnet.\n" - " -L, --local-mac <MAC> "SPACE"Override auto-detected local MAC address.\n" - " -R, --remote-mac <MAC> "SPACE"Override auto-detected remote MAC address.\n" - " -v, --vlan <id> "SPACE"Add VLAN 802.1Q header with the given id.\n" - " -e, --edns-size <size> "SPACE"EDNS UDP payload size, range 512-4096 (default 1232)\n" - " -m, --mode <mode> "SPACE"Set XDP mode (auto, copy, generic).\n" - " -G, --qlog <path> "SPACE"Output directory for qlog (useful for QUIC only).\n" - " -h, --help "SPACE"Print the program help.\n" - " -V, --version "SPACE"Print the program version.\n" + " -t, --duration <sec> "SPACE"Duration of traffic generation.\n" + " "SPACE" (default is %"PRIu64" seconds)\n" + " -T, --tcp[=debug_mode] "SPACE"Send queries over TCP.\n" + " -U, --quic[=debug_mode] "SPACE"Send queries over QUIC.\n" + " -Q, --qps <qps> "SPACE"Number of queries-per-second (approximately) to be sent.\n" + " "SPACE" (default is %"PRIu64" qps)\n" + " -b, --batch <size> "SPACE"Send queries in a batch of defined size.\n" + " "SPACE" (default is %d for UDP, %d for TCP)\n" + " -r, --drop "SPACE"Drop incoming responses (disables response statistics).\n" + " -p, --port <port> "SPACE"Remote destination port.\n" + " "SPACE" (default is %d for UDP/TCP, %u for QUIC)\n" + " -F, --affinity <spec> "SPACE"CPU affinity in the format [<cpu_start>][s<cpu_step>].\n" + " "SPACE" (default is %s)\n" + " -I, --interface <ifname> "SPACE"Override auto-detected interface for outgoing communication.\n" + " -i, --infile <file> "SPACE"Path to a file with query templates.\n" + " -B, --binary "SPACE"Specify that input file is in binary format (<length:2><wire:length>).\n" + " -l, --local <ip[/prefix]> "SPACE"Override auto-detected source IP address or subnet.\n" + " -L, --local-mac <MAC> "SPACE"Override auto-detected local MAC address.\n" + " -R, --remote-mac <MAC> "SPACE"Override auto-detected remote MAC address.\n" + " -v, --vlan <id> "SPACE"Add VLAN 802.1Q header with the given id.\n" + " -e, --edns-size <size> "SPACE"EDNS UDP payload size, range 512-4096 (default 1232)\n" + " -m, --mode <mode> "SPACE"Set XDP mode (auto, copy, generic).\n" + " -G, --qlog <path> "SPACE"Output directory for qlog (useful for QUIC only).\n" + " -j, --json "SPACE"Output statistics in json.\n" + " -S, --stats-period <period>"SPACE"Enable periodic statistics printout in milliseconds.\n" + " -h, --help "SPACE"Print the program help.\n" + " -V, --version "SPACE"Print the program version.\n" "\n" "Parameters:\n" " <dest_ip> "SPACE"IPv4 or IPv6 address of the remote destination.\n", @@ -1240,40 +1113,45 @@ static int set_mode(const char *arg, knot_xdp_config_t *config) static bool get_opts(int argc, char *argv[], xdp_gun_ctx_t *ctx) { + const char *opts_str = "hV::t:Q:b:rp:T::U::F:I:i:Bl:L:R:v:e:m:G:jS:"; struct option opts[] = { - { "help", no_argument, NULL, 'h' }, - { "version", no_argument, NULL, 'V' }, - { "duration", required_argument, NULL, 't' }, - { "qps", required_argument, NULL, 'Q' }, - { "batch", required_argument, NULL, 'b' }, - { "drop", no_argument, NULL, 'r' }, - { "port", required_argument, NULL, 'p' }, - { "tcp", optional_argument, NULL, 'T' }, - { "quic", optional_argument, NULL, 'U' }, - { "affinity", required_argument, NULL, 'F' }, - { "interface", required_argument, NULL, 'I' }, - { "local", required_argument, NULL, 'l' }, - { "infile", required_argument, NULL, 'i' }, - { "local-mac", required_argument, NULL, 'L' }, - { "remote-mac", required_argument, NULL, 'R' }, - { "vlan", required_argument, NULL, 'v' }, - { "edns-size", required_argument, NULL, 'e' }, - { "mode", required_argument, NULL, 'm' }, - { "qlog", required_argument, NULL, 'G' }, - { NULL } + { "help", no_argument, NULL, 'h' }, + { "version", optional_argument, NULL, 'V' }, + { "duration", required_argument, NULL, 't' }, + { "qps", required_argument, NULL, 'Q' }, + { "batch", required_argument, NULL, 'b' }, + { "drop", no_argument, NULL, 'r' }, + { "port", required_argument, NULL, 'p' }, + { "tcp", optional_argument, NULL, 'T' }, + { "quic", optional_argument, NULL, 'U' }, + { "affinity", required_argument, NULL, 'F' }, + { "interface", required_argument, NULL, 'I' }, + { "infile", required_argument, NULL, 'i' }, + { "binary", no_argument, NULL, 'B' }, + { "local", required_argument, NULL, 'l' }, + { "local-mac", required_argument, NULL, 'L' }, + { "remote-mac", required_argument, NULL, 'R' }, + { "vlan", required_argument, NULL, 'v' }, + { "edns-size", required_argument, NULL, 'e' }, + { "mode", required_argument, NULL, 'm' }, + { "qlog", required_argument, NULL, 'G' }, + { "json", no_argument, NULL, 'j' }, + { "stats-period", required_argument, NULL, 'S' }, + { 0 } }; int opt = 0, arg; bool default_at_once = true; double argf; - char *argcp, *local_ip = NULL, *filename = NULL; - while ((opt = getopt_long(argc, argv, "hVt:Q:b:rp:T::U::F:I:l:i:L:R:v:e:m:G:", opts, NULL)) != -1) { + char *argcp, *local_ip = NULL; + input_t input = { .format = TXT }; + while ((opt = getopt_long(argc, argv, opts_str, opts, NULL)) != -1) { switch (opt) { case 'h': print_help(); exit(EXIT_SUCCESS); case 'V': - print_version(PROGRAM_NAME); + print_version(PROGRAM_NAME, optarg != NULL); exit(EXIT_SUCCESS); case 't': assert(optarg); @@ -1366,12 +1244,15 @@ static bool get_opts(int argc, char *argv[], xdp_gun_ctx_t *ctx) case 'I': strlcpy(ctx->dev, optarg, IFNAMSIZ); break; + case 'i': + input.path = optarg; + break; + case 'B': + input.format = BIN; + break; case 'l': local_ip = optarg; break; - case 'i': - filename = optarg; - break; case 'L': if (mac_sscan(optarg, ctx->local_mac) != KNOT_EOK) { ERR2("invalid local MAC address '%s'", optarg); @@ -1415,17 +1296,33 @@ static bool get_opts(int argc, char *argv[], xdp_gun_ctx_t *ctx) case 'G': ctx->qlog_dir = optarg; break; + case 'S': + assert(optarg); + arg = atoi(optarg); + if (arg > 0) { + ctx->stats_period = arg * 1000000; // convert to ns + } else { + ERR2("period must be a positive integer\n"); + return false; + } + break; + case 'j': + if ((ctx->jw = jsonw_new(stdout, JSON_INDENT)) == NULL) { + ERR2("failed to use JSON"); + return false; + } + break; default: print_help(); return false; } } - if (filename == NULL) { + if (input.path == NULL) { print_help(); return false; } size_t qcount = ctx->duration / 1000000 * ctx->qps; - if (!load_queries(filename, ctx->edns_size, ctx->msgid, qcount)) { + if (!load_queries(&input, ctx->edns_size, ctx->msgid, qcount)) { return false; } if (global_payloads == NULL || argc - optind != 1) { @@ -1452,25 +1349,29 @@ static bool get_opts(int argc, char *argv[], xdp_gun_ctx_t *ctx) int main(int argc, char *argv[]) { + int ecode = EXIT_FAILURE; + xdp_gun_ctx_t ctx = ctx_defaults, *thread_ctxs = NULL; ctx.msgid = time(NULL) % UINT16_MAX; + ctx.runid = timestamp_ns() / 1000; + ctx.argv = argv; pthread_t *threads = NULL; if (!get_opts(argc, argv, &ctx)) { - free_global_payloads(); - return EXIT_FAILURE; + goto err; + } + + if (JSON_MODE(ctx)) { + jsonw_list(ctx.jw, NULL); // wrap the json in a list, for syntactic correctness } thread_ctxs = calloc(ctx.n_threads, sizeof(*thread_ctxs)); threads = calloc(ctx.n_threads, sizeof(*threads)); if (thread_ctxs == NULL || threads == NULL) { ERR2("out of memory"); - free(thread_ctxs); - free(threads); - free_global_payloads(); - return EXIT_FAILURE; + goto err; } - for (int i = 0; i < ctx.n_threads; i++) { + for (uint32_t i = 0; i < ctx.n_threads; i++) { thread_ctxs[i] = ctx; thread_ctxs[i].thread_id = i; } @@ -1482,8 +1383,7 @@ int main(int argc, char *argv[]) cur_limit.rlim_max != min_limit.rlim_max) { int ret = setrlimit(RLIMIT_MEMLOCK, &min_limit); if (ret != 0) { - WARN2("unable to increase RLIMIT_MEMLOCK: %s", - strerror(errno)); + WARN2("unable to increase RLIMIT_MEMLOCK: %s", strerror(errno)); } } } @@ -1509,22 +1409,30 @@ int main(int argc, char *argv[]) usleep(20000); } usleep(1000000); - xdp_trigger = KXDPGUN_START; usleep(1000000); for (size_t i = 0; i < ctx.n_threads; i++) { pthread_join(threads[i], NULL); } - if (global_stats.duration > 0 && global_stats.qry_sent > 0) { - print_stats(&global_stats, ctx.tcp, ctx.quic, !(ctx.flags & KNOT_XDP_FILTER_DROP), ctx.qps * ctx.n_threads); + if (DURATION_US(global_stats) > 0 && global_stats.qry_sent > 0) { + if (!JSON_MODE(ctx)) { + puts(STATS_SECTION_SEP); + } + STATS_FMT(&ctx, &global_stats, STATS_SUM); } pthread_mutex_destroy(&global_stats.mutex); + ecode = EXIT_SUCCESS; + +err: free(ctx.rss_conf); free(thread_ctxs); free(threads); free_global_payloads(); - - return EXIT_SUCCESS; + if (JSON_MODE(ctx)) { + jsonw_end(ctx.jw); + jsonw_free(&ctx.jw); + } + return ecode; } |