diff options
Diffstat (limited to 'src/net_tcp.c')
-rw-r--r-- | src/net_tcp.c | 33 |
1 files changed, 32 insertions, 1 deletions
diff --git a/src/net_tcp.c b/src/net_tcp.c index 97d1f65..5473e4a 100644 --- a/src/net_tcp.c +++ b/src/net_tcp.c @@ -80,13 +80,20 @@ struct perf__tcp_socket { uint64_t conn_ts; perf_socket_event_t conn_event, conning_event; + + size_t num_queries_per_conn, nqpc_timeout; + unsigned int nqpc_sent, nqpc_recv; + uint64_t nqpc_ts; }; static int perf__tcp_connect(struct perf_net_socket* sock) { int fd; - self->is_ready = true; + self->is_ready = true; + self->nqpc_sent = 0; + ck_pr_store_uint(&self->nqpc_recv, 0); + self->nqpc_ts = 0; fd = socket(self->server.sa.sa.sa_family, SOCK_STREAM, 0); if (fd == -1) { @@ -192,6 +199,9 @@ static ssize_t perf__tcp_recv(struct perf_net_socket* sock, void* buf, size_t le memcpy(buf, self->recvbuf + 2, len < dnslen ? len : dnslen); memmove(self->recvbuf, self->recvbuf + 2 + dnslen, self->at - 2 - dnslen); self->at -= 2 + dnslen; + if (self->num_queries_per_conn) { + ck_pr_inc_uint(&self->nqpc_recv); + } if (self->at > 2) { memcpy(&dnslen2, self->recvbuf, 2); @@ -251,6 +261,7 @@ static ssize_t perf__tcp_sendto(struct perf_net_socket* sock, uint16_t qid, cons errno = EINPROGRESS; return -1; } + self->nqpc_sent++; return n - 2; } @@ -302,6 +313,17 @@ static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_ if (sock->sent) { sock->sent(sock, self->qid); } + self->nqpc_sent++; + } + if (self->num_queries_per_conn && self->nqpc_sent >= self->num_queries_per_conn) { + if (!self->nqpc_ts) { + self->nqpc_ts = perf_get_time() + self->nqpc_timeout; + } + unsigned int r = ck_pr_load_uint(&self->nqpc_recv); + if (r >= self->nqpc_sent || perf_get_time() > self->nqpc_ts) { + self->need_reconnect = true; + } + return 0; } return 1; } @@ -352,6 +374,7 @@ conn_cont: if (sock->sent) { sock->sent(sock, self->qid); } + self->nqpc_sent++; } return 1; } @@ -367,6 +390,12 @@ static bool perf__tcp_have_more(struct perf_net_socket* sock) return self->have_more; } +static void perf__tcp_num_queries_per_conn(struct perf_net_socket* sock, size_t num_queries_per_conn, size_t timeout) +{ + self->num_queries_per_conn = num_queries_per_conn; + self->nqpc_timeout = timeout; +} + struct perf_net_socket* perf_net_tcp_opensocket(const perf_sockaddr_t* server, const perf_sockaddr_t* local, size_t bufsize, void* data, perf_net_sent_cb_t sent, perf_net_event_cb_t event) { struct perf__tcp_socket* tmp = calloc(1, sizeof(struct perf__tcp_socket)); // clang scan-build @@ -384,6 +413,8 @@ struct perf_net_socket* perf_net_tcp_opensocket(const perf_sockaddr_t* server, c sock->sockready = perf__tcp_sockready; sock->have_more = perf__tcp_have_more; + sock->num_queries_per_conn = perf__tcp_num_queries_per_conn; + sock->data = data; sock->sent = sent; sock->event = event; |