From 0b78d239dfb4e80d1b213e7a925ae1fa0b5340be Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 31 Mar 2021 09:37:49 +0200 Subject: Merging upstream version 2.5.2. Signed-off-by: Daniel Baumann --- src/net_tcp.c | 127 +++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 90 insertions(+), 37 deletions(-) (limited to 'src/net_tcp.c') diff --git a/src/net_tcp.c b/src/net_tcp.c index 94d2a6a..c04f25d 100644 --- a/src/net_tcp.c +++ b/src/net_tcp.c @@ -36,6 +36,33 @@ #define self ((struct perf__tcp_socket*)sock) +/* + +About state sync between sending and receiving thread + +Two variables in the TCP socket struct are used with libck to atomically +sync states between thread w.r.t connect/reconnect events. + + sock->fd is controlled by the sending thread (ST) + self->recv_need_reconn is controlled by receiving thread (RT) + +On connect/reconnect ST will open a new socket and atomically store it +into sock->fd. + +When RT is trying to receive it will atomically load sock->fd and store it +in self->recvfd. Before storing it, it will compare it to what already in +self->recvfd and if it differ then a connect/reconnect event happend and RT +will reset receiving state and buffers. + +If RT detects a disconnection it will atomically store self->recvfd into +self->recv_need_reconn to signal to ST that it needs to reconnect. + +ST will load and check self->recv_need_reconn before sending and when +checking socket readiness, if its the same as sock->fd then it will start +reconnecting. + +*/ + struct perf__tcp_socket { struct perf_net_socket base; @@ -43,19 +70,16 @@ struct perf__tcp_socket { size_t at, sending; bool is_ready, need_reconnect, have_more, is_sending; - int flags; - struct sockaddr_storage dest_addr; - socklen_t addrlen; - perf_sockaddr_t server, local; size_t bufsize; int recvfd; + int recv_need_reconn; uint16_t qid; uint64_t conn_ts; - perf_socket_event_t conn_event; + perf_socket_event_t conn_event, conning_event; }; static int perf__tcp_connect(struct perf_net_socket* sock) @@ -103,6 +127,10 @@ static int perf__tcp_connect(struct perf_net_socket* sock) perf_log_fatal("fcntl(F_SETFL)"); self->conn_ts = perf_get_time(); + if (sock->event) { + sock->event(sock, self->conning_event, self->conn_ts); + self->conning_event = perf_socket_event_reconnecting; + } if (connect(fd, &self->server.sa.sa, self->server.length)) { if (errno == EINPROGRESS) { self->is_ready = false; @@ -131,12 +159,16 @@ static ssize_t perf__tcp_recv(struct perf_net_socket* sock, void* buf, size_t le if (!self->have_more) { n = recv(fd, self->recvbuf + self->at, TCP_RECV_BUF_SIZE - self->at, flags); if (!n) { + // need reconnect + ck_pr_store_int(&self->recv_need_reconn, fd); return 0; } else if (n < 0) { switch (errno) { case ECONNREFUSED: case ECONNRESET: case ENOTCONN: + // need reconnect + ck_pr_store_int(&self->recv_need_reconn, fd); errno = EAGAIN; break; default: @@ -184,7 +216,17 @@ static ssize_t perf__tcp_sendto(struct perf_net_socket* sock, uint16_t qid, cons memcpy(self->sendbuf, &dnslen, 2); memcpy(self->sendbuf + 2, buf, send); self->qid = qid; - n = sendto(sock->fd, self->sendbuf, send + 2, flags, dest_addr, addrlen); + + int recv_need_reconn = ck_pr_load_int(&self->recv_need_reconn); + if (recv_need_reconn == sock->fd) { + self->need_reconnect = true; + self->is_sending = true; + self->sending = 0; + errno = EINPROGRESS; + return -1; + } + + n = sendto(sock->fd, self->sendbuf, send + 2, 0, 0, 0); if (n < 0) { switch (errno) { @@ -206,10 +248,7 @@ static ssize_t perf__tcp_sendto(struct perf_net_socket* sock, uint16_t qid, cons if (n < send + 2) { self->is_sending = true; self->sending = n; - self->flags = flags; - memcpy(&self->dest_addr, dest_addr, addrlen); - self->addrlen = addrlen; - errno = EINPROGRESS; + errno = EINPROGRESS; return -1; } @@ -228,7 +267,8 @@ static int perf__tcp_sockeq(struct perf_net_socket* sock_a, struct perf_net_sock static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_t timeout) { - if (self->need_reconnect) { + int recv_need_reconn = ck_pr_load_int(&self->recv_need_reconn); + if (recv_need_reconn == sock->fd || self->need_reconnect) { int fd = perf__tcp_connect(sock), oldfd = ck_pr_load_int(&sock->fd); ck_pr_store_int(&sock->fd, fd); close(oldfd); @@ -242,29 +282,20 @@ static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_ memcpy(&dnslen, self->sendbuf, 2); dnslen = ntohs(dnslen); - n = sendto(sock->fd, self->sendbuf + self->sending, dnslen + 2 - self->sending, self->flags, (struct sockaddr*)&self->dest_addr, self->addrlen); - if (n < 1) { - switch (errno) { - case ECONNREFUSED: - case ECONNRESET: - case ENOTCONN: - case EPIPE: - self->need_reconnect = true; - if (self->sending) { - self->sending = 0; - self->is_sending = false; - } - errno = EINPROGRESS; - return -1; - default: - break; + n = sendto(sock->fd, self->sendbuf + self->sending, dnslen + 2 - self->sending, 0, 0, 0); + if (n < 0) { + int fd = perf__tcp_connect(sock), oldfd = ck_pr_load_int(&sock->fd); + ck_pr_store_int(&sock->fd, fd); + close(oldfd); + if (self->sending) { + self->sending = 0; + self->is_sending = false; } - return -1; + goto conn_cont; } self->sending += n; if (self->sending < dnslen + 2) { - errno = EINPROGRESS; - return -1; + return 0; } self->sending = 0; self->is_sending = false; @@ -275,6 +306,7 @@ static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_ return 1; } +conn_cont: switch (perf_os_waituntilanywritable(&sock, 1, pipe_fd, timeout)) { case PERF_R_TIMEDOUT: return -1; @@ -291,16 +323,35 @@ static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_ || error == EAGAIN) { return 0; } - return -1; + // unrecoverable error, reconnect + self->need_reconnect = true; + return 0; } self->is_ready = true; if (sock->event) { sock->event(sock, self->conn_event, perf_get_time() - self->conn_ts); - self->conn_event = perf_socket_event_reconnect; + self->conn_event = perf_socket_event_reconnected; } if (self->is_sending) { - errno = EINPROGRESS; - return -1; + uint16_t dnslen; + ssize_t n; + + memcpy(&dnslen, self->sendbuf, 2); + dnslen = ntohs(dnslen); + n = sendto(sock->fd, self->sendbuf + self->sending, dnslen + 2 - self->sending, 0, 0, 0); + if (n < 0) { + self->need_reconnect = true; + return 0; + } + self->sending += n; + if (self->sending < dnslen + 2) { + return 0; + } + self->sending = 0; + self->is_sending = false; + if (sock->sent) { + sock->sent(sock, self->qid); + } } return 1; } @@ -339,10 +390,12 @@ struct perf_net_socket* perf_net_tcp_opensocket(const perf_sockaddr_t* server, c if (self->bufsize > 0) { self->bufsize *= 1024; } - self->conn_event = perf_socket_event_connect; + self->conning_event = perf_socket_event_connecting; + self->conn_event = perf_socket_event_connected; - sock->fd = perf__tcp_connect(sock); - self->recvfd = sock->fd; + sock->fd = perf__tcp_connect(sock); + self->recvfd = sock->fd; + self->recv_need_reconn = -1; return sock; } -- cgit v1.2.3