summaryrefslogtreecommitdiffstats
path: root/libnetdata/socket/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'libnetdata/socket/socket.c')
-rw-r--r--libnetdata/socket/socket.c384
1 files changed, 231 insertions, 153 deletions
diff --git a/libnetdata/socket/socket.c b/libnetdata/socket/socket.c
index 73eb8e662..df6d3148b 100644
--- a/libnetdata/socket/socket.c
+++ b/libnetdata/socket/socket.c
@@ -1386,175 +1386,142 @@ static void poll_events_cleanup(void *data) {
freez(p->inf);
}
-static void poll_events_process(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, short int revents, time_t now) {
- short int events = pf->events;
- int fd = pf->fd;
- pf->revents = 0;
- size_t i = pi->slot;
+static int poll_process_error(POLLINFO *pi, struct pollfd *pf, short int revents) {
+ error("POLLFD: LISTENER: received %s %s %s on socket at slot %zu (fd %d) client '%s' port '%s' expecting %s %s %s, having %s %s %s"
+ , revents & POLLERR ? "POLLERR" : ""
+ , revents & POLLHUP ? "POLLHUP" : ""
+ , revents & POLLNVAL ? "POLLNVAL" : ""
+ , pi->slot
+ , pi->fd
+ , pi->client_ip ? pi->client_ip : "<undefined-ip>"
+ , pi->client_port ? pi->client_port : "<undefined-port>"
+ , pf->events & POLLIN ? "POLLIN" : "", pf->events & POLLOUT ? "POLLOUT" : "", pf->events & POLLPRI ? "POLLPRI" : ""
+ , revents & POLLIN ? "POLLIN" : "", revents & POLLOUT ? "POLLOUT" : "", revents & POLLPRI ? "POLLPRI" : ""
+ );
- if(unlikely(fd == -1)) {
- debug(D_POLLFD, "POLLFD: LISTENER: ignoring slot %zu, it does not have an fd", i);
- return;
- }
+ pf->events = 0;
+ poll_close_fd(pi);
+ return 1;
+}
- debug(D_POLLFD, "POLLFD: LISTENER: processing events for slot %zu (events = %d, revents = %d)", i, events, revents);
+static inline int poll_process_send(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) {
+ pi->last_sent_t = now;
+ pi->send_count++;
- if(revents & POLLIN || revents & POLLPRI) {
- // receiving data
+ debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", pi->slot, pf->fd);
- pi->last_received_t = now;
- pi->recv_count++;
+ pf->events = 0;
- if(likely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET)) {
- // read data from client TCP socket
- debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", i, fd);
+ // remember the slot, in case we need to close it later
+ // the callback may manipulate the socket list and our pf and pi pointers may be invalid after that call
+ size_t slot = pi->slot;
- pf->events = 0;
- if (pi->rcv_callback(pi, &pf->events) == -1) {
- poll_close_fd(&p->inf[i]);
- return;
- }
- pf = &p->fds[i];
- pi = &p->inf[i];
-
-#ifdef NETDATA_INTERNAL_CHECKS
- // this is common - it is used for web server file copies
- if(unlikely(!(pf->events & (POLLIN|POLLOUT)))) {
- error("POLLFD: LISTENER: after reading, client slot %zu (fd %d) from %s port %s was left without expecting input or output. ", i, fd, pi->client_ip?pi->client_ip:"<undefined-ip>", pi->client_port?pi->client_port:"<undefined-port>");
- //poll_close_fd(pi);
- //return;
- }
-#endif
- }
- else if(likely(pi->flags & POLLINFO_FLAG_SERVER_SOCKET)) {
- // new connection
- // debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", i, fd);
-
- switch(pi->socktype) {
- case SOCK_STREAM: {
- // a TCP socket
- // we accept the connection
-
- int nfd;
- do {
- char client_ip[INET6_ADDRSTRLEN];
- char client_port[NI_MAXSERV];
- char client_host[NI_MAXHOST];
- client_host[0] = 0;
- client_ip[0] = 0;
- client_port[0] = 0;
-
- debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", i, fd);
- nfd = accept_socket(fd, SOCK_NONBLOCK, client_ip, INET6_ADDRSTRLEN, client_port, NI_MAXSERV,
- client_host, NI_MAXHOST, p->access_list, p->allow_dns);
- if (unlikely(nfd < 0)) {
- // accept failed
-
- debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", i, fd);
-
- if(unlikely(errno == EMFILE)) {
- error("POLLFD: LISTENER: too many open files - sleeping for 1ms - used by this thread %zu, max for this thread %zu", p->used, p->limit);
- usleep(1000); // 10ms
- }
- else if(unlikely(errno != EWOULDBLOCK && errno != EAGAIN))
- error("POLLFD: LISTENER: accept() failed.");
-
- break;
- }
- else {
- // accept ok
- // info("POLLFD: LISTENER: client '[%s]:%s' connected to '%s' on fd %d", client_ip, client_port, sockets->fds_names[i], nfd);
- poll_add_fd(p
- , nfd
- , SOCK_STREAM
- , pi->port_acl
- , POLLINFO_FLAG_CLIENT_SOCKET
- , client_ip
- , client_port
- , client_host
- , p->add_callback
- , p->del_callback
- , p->rcv_callback
- , p->snd_callback
- , NULL
- );
+ if (unlikely(pi->snd_callback(pi, &pf->events) == -1))
+ poll_close_fd(&p->inf[slot]);
- // it may have reallocated them, so refresh our pointers
- pf = &p->fds[i];
- pi = &p->inf[i];
- }
- } while (nfd >= 0 && (!p->limit || p->used < p->limit));
- break;
- }
+ // IMPORTANT:
+ // pf and pi may be invalid below this point, they may have been reallocated.
- case SOCK_DGRAM: {
- // a UDP socket
- // we read data from the server socket
+ return 1;
+}
- debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", i, fd);
+static inline int poll_process_tcp_read(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) {
+ pi->last_received_t = now;
+ pi->recv_count++;
- // TODO: access_list is not applied to UDP
- // but checking the access list on every UDP packet will destroy
- // performance, especially for statsd.
+ debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", pi->slot, pf->fd);
- pf->events = 0;
- pi->rcv_callback(pi, &pf->events);
- break;
- }
+ pf->events = 0;
- default: {
- error("POLLFD: LISTENER: Unknown socktype %d on slot %zu", pi->socktype, pi->slot);
- break;
- }
- }
- }
- }
+ // remember the slot, in case we need to close it later
+ // the callback may manipulate the socket list and our pf and pi pointers may be invalid after that call
+ size_t slot = pi->slot;
- if(unlikely(revents & POLLOUT)) {
- // sending data
- debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", i, fd);
+ if (pi->rcv_callback(pi, &pf->events) == -1)
+ poll_close_fd(&p->inf[slot]);
- pi->last_sent_t = now;
- pi->send_count++;
+ // IMPORTANT:
+ // pf and pi may be invalid below this point, they may have been reallocated.
- pf->events = 0;
- if (pi->snd_callback(pi, &pf->events) == -1) {
- poll_close_fd(&p->inf[i]);
- return;
- }
- pf = &p->fds[i];
- pi = &p->inf[i];
-
-#ifdef NETDATA_INTERNAL_CHECKS
- // this is common - it is used for streaming
- if(unlikely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET && !(pf->events & (POLLIN|POLLOUT)))) {
- error("POLLFD: LISTENER: after sending, client slot %zu (fd %d) from %s port %s was left without expecting input or output. ", i, fd, pi->client_ip?pi->client_ip:"<undefined-ip>", pi->client_port?pi->client_port:"<undefined-port>");
- //poll_close_fd(pi);
- //return;
+ return 1;
+}
+
+static inline int poll_process_udp_read(POLLINFO *pi, struct pollfd *pf, time_t now __maybe_unused) {
+ pi->last_received_t = now;
+ pi->recv_count++;
+
+ debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", pi->slot, pf->fd);
+
+ // TODO: access_list is not applied to UDP
+ // but checking the access list on every UDP packet will destroy
+ // performance, especially for statsd.
+
+ pf->events = 0;
+ if(pi->rcv_callback(pi, &pf->events) == -1)
+ return 0;
+
+ // IMPORTANT:
+ // pf and pi may be invalid below this point, they may have been reallocated.
+
+ return 1;
+}
+
+static int poll_process_new_tcp_connection(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) {
+ pi->last_received_t = now;
+ pi->recv_count++;
+
+ debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", pi->slot, pf->fd);
+
+ char client_ip[INET6_ADDRSTRLEN] = "";
+ char client_port[NI_MAXSERV] = "";
+ char client_host[NI_MAXHOST] = "";
+
+ debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", pi->slot, pf->fd);
+
+ int nfd = accept_socket(
+ pf->fd,SOCK_NONBLOCK,
+ client_ip, INET6_ADDRSTRLEN, client_port,NI_MAXSERV, client_host, NI_MAXHOST,
+ p->access_list, p->allow_dns
+ );
+
+ if (unlikely(nfd < 0)) {
+ // accept failed
+
+ debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", pi->slot, pf->fd);
+
+ if(unlikely(errno == EMFILE)) {
+ error("POLLFD: LISTENER: too many open files - sleeping for 1ms - used by this thread %zu, max for this thread %zu", p->used, p->limit);
+ usleep(1000); // 1ms
}
-#endif
- }
+ else if(unlikely(errno != EWOULDBLOCK && errno != EAGAIN))
+ error("POLLFD: LISTENER: accept() failed.");
- if(unlikely(revents & POLLERR)) {
- error("POLLFD: LISTENER: processing POLLERR events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd);
- pf->events = 0;
- poll_close_fd(pi);
- return;
}
+ else {
+ // accept ok
+
+ poll_add_fd(p
+ , nfd
+ , SOCK_STREAM
+ , pi->port_acl
+ , POLLINFO_FLAG_CLIENT_SOCKET
+ , client_ip
+ , client_port
+ , client_host
+ , p->add_callback
+ , p->del_callback
+ , p->rcv_callback
+ , p->snd_callback
+ , NULL
+ );
- if(unlikely(revents & POLLHUP)) {
- error("POLLFD: LISTENER: processing POLLHUP events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd);
- pf->events = 0;
- poll_close_fd(pi);
- return;
- }
+ // IMPORTANT:
+ // pf and pi may be invalid below this point, they may have been reallocated.
- if(unlikely(revents & POLLNVAL)) {
- error("POLLFD: LISTENER: processing POLLNVAL events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd);
- pf->events = 0;
- poll_close_fd(pi);
- return;
+ return 1;
}
+
+ return 0;
}
void poll_events(LISTEN_SOCKETS *sockets
@@ -1687,18 +1654,129 @@ void poll_events(LISTEN_SOCKETS *sockets
debug(D_POLLFD, "POLLFD: LISTENER: poll() timeout.");
}
else {
+ POLLINFO *pi;
+ struct pollfd *pf;
+ size_t idx, processed = 0;
+ short int revents;
+
+ // keep fast lookup arrays per function
+ // to avoid looping through the entire list every time
+ size_t sends[p.max + 1], sends_max = 0;
+ size_t reads[p.max + 1], reads_max = 0;
+ size_t conns[p.max + 1], conns_max = 0;
+ size_t udprd[p.max + 1], udprd_max = 0;
+
for (i = 0; i <= p.max; i++) {
- struct pollfd *pf = &p.fds[i];
- short int revents = pf->revents;
- if (unlikely(revents))
- poll_events_process(&p, &p.inf[i], pf, revents, now);
+ pi = &p.inf[i];
+ pf = &p.fds[i];
+ revents = pf->revents;
+
+ if(unlikely(revents == 0 || pf->fd == -1))
+ continue;
+
+ if (unlikely(revents & (POLLERR|POLLHUP|POLLNVAL))) {
+ // something is wrong to one of our sockets
+
+ pf->revents = 0;
+ processed += poll_process_error(pi, pf, revents);
+ }
+ else if (likely(revents & POLLOUT)) {
+ // a client is ready to receive data
+
+ sends[sends_max++] = i;
+ }
+ else if (likely(revents & (POLLIN|POLLPRI))) {
+ if (pi->flags & POLLINFO_FLAG_CLIENT_SOCKET) {
+ // a client sent data to us
+
+ reads[reads_max++] = i;
+ }
+ else if (pi->flags & POLLINFO_FLAG_SERVER_SOCKET) {
+ // something is coming to our server sockets
+
+ if(pi->socktype == SOCK_DGRAM) {
+ // UDP receive, directly on our listening socket
+
+ udprd[udprd_max++] = i;
+ }
+ else if(pi->socktype == SOCK_STREAM) {
+ // new TCP connection
+
+ conns[conns_max++] = i;
+ }
+ else
+ error("POLLFD: LISTENER: server slot %zu (fd %d) connection from %s port %s using unhandled socket type %d."
+ , i
+ , pi->fd
+ , pi->client_ip ? pi->client_ip : "<undefined-ip>"
+ , pi->client_port ? pi->client_port : "<undefined-port>"
+ , pi->socktype
+ );
+ }
+ else
+ error("POLLFD: LISTENER: client slot %zu (fd %d) data from %s port %s using flags %08X is neither client nor server."
+ , i
+ , pi->fd
+ , pi->client_ip ? pi->client_ip : "<undefined-ip>"
+ , pi->client_port ? pi->client_port : "<undefined-port>"
+ , pi->flags
+ );
+ }
+ else
+ error("POLLFD: LISTENER: socket slot %zu (fd %d) client %s port %s unhandled event id %d."
+ , i
+ , pi->fd
+ , pi->client_ip ? pi->client_ip : "<undefined-ip>"
+ , pi->client_port ? pi->client_port : "<undefined-port>"
+ , revents
+ );
+ }
+
+ // process sends
+ for (idx = 0; idx < sends_max; idx++) {
+ i = sends[idx];
+ pi = &p.inf[i];
+ pf = &p.fds[i];
+ pf->revents = 0;
+ processed += poll_process_send(&p, pi, pf, now);
+ }
+
+ // process UDP reads
+ for (idx = 0; idx < udprd_max; idx++) {
+ i = udprd[idx];
+ pi = &p.inf[i];
+ pf = &p.fds[i];
+ pf->revents = 0;
+ processed += poll_process_udp_read(pi, pf, now);
+ }
+
+ // process TCP reads
+ for (idx = 0; idx < reads_max; idx++) {
+ i = reads[idx];
+ pi = &p.inf[i];
+ pf = &p.fds[i];
+ pf->revents = 0;
+ processed += poll_process_tcp_read(&p, pi, pf, now);
+ }
+
+ if(!processed && (!p.limit || p.used < p.limit)) {
+ // nothing processed above (rcv, snd) and we have room for another TCP connection
+ // so, accept one TCP connection
+ for (idx = 0; idx < conns_max; idx++) {
+ i = conns[idx];
+ pi = &p.inf[i];
+ pf = &p.fds[i];
+ pf->revents = 0;
+ if (poll_process_new_tcp_connection(&p, pi, pf, now))
+ break;
+ }
}
}
if(unlikely(p.checks_every > 0 && now - last_check > p.checks_every)) {
last_check = now;
- // security checks
+ // cleanup old sockets
for(i = 0; i <= p.max; i++) {
POLLINFO *pi = &p.inf[i];