summaryrefslogtreecommitdiffstats
path: root/src/net_tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/net_tcp.c')
-rw-r--r--src/net_tcp.c127
1 files changed, 90 insertions, 37 deletions
diff --git a/src/net_tcp.c b/src/net_tcp.c
index 94d2a6a..c04f25d 100644
--- a/src/net_tcp.c
+++ b/src/net_tcp.c
@@ -36,6 +36,33 @@
#define self ((struct perf__tcp_socket*)sock)
+/*
+
+About state sync between sending and receiving thread
+
+Two variables in the TCP socket struct are used with libck to atomically
+sync states between thread w.r.t connect/reconnect events.
+
+ sock->fd is controlled by the sending thread (ST)
+ self->recv_need_reconn is controlled by receiving thread (RT)
+
+On connect/reconnect ST will open a new socket and atomically store it
+into sock->fd.
+
+When RT is trying to receive it will atomically load sock->fd and store it
+in self->recvfd. Before storing it, it will compare it to what already in
+self->recvfd and if it differ then a connect/reconnect event happend and RT
+will reset receiving state and buffers.
+
+If RT detects a disconnection it will atomically store self->recvfd into
+self->recv_need_reconn to signal to ST that it needs to reconnect.
+
+ST will load and check self->recv_need_reconn before sending and when
+checking socket readiness, if its the same as sock->fd then it will start
+reconnecting.
+
+*/
+
struct perf__tcp_socket {
struct perf_net_socket base;
@@ -43,19 +70,16 @@ struct perf__tcp_socket {
size_t at, sending;
bool is_ready, need_reconnect, have_more, is_sending;
- int flags;
- struct sockaddr_storage dest_addr;
- socklen_t addrlen;
-
perf_sockaddr_t server, local;
size_t bufsize;
int recvfd;
+ int recv_need_reconn;
uint16_t qid;
uint64_t conn_ts;
- perf_socket_event_t conn_event;
+ perf_socket_event_t conn_event, conning_event;
};
static int perf__tcp_connect(struct perf_net_socket* sock)
@@ -103,6 +127,10 @@ static int perf__tcp_connect(struct perf_net_socket* sock)
perf_log_fatal("fcntl(F_SETFL)");
self->conn_ts = perf_get_time();
+ if (sock->event) {
+ sock->event(sock, self->conning_event, self->conn_ts);
+ self->conning_event = perf_socket_event_reconnecting;
+ }
if (connect(fd, &self->server.sa.sa, self->server.length)) {
if (errno == EINPROGRESS) {
self->is_ready = false;
@@ -131,12 +159,16 @@ static ssize_t perf__tcp_recv(struct perf_net_socket* sock, void* buf, size_t le
if (!self->have_more) {
n = recv(fd, self->recvbuf + self->at, TCP_RECV_BUF_SIZE - self->at, flags);
if (!n) {
+ // need reconnect
+ ck_pr_store_int(&self->recv_need_reconn, fd);
return 0;
} else if (n < 0) {
switch (errno) {
case ECONNREFUSED:
case ECONNRESET:
case ENOTCONN:
+ // need reconnect
+ ck_pr_store_int(&self->recv_need_reconn, fd);
errno = EAGAIN;
break;
default:
@@ -184,7 +216,17 @@ static ssize_t perf__tcp_sendto(struct perf_net_socket* sock, uint16_t qid, cons
memcpy(self->sendbuf, &dnslen, 2);
memcpy(self->sendbuf + 2, buf, send);
self->qid = qid;
- n = sendto(sock->fd, self->sendbuf, send + 2, flags, dest_addr, addrlen);
+
+ int recv_need_reconn = ck_pr_load_int(&self->recv_need_reconn);
+ if (recv_need_reconn == sock->fd) {
+ self->need_reconnect = true;
+ self->is_sending = true;
+ self->sending = 0;
+ errno = EINPROGRESS;
+ return -1;
+ }
+
+ n = sendto(sock->fd, self->sendbuf, send + 2, 0, 0, 0);
if (n < 0) {
switch (errno) {
@@ -206,10 +248,7 @@ static ssize_t perf__tcp_sendto(struct perf_net_socket* sock, uint16_t qid, cons
if (n < send + 2) {
self->is_sending = true;
self->sending = n;
- self->flags = flags;
- memcpy(&self->dest_addr, dest_addr, addrlen);
- self->addrlen = addrlen;
- errno = EINPROGRESS;
+ errno = EINPROGRESS;
return -1;
}
@@ -228,7 +267,8 @@ static int perf__tcp_sockeq(struct perf_net_socket* sock_a, struct perf_net_sock
static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_t timeout)
{
- if (self->need_reconnect) {
+ int recv_need_reconn = ck_pr_load_int(&self->recv_need_reconn);
+ if (recv_need_reconn == sock->fd || self->need_reconnect) {
int fd = perf__tcp_connect(sock), oldfd = ck_pr_load_int(&sock->fd);
ck_pr_store_int(&sock->fd, fd);
close(oldfd);
@@ -242,29 +282,20 @@ static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_
memcpy(&dnslen, self->sendbuf, 2);
dnslen = ntohs(dnslen);
- n = sendto(sock->fd, self->sendbuf + self->sending, dnslen + 2 - self->sending, self->flags, (struct sockaddr*)&self->dest_addr, self->addrlen);
- if (n < 1) {
- switch (errno) {
- case ECONNREFUSED:
- case ECONNRESET:
- case ENOTCONN:
- case EPIPE:
- self->need_reconnect = true;
- if (self->sending) {
- self->sending = 0;
- self->is_sending = false;
- }
- errno = EINPROGRESS;
- return -1;
- default:
- break;
+ n = sendto(sock->fd, self->sendbuf + self->sending, dnslen + 2 - self->sending, 0, 0, 0);
+ if (n < 0) {
+ int fd = perf__tcp_connect(sock), oldfd = ck_pr_load_int(&sock->fd);
+ ck_pr_store_int(&sock->fd, fd);
+ close(oldfd);
+ if (self->sending) {
+ self->sending = 0;
+ self->is_sending = false;
}
- return -1;
+ goto conn_cont;
}
self->sending += n;
if (self->sending < dnslen + 2) {
- errno = EINPROGRESS;
- return -1;
+ return 0;
}
self->sending = 0;
self->is_sending = false;
@@ -275,6 +306,7 @@ static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_
return 1;
}
+conn_cont:
switch (perf_os_waituntilanywritable(&sock, 1, pipe_fd, timeout)) {
case PERF_R_TIMEDOUT:
return -1;
@@ -291,16 +323,35 @@ static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_
|| error == EAGAIN) {
return 0;
}
- return -1;
+ // unrecoverable error, reconnect
+ self->need_reconnect = true;
+ return 0;
}
self->is_ready = true;
if (sock->event) {
sock->event(sock, self->conn_event, perf_get_time() - self->conn_ts);
- self->conn_event = perf_socket_event_reconnect;
+ self->conn_event = perf_socket_event_reconnected;
}
if (self->is_sending) {
- errno = EINPROGRESS;
- return -1;
+ uint16_t dnslen;
+ ssize_t n;
+
+ memcpy(&dnslen, self->sendbuf, 2);
+ dnslen = ntohs(dnslen);
+ n = sendto(sock->fd, self->sendbuf + self->sending, dnslen + 2 - self->sending, 0, 0, 0);
+ if (n < 0) {
+ self->need_reconnect = true;
+ return 0;
+ }
+ self->sending += n;
+ if (self->sending < dnslen + 2) {
+ return 0;
+ }
+ self->sending = 0;
+ self->is_sending = false;
+ if (sock->sent) {
+ sock->sent(sock, self->qid);
+ }
}
return 1;
}
@@ -339,10 +390,12 @@ struct perf_net_socket* perf_net_tcp_opensocket(const perf_sockaddr_t* server, c
if (self->bufsize > 0) {
self->bufsize *= 1024;
}
- self->conn_event = perf_socket_event_connect;
+ self->conning_event = perf_socket_event_connecting;
+ self->conn_event = perf_socket_event_connected;
- sock->fd = perf__tcp_connect(sock);
- self->recvfd = sock->fd;
+ sock->fd = perf__tcp_connect(sock);
+ self->recvfd = sock->fd;
+ self->recv_need_reconn = -1;
return sock;
}