diff options
Diffstat (limited to 'src/dnsperf.c')
-rw-r--r-- | src/dnsperf.c | 1220 |
1 files changed, 1220 insertions, 0 deletions
diff --git a/src/dnsperf.c b/src/dnsperf.c new file mode 100644 index 0000000..f001370 --- /dev/null +++ b/src/dnsperf.c @@ -0,0 +1,1220 @@ +/* + * Copyright 2019-2021 OARC, Inc. + * Copyright 2017-2018 Akamai Technologies + * Copyright 2006-2016 Nominum, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** + *** DNS Performance Testing Tool + ***/ + +#include "config.h" + +#include "net.h" +#include "datafile.h" +#include "dns.h" +#include "log.h" +#include "opt.h" +#include "os.h" +#include "util.h" +#include "list.h" +#include "buffer.h" + +#include <inttypes.h> +#include <errno.h> +#include <math.h> +#include <pthread.h> +#include <signal.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> +#include <sys/time.h> +#include <openssl/ssl.h> +#include <openssl/conf.h> +#include <openssl/err.h> + +#define DEFAULT_SERVER_NAME "127.0.0.1" +#define DEFAULT_SERVER_PORT 53 +#define DEFAULT_SERVER_TLS_PORT 853 +#define DEFAULT_SERVER_PORTS "udp/tcp 53 or dot/tls 853" +#define DEFAULT_LOCAL_PORT 0 +#define DEFAULT_MAX_OUTSTANDING 100 +#define DEFAULT_TIMEOUT 5 + +#define TIMEOUT_CHECK_TIME 100000 + +#define MAX_INPUT_DATA (64 * 1024) + +#define MAX_SOCKETS 256 + +#define RECV_BATCH_SIZE 16 + +typedef struct { + int argc; + char** argv; + int family; + uint32_t clients; + uint32_t threads; + uint32_t maxruns; + uint64_t timelimit; + perf_sockaddr_t server_addr; + perf_sockaddr_t local_addr; + uint64_t timeout; + uint32_t bufsize; + bool edns; + bool dnssec; + perf_tsigkey_t* tsigkey; + perf_ednsoption_t* edns_option; + uint32_t max_outstanding; + uint32_t max_qps; + uint64_t stats_interval; + bool updates; + bool verbose; + enum perf_net_mode mode; +} config_t; + +typedef struct { + uint64_t start_time; + uint64_t end_time; + uint64_t stop_time; + struct timespec stop_time_ns; +} times_t; + +typedef struct { + uint64_t rcodecounts[16]; + + uint64_t num_sent; + uint64_t num_interrupted; + uint64_t num_timedout; + uint64_t num_completed; + + uint64_t total_request_size; + uint64_t total_response_size; + + uint64_t latency_sum; + uint64_t latency_sum_squares; + uint64_t latency_min; + uint64_t latency_max; +} stats_t; + +typedef perf_list(struct query_info) query_list; + +typedef struct query_info { + uint64_t timestamp; + query_list* list; + char* desc; + struct perf_net_socket* sock; + /* + * This link links the query into the list of outstanding + * queries or the list of available query IDs. + */ + perf_link(struct query_info); +} query_info; + +#define NQIDS 65536 + +typedef struct { + query_info queries[NQIDS]; + query_list outstanding_queries; + query_list unused_queries; + + pthread_t sender; + pthread_t receiver; + + pthread_mutex_t lock; + pthread_cond_t cond; + + unsigned int nsocks; + int current_sock; + struct perf_net_socket* socks; + + bool done_sending; + uint64_t done_send_time; + + const config_t* config; + const times_t* times; + stats_t stats; + + uint32_t max_outstanding; + uint32_t max_qps; + + uint64_t last_recv; +} threadinfo_t; + +static threadinfo_t* threads; + +static pthread_mutex_t start_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t start_cond = PTHREAD_COND_INITIALIZER; +static bool started; + +static bool interrupted = false; + +static int threadpipe[2]; +static int mainpipe[2]; +static int intrpipe[2]; + +static perf_datafile_t* input; + +const char* progname = "dnsperf"; + +static void +handle_sigint(int sig) +{ + (void)sig; + if (write(intrpipe[1], "", 1)) { // lgtm [cpp/empty-block] + } +} + +static void +print_initial_status(const config_t* config) +{ + time_t now; + char buf[255], ct[32]; + int i; + + printf("[Status] Command line: %s", progname); + for (i = 1; i < config->argc; i++) + printf(" %s", config->argv[i]); + printf("\n"); + + perf_sockaddr_format(&config->server_addr, buf, sizeof(buf)); + printf("[Status] Sending %s (to %s)\n", + config->updates ? "updates" : "queries", buf); + + now = time(NULL); + printf("[Status] Started at: %s", ctime_r(&now, ct)); + + printf("[Status] Stopping after "); + if (config->timelimit) + printf("%u.%06u seconds", + (unsigned int)(config->timelimit / MILLION), + (unsigned int)(config->timelimit % MILLION)); + if (config->timelimit && config->maxruns) + printf(" or "); + if (config->maxruns) + printf("%u run%s through file", config->maxruns, + config->maxruns == 1 ? "" : "s"); + printf("\n"); +} + +static void +print_final_status(const config_t* config) +{ + const char* reason; + + if (interrupted) + reason = "interruption"; + else if (config->maxruns > 0 && perf_datafile_nruns(input) == config->maxruns) + reason = "end of file"; + else + reason = "time limit"; + + printf("[Status] Testing complete (%s)\n", reason); + printf("\n"); +} + +static double +stddev(uint64_t sum_of_squares, uint64_t sum, uint64_t total) +{ + double squared; + + squared = (double)sum * (double)sum; + return sqrt((sum_of_squares - (squared / total)) / (total - 1)); +} + +static void +print_statistics(const config_t* config, const times_t* times, stats_t* stats) +{ + const char* units; + uint64_t run_time; + bool first_rcode; + uint64_t latency_avg; + unsigned int i; + + units = config->updates ? "Updates" : "Queries"; + + run_time = times->end_time - times->start_time; + + printf("Statistics:\n\n"); + + printf(" %s sent: %" PRIu64 "\n", + units, stats->num_sent); + printf(" %s completed: %" PRIu64 " (%.2lf%%)\n", + units, stats->num_completed, + PERF_SAFE_DIV(100.0 * stats->num_completed, stats->num_sent)); + printf(" %s lost: %" PRIu64 " (%.2lf%%)\n", + units, stats->num_timedout, + PERF_SAFE_DIV(100.0 * stats->num_timedout, stats->num_sent)); + if (stats->num_interrupted > 0) + printf(" %s interrupted: %" PRIu64 " (%.2lf%%)\n", + units, stats->num_interrupted, + PERF_SAFE_DIV(100.0 * stats->num_interrupted, stats->num_sent)); + printf("\n"); + + printf(" Response codes: "); + first_rcode = true; + for (i = 0; i < 16; i++) { + if (stats->rcodecounts[i] == 0) + continue; + if (first_rcode) + first_rcode = false; + else + printf(", "); + printf("%s %" PRIu64 " (%.2lf%%)", + perf_dns_rcode_strings[i], stats->rcodecounts[i], + (stats->rcodecounts[i] * 100.0) / stats->num_completed); + } + printf("\n"); + + printf(" Average packet size: request %u, response %u\n", + (unsigned int)PERF_SAFE_DIV(stats->total_request_size, stats->num_sent), + (unsigned int)PERF_SAFE_DIV(stats->total_response_size, + stats->num_completed)); + printf(" Run time (s): %u.%06u\n", + (unsigned int)(run_time / MILLION), + (unsigned int)(run_time % MILLION)); + printf(" %s per second: %.6lf\n", units, + PERF_SAFE_DIV(stats->num_completed, (((double)run_time) / MILLION))); + + printf("\n"); + + latency_avg = PERF_SAFE_DIV(stats->latency_sum, stats->num_completed); + printf(" Average Latency (s): %u.%06u (min %u.%06u, max %u.%06u)\n", + (unsigned int)(latency_avg / MILLION), + (unsigned int)(latency_avg % MILLION), + (unsigned int)(stats->latency_min / MILLION), + (unsigned int)(stats->latency_min % MILLION), + (unsigned int)(stats->latency_max / MILLION), + (unsigned int)(stats->latency_max % MILLION)); + if (stats->num_completed > 1) { + printf(" Latency StdDev (s): %f\n", + stddev(stats->latency_sum_squares, stats->latency_sum, + stats->num_completed) + / MILLION); + } + + printf("\n"); +} + +static void +sum_stats(const config_t* config, stats_t* total) +{ + unsigned int i, j; + + memset(total, 0, sizeof(*total)); + + for (i = 0; i < config->threads; i++) { + stats_t* stats = &threads[i].stats; + + for (j = 0; j < 16; j++) + total->rcodecounts[j] += stats->rcodecounts[j]; + + total->num_sent += stats->num_sent; + total->num_interrupted += stats->num_interrupted; + total->num_timedout += stats->num_timedout; + total->num_completed += stats->num_completed; + + total->total_request_size += stats->total_request_size; + total->total_response_size += stats->total_response_size; + + total->latency_sum += stats->latency_sum; + total->latency_sum_squares += stats->latency_sum_squares; + if (stats->latency_min < total->latency_min || i == 0) + total->latency_min = stats->latency_min; + if (stats->latency_max > total->latency_max) + total->latency_max = stats->latency_max; + } +} + +static char* +stringify(unsigned int value) +{ + static char buf[20]; + + snprintf(buf, sizeof(buf), "%u", value); + return buf; +} + +static void +setup(int argc, char** argv, config_t* config) +{ + const char* family = NULL; + const char* server_name = DEFAULT_SERVER_NAME; + in_port_t server_port = 0; + const char* local_name = NULL; + in_port_t local_port = DEFAULT_LOCAL_PORT; + const char* filename = NULL; + const char* edns_option = NULL; + const char* tsigkey = NULL; + const char* mode = 0; + + memset(config, 0, sizeof(*config)); + config->argc = argc; + config->argv = argv; + + config->family = AF_UNSPEC; + config->clients = 1; + config->threads = 1; + config->timeout = DEFAULT_TIMEOUT * MILLION; + config->max_outstanding = DEFAULT_MAX_OUTSTANDING; + config->mode = sock_udp; + + perf_opt_add('f', perf_opt_string, "family", + "address family of DNS transport, inet or inet6", "any", + &family); + perf_opt_add('m', perf_opt_string, "mode", "set transport mode: udp, tcp or dot/tls", "udp", &mode); + perf_opt_add('s', perf_opt_string, "server_addr", + "the server to query", DEFAULT_SERVER_NAME, &server_name); + perf_opt_add('p', perf_opt_port, "port", + "the port on which to query the server", + DEFAULT_SERVER_PORTS, &server_port); + perf_opt_add('a', perf_opt_string, "local_addr", + "the local address from which to send queries", NULL, + &local_name); + perf_opt_add('x', perf_opt_port, "local_port", + "the local port from which to send queries", + stringify(DEFAULT_LOCAL_PORT), &local_port); + perf_opt_add('d', perf_opt_string, "datafile", + "the input data file", "stdin", &filename); + perf_opt_add('c', perf_opt_uint, "clients", + "the number of clients to act as", NULL, + &config->clients); + perf_opt_add('T', perf_opt_uint, "threads", + "the number of threads to run", NULL, + &config->threads); + perf_opt_add('n', perf_opt_uint, "maxruns", + "run through input at most N times", NULL, + &config->maxruns); + perf_opt_add('l', perf_opt_timeval, "timelimit", + "run for at most this many seconds", NULL, + &config->timelimit); + perf_opt_add('b', perf_opt_uint, "buffer_size", + "socket send/receive buffer size in kilobytes", NULL, + &config->bufsize); + perf_opt_add('t', perf_opt_timeval, "timeout", + "the timeout for query completion in seconds", + stringify(DEFAULT_TIMEOUT), &config->timeout); + perf_opt_add('e', perf_opt_boolean, NULL, + "enable EDNS 0", NULL, &config->edns); + perf_opt_add('E', perf_opt_string, "code:value", + "send EDNS option", NULL, &edns_option); + perf_opt_add('D', perf_opt_boolean, NULL, + "set the DNSSEC OK bit (implies EDNS)", NULL, + &config->dnssec); + perf_opt_add('y', perf_opt_string, "[alg:]name:secret", + "the TSIG algorithm, name and secret (base64)", NULL, + &tsigkey); + perf_opt_add('q', perf_opt_uint, "num_queries", + "the maximum number of queries outstanding", + stringify(DEFAULT_MAX_OUTSTANDING), + &config->max_outstanding); + perf_opt_add('Q', perf_opt_uint, "max_qps", + "limit the number of queries per second", NULL, + &config->max_qps); + perf_opt_add('S', perf_opt_timeval, "stats_interval", + "print qps statistics every N seconds", + NULL, &config->stats_interval); + perf_opt_add('u', perf_opt_boolean, NULL, + "send dynamic updates instead of queries", + NULL, &config->updates); + perf_opt_add('v', perf_opt_boolean, NULL, + "verbose: report each query and additional information to stdout", + NULL, &config->verbose); + bool log_stdout = false; + perf_opt_add('W', perf_opt_boolean, NULL, "log warnings and errors to stdout instead of stderr", NULL, &log_stdout); + + perf_opt_parse(argc, argv); + + if (log_stdout) { + perf_log_tostdout(); + } + + if (mode != 0) + config->mode = perf_net_parsemode(mode); + + if (!server_port) { + server_port = config->mode == sock_tls ? DEFAULT_SERVER_TLS_PORT : DEFAULT_SERVER_PORT; + } + + if (family != NULL) + config->family = perf_net_parsefamily(family); + perf_net_parseserver(config->family, server_name, server_port, + &config->server_addr); + perf_net_parselocal(config->server_addr.sa.sa.sa_family, + local_name, local_port, &config->local_addr); + + input = perf_datafile_open(filename); + + if (config->maxruns == 0 && config->timelimit == 0) + config->maxruns = 1; + perf_datafile_setmaxruns(input, config->maxruns); + + if (config->dnssec || edns_option != NULL) + config->edns = true; + + if (tsigkey != NULL) + config->tsigkey = perf_tsig_parsekey(tsigkey); + + if (edns_option != NULL) + config->edns_option = perf_edns_parseoption(edns_option); + + /* + * If we run more threads than max-qps, some threads will have + * ->max_qps set to 0, and be unlimited. + */ + if (config->max_qps > 0 && config->threads > config->max_qps) + config->threads = config->max_qps; + + /* + * We also can't run more threads than clients. + */ + if (config->threads > config->clients) + config->threads = config->clients; + +#ifndef HAVE_LDNS + if (config->updates) { + perf_log_fatal("Unable to dynamic update, support not built in"); + } +#endif +} + +static void +cleanup(config_t* config) +{ + unsigned int i; + + perf_datafile_close(&input); + for (i = 0; i < 2; i++) { + close(threadpipe[i]); + close(mainpipe[i]); + close(intrpipe[i]); + } + if (config->tsigkey != NULL) + perf_tsig_destroykey(&config->tsigkey); + if (config->edns_option != NULL) + perf_edns_destroyoption(&config->edns_option); +} + +typedef enum { + prepend_unused, + append_unused, + prepend_outstanding, +} query_move_op; + +static inline void +query_move(threadinfo_t* tinfo, query_info* q, query_move_op op) +{ + perf_list_unlink(*q->list, q); + switch (op) { + case prepend_unused: + q->list = &tinfo->unused_queries; + perf_list_prepend(tinfo->unused_queries, q); + break; + case append_unused: + q->list = &tinfo->unused_queries; + perf_list_append(tinfo->unused_queries, q); + break; + case prepend_outstanding: + q->list = &tinfo->outstanding_queries; + perf_list_prepend(tinfo->outstanding_queries, q); + break; + } +} + +static inline uint64_t +num_outstanding(const stats_t* stats) +{ + return stats->num_sent - stats->num_completed - stats->num_timedout; +} + +static void +wait_for_start(void) +{ + PERF_LOCK(&start_lock); + while (!started) + PERF_WAIT(&start_cond, &start_lock); + PERF_UNLOCK(&start_lock); +} + +static void* +do_send(void* arg) +{ + threadinfo_t* tinfo; + const config_t* config; + const times_t* times; + stats_t* stats; + unsigned int max_packet_size; + perf_buffer_t msg; + uint64_t now, run_time, req_time; + char input_data[MAX_INPUT_DATA]; + perf_buffer_t lines; + perf_region_t used; + query_info* q; + int qid; + unsigned char packet_buffer[MAX_EDNS_PACKET]; + unsigned char* base; + unsigned int length; + int n, i, any_inprogress = 0; + perf_result_t result; + + tinfo = (threadinfo_t*)arg; + config = tinfo->config; + times = tinfo->times; + stats = &tinfo->stats; + max_packet_size = config->edns ? MAX_EDNS_PACKET : MAX_UDP_PACKET; + perf_buffer_init(&msg, packet_buffer, max_packet_size); + perf_buffer_init(&lines, input_data, sizeof(input_data)); + + wait_for_start(); + now = perf_get_time(); + while (!interrupted && now < times->stop_time) { + /* Avoid flooding the network too quickly. */ + if (stats->num_sent < tinfo->max_outstanding && stats->num_sent % 2 == 1) { + if (stats->num_completed == 0) + usleep(1000); + else + sleep(0); + now = perf_get_time(); + } + + /* Rate limiting */ + if (tinfo->max_qps > 0) { + run_time = now - times->start_time; + req_time = (MILLION * stats->num_sent) / tinfo->max_qps; + if (req_time > run_time) { + usleep(req_time - run_time); + now = perf_get_time(); + continue; + } + } + + PERF_LOCK(&tinfo->lock); + + /* Limit in-flight queries */ + if (num_outstanding(stats) >= tinfo->max_outstanding) { + PERF_TIMEDWAIT(&tinfo->cond, &tinfo->lock, ×->stop_time_ns, NULL); + PERF_UNLOCK(&tinfo->lock); + now = perf_get_time(); + continue; + } + + q = perf_list_head(tinfo->unused_queries); + query_move(tinfo, q, prepend_outstanding); + q->timestamp = UINT64_MAX; + + i = tinfo->nsocks * 2; + while (i--) { + q->sock = &tinfo->socks[tinfo->current_sock++ % tinfo->nsocks]; + switch (perf_net_sockready(q->sock, threadpipe[0], TIMEOUT_CHECK_TIME)) { + case 0: + if (config->verbose) { + perf_log_warning("socket %p not ready", q->sock); + } + q->sock = 0; + continue; + case -1: + if (errno == EINPROGRESS) { + any_inprogress = 1; + q->sock = 0; + continue; + } + if (config->verbose) { + perf_log_warning("socket %p readiness check timed out", q->sock); + } + default: + break; + } + break; + }; + + if (!q->sock) { + query_move(tinfo, q, prepend_unused); + PERF_UNLOCK(&tinfo->lock); + now = perf_get_time(); + continue; + } + PERF_UNLOCK(&tinfo->lock); + + perf_buffer_clear(&lines); + result = perf_datafile_next(input, &lines, config->updates); + if (result != PERF_R_SUCCESS) { + if (result == PERF_R_INVALIDFILE) + perf_log_fatal("input file contains no data"); + break; + } + + qid = q - tinfo->queries; + perf_buffer_usedregion(&lines, &used); + perf_buffer_clear(&msg); + result = perf_dns_buildrequest(&used, qid, + config->edns, config->dnssec, config->updates, + config->tsigkey, config->edns_option, + &msg); + if (result != PERF_R_SUCCESS) { + PERF_LOCK(&tinfo->lock); + query_move(tinfo, q, prepend_unused); + PERF_UNLOCK(&tinfo->lock); + now = perf_get_time(); + continue; + } + + base = perf_buffer_base(&msg); + length = perf_buffer_usedlength(&msg); + + now = perf_get_time(); + if (config->verbose) { + free(q->desc); + q->desc = strdup(lines.base); + if (q->desc == NULL) + perf_log_fatal("out of memory"); + } + q->timestamp = now; + + n = perf_net_sendto(q->sock, base, length, 0, &config->server_addr.sa.sa, + config->server_addr.length); + if (n < 0) { + if (errno == EINPROGRESS) { + if (config->verbose) { + perf_log_warning("network congested, packet sending in progress"); + } + any_inprogress = 1; + } else { + if (config->verbose) { + char __s[256]; + perf_log_warning("failed to send packet: %s", perf_strerror_r(errno, __s, sizeof(__s))); + } + PERF_LOCK(&tinfo->lock); + query_move(tinfo, q, prepend_unused); + PERF_UNLOCK(&tinfo->lock); + continue; + } + } else if ((unsigned int)n != length) { + perf_log_warning("failed to send full packet: only sent %d of %u", n, length); + PERF_LOCK(&tinfo->lock); + query_move(tinfo, q, prepend_unused); + PERF_UNLOCK(&tinfo->lock); + continue; + } + stats->num_sent++; + + stats->total_request_size += length; + } + + while (any_inprogress) { + any_inprogress = 0; + for (i = 0; i < tinfo->nsocks; i++) { + if (perf_net_sockready(&tinfo->socks[i], threadpipe[0], TIMEOUT_CHECK_TIME) == -1 && errno == EINPROGRESS) { + any_inprogress = 1; + } + } + } + + tinfo->done_send_time = perf_get_time(); + tinfo->done_sending = true; + if (write(mainpipe[1], "", 1)) { // lgtm [cpp/empty-block] + } + return NULL; +} + +static void +process_timeouts(threadinfo_t* tinfo, uint64_t now) +{ + struct query_info* q; + const config_t* config; + + config = tinfo->config; + + /* Avoid locking unless we need to. */ + q = perf_list_tail(tinfo->outstanding_queries); + if (q == NULL || q->timestamp > now || now - q->timestamp < config->timeout) + return; + + PERF_LOCK(&tinfo->lock); + + do { + query_move(tinfo, q, append_unused); + + tinfo->stats.num_timedout++; + + if (q->desc != NULL) { + perf_log_printf("> T %s", q->desc); + } else { + perf_log_printf("[Timeout] %s timed out: msg id %u", + config->updates ? "Update" : "Query", + (unsigned int)(q - tinfo->queries)); + } + q = perf_list_tail(tinfo->outstanding_queries); + } while (q != NULL && q->timestamp < now && now - q->timestamp >= config->timeout); + + PERF_UNLOCK(&tinfo->lock); +} + +typedef struct { + struct perf_net_socket* sock; + uint16_t qid; + uint16_t rcode; + unsigned int size; + uint64_t when; + uint64_t sent; + bool unexpected; + bool short_response; + char* desc; +} received_query_t; + +static bool +recv_one(threadinfo_t* tinfo, int which_sock, + unsigned char* packet_buffer, unsigned int packet_size, + received_query_t* recvd, int* saved_errnop) +{ + uint16_t* packet_header; + uint64_t now; + int n; + + packet_header = (uint16_t*)packet_buffer; + + n = perf_net_recv(&tinfo->socks[which_sock], packet_buffer, packet_size, 0); + now = perf_get_time(); + if (n < 0) { + *saved_errnop = errno; + return false; + } + if (!n) { + // Treat connection closed like try again until reconnection features are in + *saved_errnop = EAGAIN; + return false; + } + recvd->sock = &tinfo->socks[which_sock]; + recvd->qid = ntohs(packet_header[0]); + recvd->rcode = ntohs(packet_header[1]) & 0xF; + recvd->size = n; + recvd->when = now; + recvd->sent = 0; + recvd->unexpected = false; + recvd->short_response = (n < 4); + recvd->desc = NULL; + return true; +} + +static inline void +bit_set(unsigned char* bits, unsigned int bit) +{ + unsigned int shift, mask; + + shift = 7 - (bit % 8); + mask = 1 << shift; + + bits[bit / 8] |= mask; +} + +static inline bool +bit_check(unsigned char* bits, unsigned int bit) +{ + unsigned int shift; + + shift = 7 - (bit % 8); + + if ((bits[bit / 8] >> shift) & 0x01) + return true; + return false; +} + +static void* +do_recv(void* arg) +{ + threadinfo_t* tinfo; + stats_t* stats; + unsigned char packet_buffer[MAX_EDNS_PACKET]; + received_query_t recvd[RECV_BATCH_SIZE] = { { 0, 0, 0, 0, 0, 0, false, false, 0 } }; + unsigned int nrecvd; + int saved_errno; + unsigned char socketbits[MAX_SOCKETS / 8]; + uint64_t now, latency; + query_info* q; + unsigned int current_socket, last_socket; + unsigned int i, j; + + tinfo = (threadinfo_t*)arg; + stats = &tinfo->stats; + + wait_for_start(); + now = perf_get_time(); + last_socket = 0; + while (!interrupted) { + process_timeouts(tinfo, now); + + /* + * If we're done sending and either all responses have been + * received, stop. + */ + if (tinfo->done_sending && num_outstanding(stats) == 0) + break; + + /* + * Try to receive a few packets, so that we can process them + * atomically. + */ + saved_errno = 0; + memset(socketbits, 0, sizeof(socketbits)); + for (i = 0; i < RECV_BATCH_SIZE; i++) { + for (j = 0; j < tinfo->nsocks; j++) { + current_socket = (j + last_socket) % tinfo->nsocks; + if (bit_check(socketbits, current_socket)) + continue; + if (recv_one(tinfo, current_socket, packet_buffer, + sizeof(packet_buffer), &recvd[i], &saved_errno)) { + last_socket = (current_socket + 1); + break; + } + bit_set(socketbits, current_socket); + if (saved_errno != EAGAIN) + break; + } + if (j == tinfo->nsocks) + break; + } + nrecvd = i; + + /* Do all of the processing that requires the lock */ + PERF_LOCK(&tinfo->lock); + for (i = 0; i < nrecvd; i++) { + if (recvd[i].short_response) + continue; + + q = &tinfo->queries[recvd[i].qid]; + if (q->list != &tinfo->outstanding_queries || q->timestamp == UINT64_MAX || !perf_net_sockeq(q->sock, recvd[i].sock)) { + recvd[i].unexpected = true; + continue; + } + query_move(tinfo, q, append_unused); + recvd[i].sent = q->timestamp; + recvd[i].desc = q->desc; + q->desc = NULL; + } + PERF_SIGNAL(&tinfo->cond); + PERF_UNLOCK(&tinfo->lock); + + /* Now do the rest of the processing unlocked */ + for (i = 0; i < nrecvd; i++) { + if (recvd[i].short_response) { + perf_log_warning("received short response"); + continue; + } + if (recvd[i].unexpected) { + perf_log_warning("received a response with an " + "unexpected (maybe timed out) " + "id: %u", + recvd[i].qid); + continue; + } + latency = recvd[i].when - recvd[i].sent; + if (recvd[i].desc != NULL) { + perf_log_printf( + "> %s %s %u.%06u", + perf_dns_rcode_strings[recvd[i].rcode], + recvd[i].desc, + (unsigned int)(latency / MILLION), + (unsigned int)(latency % MILLION)); + free(recvd[i].desc); + } + + stats->num_completed++; + stats->total_response_size += recvd[i].size; + stats->rcodecounts[recvd[i].rcode]++; + stats->latency_sum += latency; + stats->latency_sum_squares += (latency * latency); + if (latency < stats->latency_min || stats->num_completed == 1) + stats->latency_min = latency; + if (latency > stats->latency_max) + stats->latency_max = latency; + } + + if (nrecvd > 0) + tinfo->last_recv = recvd[nrecvd - 1].when; + + /* + * If there was an error, handle it (by either ignoring it, + * blocking, or exiting). + */ + if (nrecvd < RECV_BATCH_SIZE) { + if (saved_errno == EINTR) { + continue; + } else if (saved_errno == EAGAIN) { + perf_os_waituntilanyreadable(tinfo->socks, tinfo->nsocks, + threadpipe[0], TIMEOUT_CHECK_TIME); + now = perf_get_time(); + continue; + } else { + char __s[256]; + perf_log_fatal("failed to receive packet: %s", perf_strerror_r(saved_errno, __s, sizeof(__s))); + } + } + } + + return NULL; +} + +static void* +do_interval_stats(void* arg) +{ + threadinfo_t* tinfo; + stats_t total; + uint64_t now; + uint64_t last_interval_time; + uint64_t last_completed; + uint64_t interval_time; + uint64_t num_completed; + double qps; + struct perf_net_socket sock = { .mode = sock_pipe, .fd = threadpipe[0] }; + + tinfo = arg; + last_interval_time = tinfo->times->start_time; + last_completed = 0; + + wait_for_start(); + while (perf_os_waituntilreadable(&sock, threadpipe[0], + tinfo->config->stats_interval) + == PERF_R_TIMEDOUT) { + now = perf_get_time(); + sum_stats(tinfo->config, &total); + interval_time = now - last_interval_time; + num_completed = total.num_completed - last_completed; + qps = num_completed / (((double)interval_time) / MILLION); + perf_log_printf("%u.%06u: %.6lf", + (unsigned int)(now / MILLION), + (unsigned int)(now % MILLION), qps); + last_interval_time = now; + last_completed = total.num_completed; + } + + return NULL; +} + +static void +cancel_queries(threadinfo_t* tinfo) +{ + struct query_info* q; + + while (true) { + q = perf_list_tail(tinfo->outstanding_queries); + if (q == NULL) + break; + query_move(tinfo, q, append_unused); + + if (q->timestamp == UINT64_MAX) + continue; + + tinfo->stats.num_interrupted++; + if (q->desc != NULL) { + perf_log_printf("> I %s", q->desc); + free(q->desc); + q->desc = NULL; + } + } +} + +static uint32_t +per_thread(uint32_t total, uint32_t nthreads, unsigned int offset) +{ + uint32_t value, temp_total; + + value = total / nthreads; + + /* + * work out if there's a shortfall and adjust if necessary + */ + temp_total = value * nthreads; + if (temp_total < total && offset < total - temp_total) + value++; + + return value; +} + +static void +threadinfo_init(threadinfo_t* tinfo, const config_t* config, + const times_t* times) +{ + unsigned int offset, socket_offset, i; + + memset(tinfo, 0, sizeof(*tinfo)); + PERF_MUTEX_INIT(&tinfo->lock); + PERF_COND_INIT(&tinfo->cond); + + perf_list_init(tinfo->outstanding_queries); + perf_list_init(tinfo->unused_queries); + for (i = 0; i < NQIDS; i++) { + perf_link_init(&tinfo->queries[i]); + perf_list_append(tinfo->unused_queries, &tinfo->queries[i]); + tinfo->queries[i].list = &tinfo->unused_queries; + } + + offset = tinfo - threads; + + tinfo->config = config; + tinfo->times = times; + + /* + * Compute per-thread limits based on global values. + */ + tinfo->max_outstanding = per_thread(config->max_outstanding, + config->threads, offset); + tinfo->max_qps = per_thread(config->max_qps, config->threads, offset); + tinfo->nsocks = per_thread(config->clients, config->threads, offset); + + /* + * We can't have more than 64k outstanding queries per thread. + */ + if (tinfo->max_outstanding > NQIDS) + tinfo->max_outstanding = NQIDS; + + if (tinfo->nsocks > MAX_SOCKETS) + tinfo->nsocks = MAX_SOCKETS; + + if (!(tinfo->socks = calloc(tinfo->nsocks, sizeof(*tinfo->socks)))) { + perf_log_fatal("out of memory"); + } + socket_offset = 0; + for (i = 0; i < offset; i++) + socket_offset += threads[i].nsocks; + for (i = 0; i < tinfo->nsocks; i++) + tinfo->socks[i] = perf_net_opensocket(config->mode, &config->server_addr, + &config->local_addr, + socket_offset++, + config->bufsize); + tinfo->current_sock = 0; + + PERF_THREAD(&tinfo->receiver, do_recv, tinfo); + PERF_THREAD(&tinfo->sender, do_send, tinfo); +} + +static void +threadinfo_stop(threadinfo_t* tinfo) +{ + PERF_SIGNAL(&tinfo->cond); + PERF_JOIN(tinfo->sender, NULL); + PERF_JOIN(tinfo->receiver, NULL); +} + +static void +threadinfo_cleanup(threadinfo_t* tinfo, times_t* times) +{ + unsigned int i; + + if (interrupted) + cancel_queries(tinfo); + for (i = 0; i < tinfo->nsocks; i++) + perf_net_close(&tinfo->socks[i]); + if (tinfo->last_recv > times->end_time) + times->end_time = tinfo->last_recv; +} + +int main(int argc, char** argv) +{ + config_t config; + times_t times; + stats_t total_stats; + threadinfo_t stats_thread; + unsigned int i; + perf_result_t result; + struct perf_net_socket sock = { .mode = sock_pipe }; + + printf("DNS Performance Testing Tool\n" + "Version " PACKAGE_VERSION "\n\n"); + + (void)SSL_library_init(); +#if OPENSSL_VERSION_NUMBER < 0x10100000L + SSL_load_error_strings(); + OPENSSL_config(0); +#endif + + setup(argc, argv, &config); + + if (pipe(threadpipe) < 0 || pipe(mainpipe) < 0 || pipe(intrpipe) < 0) + perf_log_fatal("creating pipe"); + + perf_datafile_setpipefd(input, threadpipe[0]); + + perf_os_blocksignal(SIGINT, true); + switch (config.mode) { + case sock_tcp: + case sock_tls: + // block SIGPIPE for TCP/TLS mode, if connection is closed it will generate a signal + perf_os_blocksignal(SIGPIPE, true); + break; + default: + break; + } + + print_initial_status(&config); + + if (!(threads = calloc(config.threads, sizeof(threadinfo_t)))) { + perf_log_fatal("out of memory"); + } + for (i = 0; i < config.threads; i++) + threadinfo_init(&threads[i], &config, ×); + if (config.stats_interval > 0) { + stats_thread.config = &config; + stats_thread.times = × + PERF_THREAD(&stats_thread.sender, do_interval_stats, &stats_thread); + } + + times.start_time = perf_get_time(); + if (config.timelimit > 0) + times.stop_time = times.start_time + config.timelimit; + else + times.stop_time = UINT64_MAX; + times.stop_time_ns.tv_sec = times.stop_time / MILLION; + times.stop_time_ns.tv_nsec = (times.stop_time % MILLION) * 1000; + + PERF_LOCK(&start_lock); + started = true; + PERF_BROADCAST(&start_cond); + PERF_UNLOCK(&start_lock); + + perf_os_handlesignal(SIGINT, handle_sigint); + perf_os_blocksignal(SIGINT, false); + sock.fd = mainpipe[0]; + result = perf_os_waituntilreadable(&sock, intrpipe[0], + times.stop_time - times.start_time); + if (result == PERF_R_CANCELED) + interrupted = true; + + times.end_time = perf_get_time(); + + if (write(threadpipe[1], "", 1)) { // lgtm [cpp/empty-block] + } + for (i = 0; i < config.threads; i++) + threadinfo_stop(&threads[i]); + if (config.stats_interval > 0) + PERF_JOIN(stats_thread.sender, NULL); + + for (i = 0; i < config.threads; i++) + threadinfo_cleanup(&threads[i], ×); + + print_final_status(&config); + + sum_stats(&config, &total_stats); + print_statistics(&config, ×, &total_stats); + + cleanup(&config); +#if OPENSSL_VERSION_NUMBER < 0x10100000L + ERR_free_strings(); +#endif + + return (0); +} |