summaryrefslogtreecommitdiffstats
path: root/src/net_tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/net_tcp.c')
-rw-r--r--src/net_tcp.c33
1 files changed, 32 insertions, 1 deletions
diff --git a/src/net_tcp.c b/src/net_tcp.c
index 97d1f65..5473e4a 100644
--- a/src/net_tcp.c
+++ b/src/net_tcp.c
@@ -80,13 +80,20 @@ struct perf__tcp_socket {
uint64_t conn_ts;
perf_socket_event_t conn_event, conning_event;
+
+ size_t num_queries_per_conn, nqpc_timeout;
+ unsigned int nqpc_sent, nqpc_recv;
+ uint64_t nqpc_ts;
};
static int perf__tcp_connect(struct perf_net_socket* sock)
{
int fd;
- self->is_ready = true;
+ self->is_ready = true;
+ self->nqpc_sent = 0;
+ ck_pr_store_uint(&self->nqpc_recv, 0);
+ self->nqpc_ts = 0;
fd = socket(self->server.sa.sa.sa_family, SOCK_STREAM, 0);
if (fd == -1) {
@@ -192,6 +199,9 @@ static ssize_t perf__tcp_recv(struct perf_net_socket* sock, void* buf, size_t le
memcpy(buf, self->recvbuf + 2, len < dnslen ? len : dnslen);
memmove(self->recvbuf, self->recvbuf + 2 + dnslen, self->at - 2 - dnslen);
self->at -= 2 + dnslen;
+ if (self->num_queries_per_conn) {
+ ck_pr_inc_uint(&self->nqpc_recv);
+ }
if (self->at > 2) {
memcpy(&dnslen2, self->recvbuf, 2);
@@ -251,6 +261,7 @@ static ssize_t perf__tcp_sendto(struct perf_net_socket* sock, uint16_t qid, cons
errno = EINPROGRESS;
return -1;
}
+ self->nqpc_sent++;
return n - 2;
}
@@ -302,6 +313,17 @@ static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_
if (sock->sent) {
sock->sent(sock, self->qid);
}
+ self->nqpc_sent++;
+ }
+ if (self->num_queries_per_conn && self->nqpc_sent >= self->num_queries_per_conn) {
+ if (!self->nqpc_ts) {
+ self->nqpc_ts = perf_get_time() + self->nqpc_timeout;
+ }
+ unsigned int r = ck_pr_load_uint(&self->nqpc_recv);
+ if (r >= self->nqpc_sent || perf_get_time() > self->nqpc_ts) {
+ self->need_reconnect = true;
+ }
+ return 0;
}
return 1;
}
@@ -352,6 +374,7 @@ conn_cont:
if (sock->sent) {
sock->sent(sock, self->qid);
}
+ self->nqpc_sent++;
}
return 1;
}
@@ -367,6 +390,12 @@ static bool perf__tcp_have_more(struct perf_net_socket* sock)
return self->have_more;
}
+static void perf__tcp_num_queries_per_conn(struct perf_net_socket* sock, size_t num_queries_per_conn, size_t timeout)
+{
+ self->num_queries_per_conn = num_queries_per_conn;
+ self->nqpc_timeout = timeout;
+}
+
struct perf_net_socket* perf_net_tcp_opensocket(const perf_sockaddr_t* server, const perf_sockaddr_t* local, size_t bufsize, void* data, perf_net_sent_cb_t sent, perf_net_event_cb_t event)
{
struct perf__tcp_socket* tmp = calloc(1, sizeof(struct perf__tcp_socket)); // clang scan-build
@@ -384,6 +413,8 @@ struct perf_net_socket* perf_net_tcp_opensocket(const perf_sockaddr_t* server, c
sock->sockready = perf__tcp_sockready;
sock->have_more = perf__tcp_have_more;
+ sock->num_queries_per_conn = perf__tcp_num_queries_per_conn;
+
sock->data = data;
sock->sent = sent;
sock->event = event;