summaryrefslogtreecommitdiffstats
path: root/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket.c')
-rw-r--r--src/socket.c650
1 files changed, 435 insertions, 215 deletions
diff --git a/src/socket.c b/src/socket.c
index 906ab33dd..8bede73fd 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -79,6 +79,29 @@ int sock_enlarge_out(int fd) {
// --------------------------------------------------------------------------------------------------------------------
+
+char *strdup_client_description(int family, const char *protocol, const char *ip, int port) {
+ char buffer[100 + 1];
+
+ switch(family) {
+ case AF_INET:
+ snprintfz(buffer, 100, "%s:%s:%d", protocol, ip, port);
+ break;
+
+ case AF_INET6:
+ default:
+ snprintfz(buffer, 100, "%s:[%s]:%d", protocol, ip, port);
+ break;
+
+ case AF_UNIX:
+ snprintfz(buffer, 100, "%s:%s", protocol, ip);
+ break;
+ }
+
+ return strdupz(buffer);
+}
+
+// --------------------------------------------------------------------------------------------------------------------
// listening sockets
int create_listen_socket_unix(const char *path, int listen_backlog) {
@@ -231,25 +254,7 @@ static inline int listen_sockets_add(LISTEN_SOCKETS *sockets, int fd, int family
sockets->fds[sockets->opened] = fd;
sockets->fds_types[sockets->opened] = socktype;
sockets->fds_families[sockets->opened] = family;
-
- char buffer[100 + 1];
-
- switch(family) {
- case AF_INET:
- snprintfz(buffer, 100, "%s:%s:%d", protocol, ip, port);
- break;
-
- case AF_INET6:
- default:
- snprintfz(buffer, 100, "%s:[%s]:%d", protocol, ip, port);
- break;
-
- case AF_UNIX:
- snprintfz(buffer, 100, "%s:%s", protocol, ip);
- break;
- }
-
- sockets->fds_names[sockets->opened] = strdupz(buffer);
+ sockets->fds_names[sockets->opened] = strdup_client_description(family, protocol, ip, port);
sockets->opened++;
return 0;
@@ -615,13 +620,39 @@ static inline int connect_to_this_ip46(int protocol, int socktype, const char *h
error("Failed to set timeout on the socket to ip '%s' port '%s'", hostBfr, servBfr);
}
+ errno = 0;
if(connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
- error("Failed to connect to '%s', port '%s'", hostBfr, servBfr);
- close(fd);
- fd = -1;
+ if(errno == EALREADY || errno == EINPROGRESS) {
+ info("Waiting for connection to ip %s port %s to be established", hostBfr, servBfr);
+
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(0, &fds);
+ int rc = select (1, NULL, &fds, NULL, timeout);
+
+ if(rc > 0 && FD_ISSET(fd, &fds)) {
+ info("connect() to ip %s port %s completed successfully", hostBfr, servBfr);
+ }
+ else if(rc == -1) {
+ error("Failed to connect to '%s', port '%s'. select() returned %d", hostBfr, servBfr, rc);
+ close(fd);
+ fd = -1;
+ }
+ else {
+ error("Timed out while connecting to '%s', port '%s'. select() returned %d", hostBfr, servBfr, rc);
+ close(fd);
+ fd = -1;
+ }
+ }
+ else {
+ error("Failed to connect to '%s', port '%s'", hostBfr, servBfr);
+ close(fd);
+ fd = -1;
+ }
}
- debug(D_CONNECT_TO, "Connected to '%s' on port '%s'.", hostBfr, servBfr);
+ if(fd != -1)
+ debug(D_CONNECT_TO, "Connected to '%s' on port '%s'.", hostBfr, servBfr);
}
}
@@ -838,7 +869,8 @@ int accept4(int sock, struct sockaddr *addr, socklen_t *addrlen, int flags) {
#endif
if (flags) {
- errno = -EINVAL;
+ close(fd);
+ errno = EINVAL;
return -1;
}
@@ -930,46 +962,36 @@ int accept_socket(int fd, int flags, char *client_ip, size_t ipsize, char *clien
#define POLL_FDS_INCREASE_STEP 10
-#define POLLINFO_FLAG_SERVER_SOCKET 0x00000001
-#define POLLINFO_FLAG_CLIENT_SOCKET 0x00000002
-
-struct pollinfo {
- size_t slot;
- char *client;
- struct pollinfo *next;
- uint32_t flags;
- int socktype;
-
- void *data;
-};
-
-struct poll {
- size_t slots;
- size_t used;
- size_t min;
- size_t max;
- struct pollfd *fds;
- struct pollinfo *inf;
- struct pollinfo *first_free;
-
- void *(*add_callback)(int fd, int socktype, short int *events);
- void (*del_callback)(int fd, int socktype, void *data);
- int (*rcv_callback)(int fd, int socktype, void *data, short int *events);
- int (*snd_callback)(int fd, int socktype, void *data, short int *events);
-};
-
-static inline struct pollinfo *poll_add_fd(struct poll *p, int fd, int socktype, short int events, uint32_t flags) {
+inline POLLINFO *poll_add_fd(POLLJOB *p
+ , int fd
+ , int socktype
+ , uint32_t flags
+ , const char *client_ip
+ , const char *client_port
+ , void *(*add_callback)(POLLINFO *pi, short int *events, void *data)
+ , void (*del_callback)(POLLINFO *pi)
+ , int (*rcv_callback)(POLLINFO *pi, short int *events)
+ , int (*snd_callback)(POLLINFO *pi, short int *events)
+ , void *data
+) {
debug(D_POLLFD, "POLLFD: ADD: request to add fd %d, slots = %zu, used = %zu, min = %zu, max = %zu, next free = %zd", fd, p->slots, p->used, p->min, p->max, p->first_free?(ssize_t)p->first_free->slot:(ssize_t)-1);
if(unlikely(fd < 0)) return NULL;
+ //if(p->limit && p->used >= p->limit) {
+ // info("Max sockets limit reached (%zu sockets), dropping connection", p->used);
+ // close(fd);
+ // return NULL;
+ //}
+
if(unlikely(!p->first_free)) {
size_t new_slots = p->slots + POLL_FDS_INCREASE_STEP;
debug(D_POLLFD, "POLLFD: ADD: increasing size (current = %zu, new = %zu, used = %zu, min = %zu, max = %zu)", p->slots, new_slots, p->used, p->min, p->max);
p->fds = reallocz(p->fds, sizeof(struct pollfd) * new_slots);
- p->inf = reallocz(p->inf, sizeof(struct pollinfo) * new_slots);
+ p->inf = reallocz(p->inf, sizeof(POLLINFO) * new_slots);
+ // reset all the newly added slots
ssize_t i;
for(i = new_slots - 1; i >= (ssize_t)p->slots ; i--) {
debug(D_POLLFD, "POLLFD: ADD: resetting new slot %zd", i);
@@ -977,11 +999,19 @@ static inline struct pollinfo *poll_add_fd(struct poll *p, int fd, int socktype,
p->fds[i].events = 0;
p->fds[i].revents = 0;
+ p->inf[i].p = p;
p->inf[i].slot = (size_t)i;
p->inf[i].flags = 0;
p->inf[i].socktype = -1;
- p->inf[i].client = NULL;
+ p->inf[i].client_ip = NULL;
+ p->inf[i].client_port = NULL;
+ p->inf[i].del_callback = p->del_callback;
+ p->inf[i].rcv_callback = p->rcv_callback;
+ p->inf[i].snd_callback = p->snd_callback;
p->inf[i].data = NULL;
+
+ // link them so that the first free will be earlier in the array
+ // (we loop decrementing i)
p->inf[i].next = p->first_free;
p->first_free = &p->inf[i];
}
@@ -989,64 +1019,97 @@ static inline struct pollinfo *poll_add_fd(struct poll *p, int fd, int socktype,
p->slots = new_slots;
}
- struct pollinfo *pi = p->first_free;
+ POLLINFO *pi = p->first_free;
p->first_free = p->first_free->next;
debug(D_POLLFD, "POLLFD: ADD: selected slot %zu, next free is %zd", pi->slot, p->first_free?(ssize_t)p->first_free->slot:(ssize_t)-1);
struct pollfd *pf = &p->fds[pi->slot];
pf->fd = fd;
- pf->events = events;
+ pf->events = POLLIN;
pf->revents = 0;
+ pi->fd = fd;
+ pi->p = p;
pi->socktype = socktype;
pi->flags = flags;
pi->next = NULL;
+ pi->client_ip = strdupz(client_ip);
+ pi->client_port = strdupz(client_port);
+
+ pi->del_callback = del_callback;
+ pi->rcv_callback = rcv_callback;
+ pi->snd_callback = snd_callback;
+
+ pi->connected_t = now_boottime_sec();
+ pi->last_received_t = 0;
+ pi->last_sent_t = 0;
+ pi->last_sent_t = 0;
+ pi->recv_count = 0;
+ pi->send_count = 0;
+ netdata_thread_disable_cancelability();
p->used++;
if(unlikely(pi->slot > p->max))
p->max = pi->slot;
if(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET) {
- pi->data = p->add_callback(fd, pi->socktype, &pf->events);
+ pi->data = add_callback(pi, &pf->events, data);
}
if(pi->flags & POLLINFO_FLAG_SERVER_SOCKET) {
p->min = pi->slot;
}
+ netdata_thread_enable_cancelability();
debug(D_POLLFD, "POLLFD: ADD: completed, slots = %zu, used = %zu, min = %zu, max = %zu, next free = %zd", p->slots, p->used, p->min, p->max, p->first_free?(ssize_t)p->first_free->slot:(ssize_t)-1);
return pi;
}
-static inline void poll_close_fd(struct poll *p, struct pollinfo *pi) {
+inline void poll_close_fd(POLLINFO *pi) {
+ POLLJOB *p = pi->p;
+
struct pollfd *pf = &p->fds[pi->slot];
debug(D_POLLFD, "POLLFD: DEL: request to clear slot %zu (fd %d), old next free was %zd", pi->slot, pf->fd, p->first_free?(ssize_t)p->first_free->slot:(ssize_t)-1);
if(unlikely(pf->fd == -1)) return;
+ netdata_thread_disable_cancelability();
+
if(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET) {
- p->del_callback(pf->fd, pi->socktype, pi->data);
+ pi->del_callback(pi);
+
+ if(likely(!(pi->flags & POLLINFO_FLAG_DONT_CLOSE))) {
+ if(close(pf->fd) == -1)
+ error("Failed to close() poll_events() socket %d", pf->fd);
+ }
}
- close(pf->fd);
pf->fd = -1;
pf->events = 0;
pf->revents = 0;
+ pi->fd = -1;
pi->socktype = -1;
pi->flags = 0;
pi->data = NULL;
- freez(pi->client);
- pi->client = NULL;
+ pi->del_callback = NULL;
+ pi->rcv_callback = NULL;
+ pi->snd_callback = NULL;
+
+ freez(pi->client_ip);
+ pi->client_ip = NULL;
+
+ freez(pi->client_port);
+ pi->client_port = NULL;
pi->next = p->first_free;
p->first_free = pi;
p->used--;
- if(p->max == pi->slot) {
+ if(unlikely(p->max == pi->slot)) {
p->max = p->min;
ssize_t i;
for(i = (ssize_t)pi->slot; i > (ssize_t)p->min ;i--) {
@@ -1056,243 +1119,400 @@ static inline void poll_close_fd(struct poll *p, struct pollinfo *pi) {
}
}
}
+ netdata_thread_enable_cancelability();
debug(D_POLLFD, "POLLFD: DEL: completed, slots = %zu, used = %zu, min = %zu, max = %zu, next free = %zd", p->slots, p->used, p->min, p->max, p->first_free?(ssize_t)p->first_free->slot:(ssize_t)-1);
}
-static void *add_callback_default(int fd, int socktype, short int *events) {
- (void)fd;
- (void)socktype;
+void *poll_default_add_callback(POLLINFO *pi, short int *events, void *data) {
+ (void)pi;
(void)events;
+ (void)data;
+
+ // error("POLLFD: internal error: poll_default_add_callback() called");
return NULL;
}
-static void del_callback_default(int fd, int socktype, void *data) {
- (void)fd;
- (void)socktype;
- (void)data;
- if(data)
+void poll_default_del_callback(POLLINFO *pi) {
+ if(pi->data)
error("POLLFD: internal error: del_callback_default() called with data pointer - possible memory leak");
}
-static int rcv_callback_default(int fd, int socktype, void *data, short int *events) {
- (void)socktype;
- (void)data;
- (void)events;
+int poll_default_rcv_callback(POLLINFO *pi, short int *events) {
+ *events |= POLLIN;
char buffer[1024 + 1];
ssize_t rc;
do {
- rc = recv(fd, buffer, 1024, MSG_DONTWAIT);
+ rc = recv(pi->fd, buffer, 1024, MSG_DONTWAIT);
if (rc < 0) {
// read failed
if (errno != EWOULDBLOCK && errno != EAGAIN) {
- error("POLLFD: recv() failed.");
+ error("POLLFD: poll_default_rcv_callback(): recv() failed with %zd.", rc);
return -1;
}
} else if (rc) {
// data received
- info("POLLFD: internal error: discarding %zd bytes received on socket %d", rc, fd);
+ info("POLLFD: internal error: poll_default_rcv_callback() is discarding %zd bytes received on socket %d", rc, pi->fd);
}
} while (rc != -1);
return 0;
}
-static int snd_callback_default(int fd, int socktype, void *data, short int *events) {
- (void)socktype;
- (void)data;
- (void)events;
-
+int poll_default_snd_callback(POLLINFO *pi, short int *events) {
*events &= ~POLLOUT;
- info("POLLFD: internal error: nothing to send on socket %d", fd);
+ info("POLLFD: internal error: poll_default_snd_callback(): nothing to send on socket %d", pi->fd);
return 0;
}
-void poll_events_cleanup(void *data) {
- struct poll *p = (struct poll *)data;
+void poll_default_tmr_callback(void *timer_data) {
+ (void)timer_data;
+}
+
+static void poll_events_cleanup(void *data) {
+ POLLJOB *p = (POLLJOB *)data;
size_t i;
for(i = 0 ; i <= p->max ; i++) {
- struct pollinfo *pi = &p->inf[i];
- poll_close_fd(p, pi);
+ POLLINFO *pi = &p->inf[i];
+ poll_close_fd(pi);
}
freez(p->fds);
freez(p->inf);
}
-void poll_events(LISTEN_SOCKETS *sockets
- , void *(*add_callback)(int fd, int socktype, short int *events)
- , void (*del_callback)(int fd, int socktype, void *data)
- , int (*rcv_callback)(int fd, int socktype, void *data, short int *events)
- , int (*snd_callback)(int fd, int socktype, void *data, short int *events)
- , SIMPLE_PATTERN *access_list
- , void *data
-) {
- int retval;
+static void poll_events_process(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, short int revents, time_t now) {
+ short int events = pf->events;
+ int fd = pf->fd;
+ pf->revents = 0;
+ size_t i = pi->slot;
- struct poll p = {
- .slots = 0,
- .used = 0,
- .max = 0,
- .fds = NULL,
- .inf = NULL,
- .first_free = NULL,
+ if(unlikely(fd == -1)) {
+ debug(D_POLLFD, "POLLFD: LISTENER: ignoring slot %zu, it does not have an fd", i);
+ return;
+ }
- .add_callback = add_callback?add_callback:add_callback_default,
- .del_callback = del_callback?del_callback:del_callback_default,
- .rcv_callback = rcv_callback?rcv_callback:rcv_callback_default,
- .snd_callback = snd_callback?snd_callback:snd_callback_default
- };
+ debug(D_POLLFD, "POLLFD: LISTENER: processing events for slot %zu (events = %d, revents = %d)", i, events, revents);
- size_t i;
- for(i = 0; i < sockets->opened ;i++) {
- struct pollinfo *pi = poll_add_fd(&p, sockets->fds[i], sockets->fds_types[i], POLLIN, POLLINFO_FLAG_SERVER_SOCKET);
- pi->data = data;
- info("POLLFD: LISTENER: listening on '%s'", (sockets->fds_names[i])?sockets->fds_names[i]:"UNKNOWN");
- }
+ if(revents & POLLIN || revents & POLLPRI) {
+ // receiving data
- int timeout = -1; // wait forever
+ pi->last_received_t = now;
+ pi->recv_count++;
- pthread_cleanup_push(poll_events_cleanup, &p);
+ if(likely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET)) {
+ // read data from client TCP socket
+ debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", i, fd);
- for(;;) {
- if(unlikely(netdata_exit)) break;
+ pf->events = 0;
+ if (pi->rcv_callback(pi, &pf->events) == -1) {
+ poll_close_fd(&p->inf[i]);
+ return;
+ }
+ pf = &p->fds[i];
+ pi = &p->inf[i];
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ // this is common - it is used for web server file copies
+ if(unlikely(!(pf->events & (POLLIN|POLLOUT)))) {
+ error("POLLFD: LISTENER: after reading, client slot %zu (fd %d) from '%s:%s' was left without expecting input or output. ", i, fd, pi->client_ip?pi->client_ip:"<undefined-ip>", pi->client_port?pi->client_port:"<undefined-port>");
+ //poll_close_fd(pi);
+ //return;
+ }
+#endif
+ }
+ else if(likely(pi->flags & POLLINFO_FLAG_SERVER_SOCKET)) {
+ // new connection
+ // debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", i, fd);
+
+ switch(pi->socktype) {
+ case SOCK_STREAM: {
+ // a TCP socket
+ // we accept the connection
+
+ int nfd;
+ do {
+ char client_ip[NI_MAXHOST + 1];
+ char client_port[NI_MAXSERV + 1];
+
+ debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", i, fd);
+ nfd = accept_socket(fd, SOCK_NONBLOCK, client_ip, NI_MAXHOST + 1, client_port, NI_MAXSERV + 1, p->access_list);
+ if (unlikely(nfd < 0)) {
+ // accept failed
+
+ debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", i, fd);
+
+ if(unlikely(errno == EMFILE)) {
+ error("POLLFD: LISTENER: too many open files - sleeping for 1ms - used by this thread %zu, max for this thread %zu", p->used, p->limit);
+ usleep(1000); // 10ms
+ }
+ else if(unlikely(errno != EWOULDBLOCK && errno != EAGAIN))
+ error("POLLFD: LISTENER: accept() failed.");
- debug(D_POLLFD, "POLLFD: LISTENER: Waiting on %zu sockets...", p.max + 1);
- retval = poll(p.fds, p.max + 1, timeout);
+ break;
+ }
+ else {
+ // accept ok
+ // info("POLLFD: LISTENER: client '[%s]:%s' connected to '%s' on fd %d", client_ip, client_port, sockets->fds_names[i], nfd);
+ poll_add_fd(p
+ , nfd
+ , SOCK_STREAM
+ , POLLINFO_FLAG_CLIENT_SOCKET
+ , client_ip
+ , client_port
+ , p->add_callback
+ , p->del_callback
+ , p->rcv_callback
+ , p->snd_callback
+ , NULL
+ );
+
+ // it may have reallocated them, so refresh our pointers
+ pf = &p->fds[i];
+ pi = &p->inf[i];
+ }
+ } while (nfd >= 0 && (!p->limit || p->used < p->limit));
+ break;
+ }
- if(unlikely(retval == -1)) {
- error("POLLFD: LISTENER: poll() failed.");
- continue;
+ case SOCK_DGRAM: {
+ // a UDP socket
+ // we read data from the server socket
+
+ debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", i, fd);
+
+ // FIXME: access_list is not applied to UDP
+
+ pf->events = 0;
+ pi->rcv_callback(pi, &pf->events);
+ break;
+ }
+
+ default: {
+ error("POLLFD: LISTENER: Unknown socktype %d on slot %zu", pi->socktype, pi->slot);
+ break;
+ }
+ }
}
- else if(unlikely(!retval)) {
- debug(D_POLLFD, "POLLFD: LISTENER: poll() timeout.");
- continue;
+ }
+
+ if(unlikely(revents & POLLOUT)) {
+ // sending data
+ debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", i, fd);
+
+ pi->last_sent_t = now;
+ pi->send_count++;
+
+ pf->events = 0;
+ if (pi->snd_callback(pi, &pf->events) == -1) {
+ poll_close_fd(&p->inf[i]);
+ return;
+ }
+ pf = &p->fds[i];
+ pi = &p->inf[i];
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ // this is common - it is used for streaming
+ if(unlikely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET && !(pf->events & (POLLIN|POLLOUT)))) {
+ error("POLLFD: LISTENER: after sending, client slot %zu (fd %d) from '%s:%s' was left without expecting input or output. ", i, fd, pi->client_ip?pi->client_ip:"<undefined-ip>", pi->client_port?pi->client_port:"<undefined-port>");
+ //poll_close_fd(pi);
+ //return;
}
+#endif
+ }
- if(unlikely(netdata_exit)) break;
+ if(unlikely(revents & POLLERR)) {
+ error("POLLFD: LISTENER: processing POLLERR events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd);
+ pf->events = 0;
+ poll_close_fd(pi);
+ return;
+ }
- for(i = 0 ; i <= p.max ; i++) {
- struct pollfd *pf = &p.fds[i];
- struct pollinfo *pi = &p.inf[i];
- int fd = pf->fd;
- short int revents = pf->revents;
- pf->revents = 0;
+ if(unlikely(revents & POLLHUP)) {
+ error("POLLFD: LISTENER: processing POLLHUP events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd);
+ pf->events = 0;
+ poll_close_fd(pi);
+ return;
+ }
- if(unlikely(fd == -1)) {
- debug(D_POLLFD, "POLLFD: LISTENER: ignoring slot %zu, it does not have an fd", i);
- continue;
- }
+ if(unlikely(revents & POLLNVAL)) {
+ error("POLLFD: LISTENER: processing POLLNVAL events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd);
+ pf->events = 0;
+ poll_close_fd(pi);
+ return;
+ }
+}
- debug(D_POLLFD, "POLLFD: LISTENER: processing events for slot %zu (events = %d, revents = %d)", i, pf->events, revents);
+void poll_events(LISTEN_SOCKETS *sockets
+ , void *(*add_callback)(POLLINFO *pi, short int *events, void *data)
+ , void (*del_callback)(POLLINFO *pi)
+ , int (*rcv_callback)(POLLINFO *pi, short int *events)
+ , int (*snd_callback)(POLLINFO *pi, short int *events)
+ , void (*tmr_callback)(void *timer_data)
+ , SIMPLE_PATTERN *access_list
+ , void *data
+ , time_t tcp_request_timeout_seconds
+ , time_t tcp_idle_timeout_seconds
+ , time_t timer_milliseconds
+ , void *timer_data
+ , size_t max_tcp_sockets
+) {
+ if(!sockets || !sockets->opened) {
+ error("POLLFD: internal error: no listening sockets are opened");
+ return;
+ }
- if(revents & POLLIN || revents & POLLPRI) {
- // receiving data
+ if(timer_milliseconds <= 0) timer_milliseconds = 0;
- if(likely(pi->flags & POLLINFO_FLAG_SERVER_SOCKET)) {
- // new connection
- // debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", i, fd);
+ int retval;
- switch(pi->socktype) {
- case SOCK_STREAM: {
- // a TCP socket
- // we accept the connection
+ POLLJOB p = {
+ .slots = 0,
+ .used = 0,
+ .max = 0,
+ .limit = max_tcp_sockets,
+ .fds = NULL,
+ .inf = NULL,
+ .first_free = NULL,
- int nfd;
- do {
- char client_ip[NI_MAXHOST + 1];
- char client_port[NI_MAXSERV + 1];
+ .complete_request_timeout = tcp_request_timeout_seconds,
+ .idle_timeout = tcp_idle_timeout_seconds,
+ .checks_every = (tcp_idle_timeout_seconds / 3) + 1,
- debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", i, fd);
- nfd = accept_socket(fd, SOCK_NONBLOCK, client_ip, NI_MAXHOST + 1, client_port, NI_MAXSERV + 1, access_list);
- if (nfd < 0) {
- // accept failed
+ .access_list = access_list,
- debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", i, fd);
+ .timer_milliseconds = timer_milliseconds,
+ .timer_data = timer_data,
- if(errno != EWOULDBLOCK && errno != EAGAIN)
- error("POLLFD: LISTENER: accept() failed.");
+ .add_callback = add_callback?add_callback:poll_default_add_callback,
+ .del_callback = del_callback?del_callback:poll_default_del_callback,
+ .rcv_callback = rcv_callback?rcv_callback:poll_default_rcv_callback,
+ .snd_callback = snd_callback?snd_callback:poll_default_snd_callback,
+ .tmr_callback = tmr_callback?tmr_callback:poll_default_tmr_callback
+ };
- break;
- }
- else {
- // accept ok
- info("POLLFD: LISTENER: client '[%s]:%s' connected to '%s'", client_ip, client_port, sockets->fds_names[i]);
- poll_add_fd(&p, nfd, SOCK_STREAM, POLLIN, POLLINFO_FLAG_CLIENT_SOCKET);
+ size_t i;
+ for(i = 0; i < sockets->opened ;i++) {
- // it may have realloced them, so refresh our pointers
- pf = &p.fds[i];
- pi = &p.inf[i];
- }
- } while (nfd != -1);
- break;
- }
+ POLLINFO *pi = poll_add_fd(&p
+ , sockets->fds[i]
+ , sockets->fds_types[i]
+ , POLLINFO_FLAG_SERVER_SOCKET
+ , (sockets->fds_names[i])?sockets->fds_names[i]:"UNKNOWN"
+ , ""
+ , p.add_callback
+ , p.del_callback
+ , p.rcv_callback
+ , p.snd_callback
+ , NULL
+ );
- case SOCK_DGRAM: {
- // a UDP socket
- // we read data from the server socket
+ pi->data = data;
+ info("POLLFD: LISTENER: listening on '%s'", (sockets->fds_names[i])?sockets->fds_names[i]:"UNKNOWN");
+ }
- debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", i, fd);
+ int listen_sockets_active = 1;
- // FIXME: access_list is not applied to UDP
+ int timeout_ms = 1000; // in milliseconds
+ time_t last_check = now_boottime_sec();
- p.rcv_callback(fd, pi->socktype, pi->data, &pf->events);
- break;
- }
+ usec_t timer_usec = timer_milliseconds * USEC_PER_MS;
+ usec_t now_usec = 0, next_timer_usec = 0, last_timer_usec = 0;
+ if(unlikely(timer_usec)) {
+ now_usec = now_boottime_usec();
+ next_timer_usec = now_usec - (now_usec % timer_usec) + timer_usec;
+ }
- default: {
- error("POLLFD: LISTENER: Unknown socktype %d on slot %zu", pi->socktype, pi->slot);
- break;
- }
- }
- }
+ netdata_thread_cleanup_push(poll_events_cleanup, &p);
- if(likely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET)) {
- // read data from client TCP socket
- debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", i, fd);
+ while(!netdata_exit) {
+ if(unlikely(timer_usec)) {
+ now_usec = now_boottime_usec();
- if (p.rcv_callback(fd, pi->socktype, pi->data, &pf->events) == -1) {
- poll_close_fd(&p, pi);
- continue;
- }
- }
+ if(unlikely(timer_usec && now_usec >= next_timer_usec)) {
+ debug(D_POLLFD, "Calling timer callback after %zu usec", (size_t)(now_usec - last_timer_usec));
+ last_timer_usec = now_usec;
+ p.tmr_callback(p.timer_data);
+ now_usec = now_boottime_usec();
+ next_timer_usec = now_usec - (now_usec % timer_usec) + timer_usec;
}
- if(unlikely(revents & POLLOUT)) {
- // sending data
- debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", i, fd);
+ usec_t dt_usec = next_timer_usec - now_usec;
+ if(dt_usec > 1000 * USEC_PER_MS)
+ timeout_ms = 1000;
+ else
+ timeout_ms = (int)(dt_usec / USEC_PER_MS);
+ }
- if (p.snd_callback(fd, pi->socktype, pi->data, &pf->events) == -1) {
- poll_close_fd(&p, pi);
- continue;
+ // enable or disable the TCP listening sockets, based on the current number of sockets used and the limit set
+ if((listen_sockets_active && (p.limit && p.used >= p.limit)) || (!listen_sockets_active && (!p.limit || p.used < p.limit))) {
+ listen_sockets_active = !listen_sockets_active;
+ info("%s listening sockets (used TCP sockets %zu, max allowed for this worker %zu)", (listen_sockets_active)?"ENABLING":"DISABLING", p.used, p.limit);
+ for (i = 0; i <= p.max; i++) {
+ if(p.inf[i].flags & POLLINFO_FLAG_SERVER_SOCKET && p.inf[i].socktype == SOCK_STREAM) {
+ p.fds[i].events = (short int) ((listen_sockets_active) ? POLLIN : 0);
}
}
+ }
- if(unlikely(revents & POLLERR)) {
- error("POLLFD: LISTENER: processing POLLERR events for slot %zu (events = %d, revents = %d)", i, pf->events, revents);
- poll_close_fd(&p, pi);
- continue;
- }
+ debug(D_POLLFD, "POLLFD: LISTENER: Waiting on %zu sockets for %zu ms...", p.max + 1, (size_t)timeout_ms);
+ retval = poll(p.fds, p.max + 1, timeout_ms);
+ time_t now = now_boottime_sec();
- if(unlikely(revents & POLLHUP)) {
- error("POLLFD: LISTENER: processing POLLHUP events for slot %zu (events = %d, revents = %d)", i, pf->events, pf->revents);
- poll_close_fd(&p, pi);
- continue;
+ if(unlikely(retval == -1)) {
+ error("POLLFD: LISTENER: poll() failed while waiting on %zu sockets.", p.max + 1);
+ break;
+ }
+ else if(unlikely(!retval)) {
+ debug(D_POLLFD, "POLLFD: LISTENER: poll() timeout.");
+ }
+ else {
+ for (i = 0; i <= p.max; i++) {
+ struct pollfd *pf = &p.fds[i];
+ short int revents = pf->revents;
+ if (unlikely(revents))
+ poll_events_process(&p, &p.inf[i], pf, revents, now);
}
+ }
- if(unlikely(revents & POLLNVAL)) {
- error("POLLFD: LISTENER: processing POLLNVAP events for slot %zu (events = %d, revents = %d)", i, pf->events, revents);
- poll_close_fd(&p, pi);
- continue;
+ if(unlikely(p.checks_every > 0 && now - last_check > p.checks_every)) {
+ last_check = now;
+
+ // security checks
+ for(i = 0; i <= p.max; i++) {
+ POLLINFO *pi = &p.inf[i];
+
+ if(likely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET)) {
+ if (unlikely(pi->send_count == 0 && p.complete_request_timeout > 0 && (now - pi->connected_t) >= p.complete_request_timeout)) {
+ info("POLLFD: LISTENER: client slot %zu (fd %d) from '%s:%s' has not sent a complete request in %zu seconds - closing it. "
+ , i
+ , pi->fd
+ , pi->client_ip ? pi->client_ip : "<undefined-ip>"
+ , pi->client_port ? pi->client_port : "<undefined-port>"
+ , (size_t) p.complete_request_timeout
+ );
+ poll_close_fd(pi);
+ }
+ else if(unlikely(pi->recv_count && p.idle_timeout > 0 && now - ((pi->last_received_t > pi->last_sent_t) ? pi->last_received_t : pi->last_sent_t) >= p.idle_timeout )) {
+ info("POLLFD: LISTENER: client slot %zu (fd %d) from '%s:%s' is idle for more than %zu seconds - closing it. "
+ , i
+ , pi->fd
+ , pi->client_ip ? pi->client_ip : "<undefined-ip>"
+ , pi->client_port ? pi->client_port : "<undefined-port>"
+ , (size_t) p.idle_timeout
+ );
+ poll_close_fd(pi);
+ }
+ }
}
}
}
- pthread_cleanup_pop(1);
+ netdata_thread_cleanup_pop(1);
debug(D_POLLFD, "POLLFD: LISTENER: cleanup completed");
}