diff options
Diffstat (limited to 'libnetdata/socket')
-rw-r--r-- | libnetdata/socket/socket.c | 384 |
1 files changed, 231 insertions, 153 deletions
diff --git a/libnetdata/socket/socket.c b/libnetdata/socket/socket.c index 73eb8e662..df6d3148b 100644 --- a/libnetdata/socket/socket.c +++ b/libnetdata/socket/socket.c @@ -1386,175 +1386,142 @@ static void poll_events_cleanup(void *data) { freez(p->inf); } -static void poll_events_process(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, short int revents, time_t now) { - short int events = pf->events; - int fd = pf->fd; - pf->revents = 0; - size_t i = pi->slot; +static int poll_process_error(POLLINFO *pi, struct pollfd *pf, short int revents) { + error("POLLFD: LISTENER: received %s %s %s on socket at slot %zu (fd %d) client '%s' port '%s' expecting %s %s %s, having %s %s %s" + , revents & POLLERR ? "POLLERR" : "" + , revents & POLLHUP ? "POLLHUP" : "" + , revents & POLLNVAL ? "POLLNVAL" : "" + , pi->slot + , pi->fd + , pi->client_ip ? pi->client_ip : "<undefined-ip>" + , pi->client_port ? pi->client_port : "<undefined-port>" + , pf->events & POLLIN ? "POLLIN" : "", pf->events & POLLOUT ? "POLLOUT" : "", pf->events & POLLPRI ? "POLLPRI" : "" + , revents & POLLIN ? "POLLIN" : "", revents & POLLOUT ? "POLLOUT" : "", revents & POLLPRI ? "POLLPRI" : "" + ); - if(unlikely(fd == -1)) { - debug(D_POLLFD, "POLLFD: LISTENER: ignoring slot %zu, it does not have an fd", i); - return; - } + pf->events = 0; + poll_close_fd(pi); + return 1; +} - debug(D_POLLFD, "POLLFD: LISTENER: processing events for slot %zu (events = %d, revents = %d)", i, events, revents); +static inline int poll_process_send(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) { + pi->last_sent_t = now; + pi->send_count++; - if(revents & POLLIN || revents & POLLPRI) { - // receiving data + debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", pi->slot, pf->fd); - pi->last_received_t = now; - pi->recv_count++; + pf->events = 0; - if(likely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET)) { - // read data from client TCP socket - debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", i, fd); + // remember the slot, in case we need to close it later + // the callback may manipulate the socket list and our pf and pi pointers may be invalid after that call + size_t slot = pi->slot; - pf->events = 0; - if (pi->rcv_callback(pi, &pf->events) == -1) { - poll_close_fd(&p->inf[i]); - return; - } - pf = &p->fds[i]; - pi = &p->inf[i]; - -#ifdef NETDATA_INTERNAL_CHECKS - // this is common - it is used for web server file copies - if(unlikely(!(pf->events & (POLLIN|POLLOUT)))) { - error("POLLFD: LISTENER: after reading, client slot %zu (fd %d) from %s port %s was left without expecting input or output. ", i, fd, pi->client_ip?pi->client_ip:"<undefined-ip>", pi->client_port?pi->client_port:"<undefined-port>"); - //poll_close_fd(pi); - //return; - } -#endif - } - else if(likely(pi->flags & POLLINFO_FLAG_SERVER_SOCKET)) { - // new connection - // debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", i, fd); - - switch(pi->socktype) { - case SOCK_STREAM: { - // a TCP socket - // we accept the connection - - int nfd; - do { - char client_ip[INET6_ADDRSTRLEN]; - char client_port[NI_MAXSERV]; - char client_host[NI_MAXHOST]; - client_host[0] = 0; - client_ip[0] = 0; - client_port[0] = 0; - - debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", i, fd); - nfd = accept_socket(fd, SOCK_NONBLOCK, client_ip, INET6_ADDRSTRLEN, client_port, NI_MAXSERV, - client_host, NI_MAXHOST, p->access_list, p->allow_dns); - if (unlikely(nfd < 0)) { - // accept failed - - debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", i, fd); - - if(unlikely(errno == EMFILE)) { - error("POLLFD: LISTENER: too many open files - sleeping for 1ms - used by this thread %zu, max for this thread %zu", p->used, p->limit); - usleep(1000); // 10ms - } - else if(unlikely(errno != EWOULDBLOCK && errno != EAGAIN)) - error("POLLFD: LISTENER: accept() failed."); - - break; - } - else { - // accept ok - // info("POLLFD: LISTENER: client '[%s]:%s' connected to '%s' on fd %d", client_ip, client_port, sockets->fds_names[i], nfd); - poll_add_fd(p - , nfd - , SOCK_STREAM - , pi->port_acl - , POLLINFO_FLAG_CLIENT_SOCKET - , client_ip - , client_port - , client_host - , p->add_callback - , p->del_callback - , p->rcv_callback - , p->snd_callback - , NULL - ); + if (unlikely(pi->snd_callback(pi, &pf->events) == -1)) + poll_close_fd(&p->inf[slot]); - // it may have reallocated them, so refresh our pointers - pf = &p->fds[i]; - pi = &p->inf[i]; - } - } while (nfd >= 0 && (!p->limit || p->used < p->limit)); - break; - } + // IMPORTANT: + // pf and pi may be invalid below this point, they may have been reallocated. - case SOCK_DGRAM: { - // a UDP socket - // we read data from the server socket + return 1; +} - debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", i, fd); +static inline int poll_process_tcp_read(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) { + pi->last_received_t = now; + pi->recv_count++; - // TODO: access_list is not applied to UDP - // but checking the access list on every UDP packet will destroy - // performance, especially for statsd. + debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", pi->slot, pf->fd); - pf->events = 0; - pi->rcv_callback(pi, &pf->events); - break; - } + pf->events = 0; - default: { - error("POLLFD: LISTENER: Unknown socktype %d on slot %zu", pi->socktype, pi->slot); - break; - } - } - } - } + // remember the slot, in case we need to close it later + // the callback may manipulate the socket list and our pf and pi pointers may be invalid after that call + size_t slot = pi->slot; - if(unlikely(revents & POLLOUT)) { - // sending data - debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", i, fd); + if (pi->rcv_callback(pi, &pf->events) == -1) + poll_close_fd(&p->inf[slot]); - pi->last_sent_t = now; - pi->send_count++; + // IMPORTANT: + // pf and pi may be invalid below this point, they may have been reallocated. - pf->events = 0; - if (pi->snd_callback(pi, &pf->events) == -1) { - poll_close_fd(&p->inf[i]); - return; - } - pf = &p->fds[i]; - pi = &p->inf[i]; - -#ifdef NETDATA_INTERNAL_CHECKS - // this is common - it is used for streaming - if(unlikely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET && !(pf->events & (POLLIN|POLLOUT)))) { - error("POLLFD: LISTENER: after sending, client slot %zu (fd %d) from %s port %s was left without expecting input or output. ", i, fd, pi->client_ip?pi->client_ip:"<undefined-ip>", pi->client_port?pi->client_port:"<undefined-port>"); - //poll_close_fd(pi); - //return; + return 1; +} + +static inline int poll_process_udp_read(POLLINFO *pi, struct pollfd *pf, time_t now __maybe_unused) { + pi->last_received_t = now; + pi->recv_count++; + + debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", pi->slot, pf->fd); + + // TODO: access_list is not applied to UDP + // but checking the access list on every UDP packet will destroy + // performance, especially for statsd. + + pf->events = 0; + if(pi->rcv_callback(pi, &pf->events) == -1) + return 0; + + // IMPORTANT: + // pf and pi may be invalid below this point, they may have been reallocated. + + return 1; +} + +static int poll_process_new_tcp_connection(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) { + pi->last_received_t = now; + pi->recv_count++; + + debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", pi->slot, pf->fd); + + char client_ip[INET6_ADDRSTRLEN] = ""; + char client_port[NI_MAXSERV] = ""; + char client_host[NI_MAXHOST] = ""; + + debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", pi->slot, pf->fd); + + int nfd = accept_socket( + pf->fd,SOCK_NONBLOCK, + client_ip, INET6_ADDRSTRLEN, client_port,NI_MAXSERV, client_host, NI_MAXHOST, + p->access_list, p->allow_dns + ); + + if (unlikely(nfd < 0)) { + // accept failed + + debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", pi->slot, pf->fd); + + if(unlikely(errno == EMFILE)) { + error("POLLFD: LISTENER: too many open files - sleeping for 1ms - used by this thread %zu, max for this thread %zu", p->used, p->limit); + usleep(1000); // 1ms } -#endif - } + else if(unlikely(errno != EWOULDBLOCK && errno != EAGAIN)) + error("POLLFD: LISTENER: accept() failed."); - if(unlikely(revents & POLLERR)) { - error("POLLFD: LISTENER: processing POLLERR events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd); - pf->events = 0; - poll_close_fd(pi); - return; } + else { + // accept ok + + poll_add_fd(p + , nfd + , SOCK_STREAM + , pi->port_acl + , POLLINFO_FLAG_CLIENT_SOCKET + , client_ip + , client_port + , client_host + , p->add_callback + , p->del_callback + , p->rcv_callback + , p->snd_callback + , NULL + ); - if(unlikely(revents & POLLHUP)) { - error("POLLFD: LISTENER: processing POLLHUP events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd); - pf->events = 0; - poll_close_fd(pi); - return; - } + // IMPORTANT: + // pf and pi may be invalid below this point, they may have been reallocated. - if(unlikely(revents & POLLNVAL)) { - error("POLLFD: LISTENER: processing POLLNVAL events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd); - pf->events = 0; - poll_close_fd(pi); - return; + return 1; } + + return 0; } void poll_events(LISTEN_SOCKETS *sockets @@ -1687,18 +1654,129 @@ void poll_events(LISTEN_SOCKETS *sockets debug(D_POLLFD, "POLLFD: LISTENER: poll() timeout."); } else { + POLLINFO *pi; + struct pollfd *pf; + size_t idx, processed = 0; + short int revents; + + // keep fast lookup arrays per function + // to avoid looping through the entire list every time + size_t sends[p.max + 1], sends_max = 0; + size_t reads[p.max + 1], reads_max = 0; + size_t conns[p.max + 1], conns_max = 0; + size_t udprd[p.max + 1], udprd_max = 0; + for (i = 0; i <= p.max; i++) { - struct pollfd *pf = &p.fds[i]; - short int revents = pf->revents; - if (unlikely(revents)) - poll_events_process(&p, &p.inf[i], pf, revents, now); + pi = &p.inf[i]; + pf = &p.fds[i]; + revents = pf->revents; + + if(unlikely(revents == 0 || pf->fd == -1)) + continue; + + if (unlikely(revents & (POLLERR|POLLHUP|POLLNVAL))) { + // something is wrong to one of our sockets + + pf->revents = 0; + processed += poll_process_error(pi, pf, revents); + } + else if (likely(revents & POLLOUT)) { + // a client is ready to receive data + + sends[sends_max++] = i; + } + else if (likely(revents & (POLLIN|POLLPRI))) { + if (pi->flags & POLLINFO_FLAG_CLIENT_SOCKET) { + // a client sent data to us + + reads[reads_max++] = i; + } + else if (pi->flags & POLLINFO_FLAG_SERVER_SOCKET) { + // something is coming to our server sockets + + if(pi->socktype == SOCK_DGRAM) { + // UDP receive, directly on our listening socket + + udprd[udprd_max++] = i; + } + else if(pi->socktype == SOCK_STREAM) { + // new TCP connection + + conns[conns_max++] = i; + } + else + error("POLLFD: LISTENER: server slot %zu (fd %d) connection from %s port %s using unhandled socket type %d." + , i + , pi->fd + , pi->client_ip ? pi->client_ip : "<undefined-ip>" + , pi->client_port ? pi->client_port : "<undefined-port>" + , pi->socktype + ); + } + else + error("POLLFD: LISTENER: client slot %zu (fd %d) data from %s port %s using flags %08X is neither client nor server." + , i + , pi->fd + , pi->client_ip ? pi->client_ip : "<undefined-ip>" + , pi->client_port ? pi->client_port : "<undefined-port>" + , pi->flags + ); + } + else + error("POLLFD: LISTENER: socket slot %zu (fd %d) client %s port %s unhandled event id %d." + , i + , pi->fd + , pi->client_ip ? pi->client_ip : "<undefined-ip>" + , pi->client_port ? pi->client_port : "<undefined-port>" + , revents + ); + } + + // process sends + for (idx = 0; idx < sends_max; idx++) { + i = sends[idx]; + pi = &p.inf[i]; + pf = &p.fds[i]; + pf->revents = 0; + processed += poll_process_send(&p, pi, pf, now); + } + + // process UDP reads + for (idx = 0; idx < udprd_max; idx++) { + i = udprd[idx]; + pi = &p.inf[i]; + pf = &p.fds[i]; + pf->revents = 0; + processed += poll_process_udp_read(pi, pf, now); + } + + // process TCP reads + for (idx = 0; idx < reads_max; idx++) { + i = reads[idx]; + pi = &p.inf[i]; + pf = &p.fds[i]; + pf->revents = 0; + processed += poll_process_tcp_read(&p, pi, pf, now); + } + + if(!processed && (!p.limit || p.used < p.limit)) { + // nothing processed above (rcv, snd) and we have room for another TCP connection + // so, accept one TCP connection + for (idx = 0; idx < conns_max; idx++) { + i = conns[idx]; + pi = &p.inf[i]; + pf = &p.fds[i]; + pf->revents = 0; + if (poll_process_new_tcp_connection(&p, pi, pf, now)) + break; + } } } if(unlikely(p.checks_every > 0 && now - last_check > p.checks_every)) { last_check = now; - // security checks + // cleanup old sockets for(i = 0; i <= p.max; i++) { POLLINFO *pi = &p.inf[i]; |