summaryrefslogtreecommitdiffstats
path: root/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/socket.c1006
1 files changed, 940 insertions, 66 deletions
diff --git a/src/socket.c b/src/socket.c
index 400c1ef4..2b382119 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -1,44 +1,267 @@
#include "common.h"
-// connect_to()
-//
-// definition format:
-//
-// [PROTOCOL:]IP[%INTERFACE][:PORT]
-//
-// PROTOCOL = tcp or udp
-// IP = IPv4 or IPv6 IP or hostname, optionally enclosed in [] (required for IPv6)
-// INTERFACE = for IPv6 only, the network interface to use
-// PORT = port number or service name
+// --------------------------------------------------------------------------------------------------------------------
+// various library calls
-int connect_to(const char *definition, int default_port, struct timeval *timeout) {
+#ifdef __gnu_linux__
+#define LARGE_SOCK_SIZE 33554431 // don't ask why - I found it at brubeck source - I guess it is just a large number
+#else
+#define LARGE_SOCK_SIZE 4096
+#endif
+
+int sock_setnonblock(int fd) {
+ int flags;
+
+ flags = fcntl(fd, F_GETFL);
+ flags |= O_NONBLOCK;
+
+ int ret = fcntl(fd, F_SETFL, flags);
+ if(ret < 0)
+ error("Failed to set O_NONBLOCK on socket %d", fd);
+
+ return ret;
+}
+
+int sock_delnonblock(int fd) {
+ int flags;
+
+ flags = fcntl(fd, F_GETFL);
+ flags &= ~O_NONBLOCK;
+
+ int ret = fcntl(fd, F_SETFL, flags);
+ if(ret < 0)
+ error("Failed to remove O_NONBLOCK on socket %d", fd);
+
+ return ret;
+}
+
+int sock_setreuse(int fd, int reuse) {
+ int ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
+
+ if(ret == -1)
+ error("Failed to set SO_REUSEADDR on socket %d", fd);
+
+ return ret;
+}
+
+int sock_setreuse_port(int fd, int reuse) {
+ int ret = -1;
+#ifdef SO_REUSEPORT
+ ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
+ if(ret == -1)
+ error("failed to set SO_REUSEPORT on socket %d", fd);
+#endif
+
+ return ret;
+}
+
+int sock_enlarge_in(int fd) {
+ int ret, bs = LARGE_SOCK_SIZE;
+
+ ret = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bs, sizeof(bs));
+
+ if(ret == -1)
+ error("Failed to set SO_RCVBUF on socket %d", fd);
+
+ return ret;
+}
+
+int sock_enlarge_out(int fd) {
+ int ret, bs = LARGE_SOCK_SIZE;
+ ret = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &bs, sizeof(bs));
+
+ if(ret == -1)
+ error("Failed to set SO_SNDBUF on socket %d", fd);
+
+ return ret;
+}
+
+
+// --------------------------------------------------------------------------------------------------------------------
+// listening sockets
+
+int create_listen_socket4(int socktype, const char *ip, int port, int listen_backlog) {
+ int sock;
+
+ debug(D_LISTENER, "LISTENER: IPv4 creating new listening socket on ip '%s' port %d, socktype %d", ip, port, socktype);
+
+ sock = socket(AF_INET, socktype, 0);
+ if(sock < 0) {
+ error("LISTENER: IPv4 socket() on ip '%s' port %d, socktype %d failed.", ip, port, socktype);
+ return -1;
+ }
+
+ sock_setreuse(sock, 1);
+ sock_setreuse_port(sock, 1);
+ sock_setnonblock(sock);
+ sock_enlarge_in(sock);
+
+ struct sockaddr_in name;
+ memset(&name, 0, sizeof(struct sockaddr_in));
+ name.sin_family = AF_INET;
+ name.sin_port = htons (port);
+
+ int ret = inet_pton(AF_INET, ip, (void *)&name.sin_addr.s_addr);
+ if(ret != 1) {
+ error("LISTENER: Failed to convert IP '%s' to a valid IPv4 address.", ip);
+ close(sock);
+ return -1;
+ }
+
+ if(bind (sock, (struct sockaddr *) &name, sizeof (name)) < 0) {
+ close(sock);
+ error("LISTENER: IPv4 bind() on ip '%s' port %d, socktype %d failed.", ip, port, socktype);
+ return -1;
+ }
+
+ if(socktype == SOCK_STREAM && listen(sock, listen_backlog) < 0) {
+ close(sock);
+ error("LISTENER: IPv4 listen() on ip '%s' port %d, socktype %d failed.", ip, port, socktype);
+ return -1;
+ }
+
+ debug(D_LISTENER, "LISTENER: Listening on IPv4 ip '%s' port %d, socktype %d", ip, port, socktype);
+ return sock;
+}
+
+int create_listen_socket6(int socktype, uint32_t scope_id, const char *ip, int port, int listen_backlog) {
+ int sock;
+ int ipv6only = 1;
+
+ debug(D_LISTENER, "LISTENER: IPv6 creating new listening socket on ip '%s' port %d, socktype %d", ip, port, socktype);
+
+ sock = socket(AF_INET6, socktype, 0);
+ if (sock < 0) {
+ error("LISTENER: IPv6 socket() on ip '%s' port %d, socktype %d, failed.", ip, port, socktype);
+ return -1;
+ }
+
+ sock_setreuse(sock, 1);
+ sock_setreuse_port(sock, 1);
+ sock_setnonblock(sock);
+ sock_enlarge_in(sock);
+
+ /* IPv6 only */
+ if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&ipv6only, sizeof(ipv6only)) != 0)
+ error("LISTENER: Cannot set IPV6_V6ONLY on ip '%s' port %d, socktype %d.", ip, port, socktype);
+
+ struct sockaddr_in6 name;
+ memset(&name, 0, sizeof(struct sockaddr_in6));
+ name.sin6_family = AF_INET6;
+ name.sin6_port = htons ((uint16_t) port);
+ name.sin6_scope_id = scope_id;
+
+ int ret = inet_pton(AF_INET6, ip, (void *)&name.sin6_addr.s6_addr);
+ if(ret != 1) {
+ error("LISTENER: Failed to convert IP '%s' to a valid IPv6 address.", ip);
+ close(sock);
+ return -1;
+ }
+
+ name.sin6_scope_id = scope_id;
+
+ if (bind (sock, (struct sockaddr *) &name, sizeof (name)) < 0) {
+ close(sock);
+ error("LISTENER: IPv6 bind() on ip '%s' port %d, socktype %d failed.", ip, port, socktype);
+ return -1;
+ }
+
+ if (socktype == SOCK_STREAM && listen(sock, listen_backlog) < 0) {
+ close(sock);
+ error("LISTENER: IPv6 listen() on ip '%s' port %d, socktype %d failed.", ip, port, socktype);
+ return -1;
+ }
+
+ debug(D_LISTENER, "LISTENER: Listening on IPv6 ip '%s' port %d, socktype %d", ip, port, socktype);
+ return sock;
+}
+
+static inline int listen_sockets_add(LISTEN_SOCKETS *sockets, int fd, int socktype, const char *protocol, const char *ip, int port) {
+ if(sockets->opened >= MAX_LISTEN_FDS) {
+ error("LISTENER: Too many listening sockets. Failed to add listening %s socket at ip '%s' port %d, protocol %s, socktype %d", protocol, ip, port, protocol, socktype);
+ close(fd);
+ return -1;
+ }
+
+ sockets->fds[sockets->opened] = fd;
+
+ char buffer[100 + 1];
+ snprintfz(buffer, 100, "%s:[%s]:%d", protocol, ip, port);
+ sockets->fds_names[sockets->opened] = strdupz(buffer);
+ sockets->fds_types[sockets->opened] = socktype;
+
+ sockets->opened++;
+ return 0;
+}
+
+int listen_sockets_check_is_member(LISTEN_SOCKETS *sockets, int fd) {
+ size_t i;
+ for(i = 0; i < sockets->opened ;i++)
+ if(sockets->fds[i] == fd) return 1;
+
+ return 0;
+}
+
+static inline void listen_sockets_init(LISTEN_SOCKETS *sockets) {
+ size_t i;
+ for(i = 0; i < MAX_LISTEN_FDS ;i++) {
+ sockets->fds[i] = -1;
+ sockets->fds_names[i] = NULL;
+ sockets->fds_types[i] = -1;
+ }
+
+ sockets->opened = 0;
+ sockets->failed = 0;
+}
+
+void listen_sockets_close(LISTEN_SOCKETS *sockets) {
+ size_t i;
+ for(i = 0; i < sockets->opened ;i++) {
+ close(sockets->fds[i]);
+ sockets->fds[i] = -1;
+
+ freez(sockets->fds_names[i]);
+ sockets->fds_names[i] = NULL;
+
+ sockets->fds_types[i] = -1;
+ }
+
+ sockets->opened = 0;
+ sockets->failed = 0;
+}
+
+static inline int bind_to_one(LISTEN_SOCKETS *sockets, const char *definition, int default_port, int listen_backlog) {
+ int added = 0;
struct addrinfo hints;
- struct addrinfo *ai_head = NULL, *ai = NULL;
+ struct addrinfo *result = NULL, *rp = NULL;
char buffer[strlen(definition) + 1];
strcpy(buffer, definition);
- char default_service[10 + 1];
- snprintfz(default_service, 10, "%d", default_port);
+ char buffer2[10 + 1];
+ snprintfz(buffer2, 10, "%d", default_port);
+
+ char *ip = buffer, *port = buffer2, *interface = "";;
- char *host = buffer, *service = default_service, *interface = "";
int protocol = IPPROTO_TCP, socktype = SOCK_STREAM;
- uint32_t scope_id = 0;
+ const char *protocol_str = "tcp";
- if(strncmp(host, "tcp:", 4) == 0) {
- host += 4;
+ if(strncmp(ip, "tcp:", 4) == 0) {
+ ip += 4;
protocol = IPPROTO_TCP;
socktype = SOCK_STREAM;
+ protocol_str = "tcp";
}
- else if(strncmp(host, "udp:", 4) == 0) {
- host += 4;
+ else if(strncmp(ip, "udp:", 4) == 0) {
+ ip += 4;
protocol = IPPROTO_UDP;
socktype = SOCK_DGRAM;
+ protocol_str = "udp";
}
- char *e = host;
+ char *e = ip;
if(*e == '[') {
- e = ++host;
+ e = ++ip;
while(*e && *e != ']') e++;
if(*e == ']') {
*e = '\0';
@@ -57,26 +280,142 @@ int connect_to(const char *definition, int default_port, struct timeval *timeout
}
if(*e == ':') {
+ port = e + 1;
*e = '\0';
- e++;
- service = e;
}
- debug(D_CONNECT_TO, "Attempting connection to host = '%s', service = '%s', interface = '%s', protocol = %d (tcp = %d, udp = %d)", host, service, interface, protocol, IPPROTO_TCP, IPPROTO_UDP);
+ uint32_t scope_id = 0;
+ if(*interface) {
+ scope_id = if_nametoindex(interface);
+ if(!scope_id)
+ error("LISTENER: Cannot find a network interface named '%s'. Continuing with limiting the network interface", interface);
+ }
- if(!*host) {
- error("Definition '%s' does not specify a host.", definition);
+ if(!*ip || *ip == '*' || !strcmp(ip, "any") || !strcmp(ip, "all"))
+ ip = NULL;
+
+ if(!*port)
+ port = buffer2;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_socktype = socktype;
+ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
+ hints.ai_protocol = protocol;
+ hints.ai_canonname = NULL;
+ hints.ai_addr = NULL;
+ hints.ai_next = NULL;
+
+ int r = getaddrinfo(ip, port, &hints, &result);
+ if (r != 0) {
+ error("LISTENER: getaddrinfo('%s', '%s'): %s\n", ip, port, gai_strerror(r));
return -1;
}
- if(*interface) {
- scope_id = if_nametoindex(interface);
- if(!scope_id)
- error("Cannot find a network interface named '%s'. Continuing with limiting the network interface", interface);
+ for (rp = result; rp != NULL; rp = rp->ai_next) {
+ int fd = -1;
+
+ char rip[INET_ADDRSTRLEN + INET6_ADDRSTRLEN] = "INVALID";
+ int rport = default_port;
+
+ switch (rp->ai_addr->sa_family) {
+ case AF_INET: {
+ struct sockaddr_in *sin = (struct sockaddr_in *) rp->ai_addr;
+ inet_ntop(AF_INET, &sin->sin_addr, rip, INET_ADDRSTRLEN);
+ rport = ntohs(sin->sin_port);
+ // info("Attempting to listen on IPv4 '%s' ('%s'), port %d ('%s'), socktype %d", rip, ip, rport, port, socktype);
+ fd = create_listen_socket4(socktype, rip, rport, listen_backlog);
+ break;
+ }
+
+ case AF_INET6: {
+ struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *) rp->ai_addr;
+ inet_ntop(AF_INET6, &sin6->sin6_addr, rip, INET6_ADDRSTRLEN);
+ rport = ntohs(sin6->sin6_port);
+ // info("Attempting to listen on IPv6 '%s' ('%s'), port %d ('%s'), socktype %d", rip, ip, rport, port, socktype);
+ fd = create_listen_socket6(socktype, scope_id, rip, rport, listen_backlog);
+ break;
+ }
+
+ default:
+ debug(D_LISTENER, "LISTENER: Unknown socket family %d", rp->ai_addr->sa_family);
+ break;
+ }
+
+ if (fd == -1) {
+ error("LISTENER: Cannot bind to ip '%s', port %d", rip, rport);
+ sockets->failed++;
+ }
+ else {
+ listen_sockets_add(sockets, fd, socktype, protocol_str, rip, rport);
+ added++;
+ }
}
- if(!*service)
- service = default_service;
+ freeaddrinfo(result);
+
+ return added;
+}
+
+int listen_sockets_setup(LISTEN_SOCKETS *sockets) {
+ listen_sockets_init(sockets);
+
+ sockets->backlog = (int) config_get_number(sockets->config_section, "listen backlog", sockets->backlog);
+
+ int old_port = sockets->default_port;
+ sockets->default_port = (int) config_get_number(sockets->config_section, "default port", sockets->default_port);
+ if(sockets->default_port < 1 || sockets->default_port > 65535) {
+ error("LISTENER: Invalid listen port %d given. Defaulting to %d.", sockets->default_port, old_port);
+ sockets->default_port = (int) config_set_number(sockets->config_section, "default port", old_port);
+ }
+ debug(D_OPTIONS, "LISTENER: Default listen port set to %d.", sockets->default_port);
+
+ char *s = config_get(sockets->config_section, "bind to", sockets->default_bind_to);
+ while(*s) {
+ char *e = s;
+
+ // skip separators, moving both s(tart) and e(nd)
+ while(isspace(*e) || *e == ',') s = ++e;
+
+ // move e(nd) to the first separator
+ while(*e && !isspace(*e) && *e != ',') e++;
+
+ // is there anything?
+ if(!*s || s == e) break;
+
+ char buf[e - s + 1];
+ strncpyz(buf, s, e - s);
+ bind_to_one(sockets, buf, sockets->default_port, sockets->backlog);
+
+ s = e;
+ }
+
+ if(sockets->failed) {
+ size_t i;
+ for(i = 0; i < sockets->opened ;i++)
+ info("LISTENER: Listen socket %s opened successfully.", sockets->fds_names[i]);
+ }
+
+ return (int)sockets->opened;
+}
+
+
+// --------------------------------------------------------------------------------------------------------------------
+// connect to another host/port
+
+// _connect_to()
+// protocol IPPROTO_TCP, IPPROTO_UDP
+// socktype SOCK_STREAM, SOCK_DGRAM
+// host the destination hostname or IP address (IPv4 or IPv6) to connect to
+// if it resolves to many IPs, all are tried (IPv4 and IPv6)
+// scope_id the if_index id of the interface to use for connecting (0 = any)
+// (used only under IPv6)
+// service the service name or port to connect to
+// timeout the timeout for establishing a connection
+
+static inline int _connect_to(int protocol, int socktype, const char *host, uint32_t scope_id, const char *service, struct timeval *timeout) {
+ struct addrinfo hints;
+ struct addrinfo *ai_head = NULL, *ai = NULL;
memset(&hints, 0, sizeof(hints));
hints.ai_family = PF_UNSPEC; /* Allow IPv4 or IPv6 */
@@ -103,52 +442,52 @@ int connect_to(const char *definition, int default_port, struct timeval *timeout
char servBfr[NI_MAXSERV + 1];
getnameinfo(ai->ai_addr,
- ai->ai_addrlen,
- hostBfr,
- sizeof(hostBfr),
- servBfr,
- sizeof(servBfr),
- NI_NUMERICHOST | NI_NUMERICSERV);
+ ai->ai_addrlen,
+ hostBfr,
+ sizeof(hostBfr),
+ servBfr,
+ sizeof(servBfr),
+ NI_NUMERICHOST | NI_NUMERICSERV);
debug(D_CONNECT_TO, "Address info: host = '%s', service = '%s', ai_flags = 0x%02X, ai_family = %d (PF_INET = %d, PF_INET6 = %d), ai_socktype = %d (SOCK_STREAM = %d, SOCK_DGRAM = %d), ai_protocol = %d (IPPROTO_TCP = %d, IPPROTO_UDP = %d), ai_addrlen = %lu (sockaddr_in = %lu, sockaddr_in6 = %lu)",
- hostBfr,
- servBfr,
- (unsigned int)ai->ai_flags,
- ai->ai_family,
- PF_INET,
- PF_INET6,
- ai->ai_socktype,
- SOCK_STREAM,
- SOCK_DGRAM,
- ai->ai_protocol,
- IPPROTO_TCP,
- IPPROTO_UDP,
- (unsigned long)ai->ai_addrlen,
- (unsigned long)sizeof(struct sockaddr_in),
- (unsigned long)sizeof(struct sockaddr_in6));
+ hostBfr,
+ servBfr,
+ (unsigned int)ai->ai_flags,
+ ai->ai_family,
+ PF_INET,
+ PF_INET6,
+ ai->ai_socktype,
+ SOCK_STREAM,
+ SOCK_DGRAM,
+ ai->ai_protocol,
+ IPPROTO_TCP,
+ IPPROTO_UDP,
+ (unsigned long)ai->ai_addrlen,
+ (unsigned long)sizeof(struct sockaddr_in),
+ (unsigned long)sizeof(struct sockaddr_in6));
switch (ai->ai_addr->sa_family) {
case PF_INET: {
struct sockaddr_in *pSadrIn = (struct sockaddr_in *)ai->ai_addr;
debug(D_CONNECT_TO, "ai_addr = sin_family: %d (AF_INET = %d, AF_INET6 = %d), sin_addr: '%s', sin_port: '%s'",
- pSadrIn->sin_family,
- AF_INET,
- AF_INET6,
- hostBfr,
- servBfr);
+ pSadrIn->sin_family,
+ AF_INET,
+ AF_INET6,
+ hostBfr,
+ servBfr);
break;
}
case PF_INET6: {
struct sockaddr_in6 *pSadrIn6 = (struct sockaddr_in6 *) ai->ai_addr;
debug(D_CONNECT_TO,"ai_addr = sin6_family: %d (AF_INET = %d, AF_INET6 = %d), sin6_addr: '%s', sin6_port: '%s', sin6_flowinfo: %u, sin6_scope_id: %u",
- pSadrIn6->sin6_family,
- AF_INET,
- AF_INET6,
- hostBfr,
- servBfr,
- pSadrIn6->sin6_flowinfo,
- pSadrIn6->sin6_scope_id);
+ pSadrIn6->sin6_family,
+ AF_INET,
+ AF_INET6,
+ hostBfr,
+ servBfr,
+ pSadrIn6->sin6_flowinfo,
+ pSadrIn6->sin6_scope_id);
break;
}
@@ -180,6 +519,85 @@ int connect_to(const char *definition, int default_port, struct timeval *timeout
return fd;
}
+// connect_to()
+//
+// definition format:
+//
+// [PROTOCOL:]IP[%INTERFACE][:PORT]
+//
+// PROTOCOL = tcp or udp
+// IP = IPv4 or IPv6 IP or hostname, optionally enclosed in [] (required for IPv6)
+// INTERFACE = for IPv6 only, the network interface to use
+// PORT = port number or service name
+
+int connect_to(const char *definition, int default_port, struct timeval *timeout) {
+ char buffer[strlen(definition) + 1];
+ strcpy(buffer, definition);
+
+ char default_service[10 + 1];
+ snprintfz(default_service, 10, "%d", default_port);
+
+ char *host = buffer, *service = default_service, *interface = "";
+ int protocol = IPPROTO_TCP, socktype = SOCK_STREAM;
+ uint32_t scope_id = 0;
+
+ if(strncmp(host, "tcp:", 4) == 0) {
+ host += 4;
+ protocol = IPPROTO_TCP;
+ socktype = SOCK_STREAM;
+ }
+ else if(strncmp(host, "udp:", 4) == 0) {
+ host += 4;
+ protocol = IPPROTO_UDP;
+ socktype = SOCK_DGRAM;
+ }
+
+ char *e = host;
+ if(*e == '[') {
+ e = ++host;
+ while(*e && *e != ']') e++;
+ if(*e == ']') {
+ *e = '\0';
+ e++;
+ }
+ }
+ else {
+ while(*e && *e != ':' && *e != '%') e++;
+ }
+
+ if(*e == '%') {
+ *e = '\0';
+ e++;
+ interface = e;
+ while(*e && *e != ':') e++;
+ }
+
+ if(*e == ':') {
+ *e = '\0';
+ e++;
+ service = e;
+ }
+
+ debug(D_CONNECT_TO, "Attempting connection to host = '%s', service = '%s', interface = '%s', protocol = %d (tcp = %d, udp = %d)", host, service, interface, protocol, IPPROTO_TCP, IPPROTO_UDP);
+
+ if(!*host) {
+ error("Definition '%s' does not specify a host.", definition);
+ return -1;
+ }
+
+ if(*interface) {
+ scope_id = if_nametoindex(interface);
+ if(!scope_id)
+ error("Cannot find a network interface named '%s'. Continuing with limiting the network interface", interface);
+ }
+
+ if(!*service)
+ service = default_service;
+
+
+ return _connect_to(protocol, socktype, host, scope_id, service, timeout);
+}
+
int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size) {
int sock = -1;
@@ -213,6 +631,10 @@ int connect_to_one_of(const char *destination, int default_port, struct timeval
return sock;
}
+
+// --------------------------------------------------------------------------------------------------------------------
+// helpers to send/receive data in one call, in blocking mode, with a timeout
+
ssize_t recv_timeout(int sockfd, void *buf, size_t len, int flags, int timeout) {
for(;;) {
struct pollfd fd = {
@@ -274,3 +696,455 @@ ssize_t send_timeout(int sockfd, void *buf, size_t len, int flags, int timeout)
return send(sockfd, buf, len, flags);
}
+
+
+// --------------------------------------------------------------------------------------------------------------------
+// accept4() replacement for systems that do not have one
+
+#ifndef HAVE_ACCEPT4
+int accept4(int sock, struct sockaddr *addr, socklen_t *addrlen, int flags) {
+ int fd = accept(sock, addr, addrlen);
+ int newflags = 0;
+
+ if (fd < 0) return fd;
+
+ if (flags & SOCK_NONBLOCK) {
+ newflags |= O_NONBLOCK;
+ flags &= ~SOCK_NONBLOCK;
+ }
+
+#ifdef SOCK_CLOEXEC
+#ifdef O_CLOEXEC
+ if (flags & SOCK_CLOEXEC) {
+ newflags |= O_CLOEXEC;
+ flags &= ~SOCK_CLOEXEC;
+ }
+#endif
+#endif
+
+ if (flags) {
+ errno = -EINVAL;
+ return -1;
+ }
+
+ if (fcntl(fd, F_SETFL, newflags) < 0) {
+ int saved_errno = errno;
+ close(fd);
+ errno = saved_errno;
+ return -1;
+ }
+
+ return fd;
+}
+#endif
+
+
+// --------------------------------------------------------------------------------------------------------------------
+// accept_socket() - accept a socket and store client IP and port
+
+int accept_socket(int fd, int flags, char *client_ip, size_t ipsize, char *client_port, size_t portsize) {
+ struct sockaddr_storage sadr;
+ socklen_t addrlen = sizeof(sadr);
+
+ int nfd = accept4(fd, (struct sockaddr *)&sadr, &addrlen, flags);
+ if (nfd >= 0) {
+ if (getnameinfo((struct sockaddr *)&sadr, addrlen, client_ip, (socklen_t)ipsize, client_port, (socklen_t)portsize, NI_NUMERICHOST | NI_NUMERICSERV) != 0) {
+ error("LISTENER: cannot getnameinfo() on received client connection.");
+ strncpyz(client_ip, "UNKNOWN", ipsize - 1);
+ strncpyz(client_port, "UNKNOWN", portsize - 1);
+ }
+
+ client_ip[ipsize - 1] = '\0';
+ client_port[portsize - 1] = '\0';
+
+ switch (((struct sockaddr *)&sadr)->sa_family) {
+ case AF_INET:
+ debug(D_LISTENER, "New IPv4 web client from %s port %s on socket %d.", client_ip, client_port, fd);
+ break;
+
+ case AF_INET6:
+ if (strncmp(client_ip, "::ffff:", 7) == 0) {
+ memmove(client_ip, &client_ip[7], strlen(&client_ip[7]) + 1);
+ debug(D_LISTENER, "New IPv4 web client from %s port %s on socket %d.", client_ip, client_port, fd);
+ } else
+ debug(D_LISTENER, "New IPv6 web client from %s port %s on socket %d.", client_ip, client_port, fd);
+ break;
+
+ default:
+ debug(D_LISTENER, "New UNKNOWN web client from %s port %s on socket %d.", client_ip, client_port, fd);
+ break;
+ }
+ }
+
+ return nfd;
+}
+
+
+// --------------------------------------------------------------------------------------------------------------------
+// poll() based listener
+// this should be the fastest possible listener for up to 100 sockets
+// above 100, an epoll() interface is needed on Linux
+
+#define POLL_FDS_INCREASE_STEP 10
+
+#define POLLINFO_FLAG_SERVER_SOCKET 0x00000001
+#define POLLINFO_FLAG_CLIENT_SOCKET 0x00000002
+
+struct pollinfo {
+ size_t slot;
+ char *client;
+ struct pollinfo *next;
+ uint32_t flags;
+ int socktype;
+
+ void *data;
+};
+
+struct poll {
+ size_t slots;
+ size_t used;
+ size_t min;
+ size_t max;
+ struct pollfd *fds;
+ struct pollinfo *inf;
+ struct pollinfo *first_free;
+
+ void *(*add_callback)(int fd, short int *events);
+ void (*del_callback)(int fd, void *data);
+ int (*rcv_callback)(int fd, int socktype, void *data, short int *events);
+ int (*snd_callback)(int fd, int socktype, void *data, short int *events);
+};
+
+static inline struct pollinfo *poll_add_fd(struct poll *p, int fd, int socktype, short int events, uint32_t flags) {
+ debug(D_POLLFD, "POLLFD: ADD: request to add fd %d, slots = %zu, used = %zu, min = %zu, max = %zu, next free = %zd", fd, p->slots, p->used, p->min, p->max, p->first_free?(ssize_t)p->first_free->slot:(ssize_t)-1);
+
+ if(unlikely(fd < 0)) return NULL;
+
+ if(unlikely(!p->first_free)) {
+ size_t new_slots = p->slots + POLL_FDS_INCREASE_STEP;
+ debug(D_POLLFD, "POLLFD: ADD: increasing size (current = %zu, new = %zu, used = %zu, min = %zu, max = %zu)", p->slots, new_slots, p->used, p->min, p->max);
+
+ p->fds = reallocz(p->fds, sizeof(struct pollfd) * new_slots);
+ p->inf = reallocz(p->inf, sizeof(struct pollinfo) * new_slots);
+
+ ssize_t i;
+ for(i = new_slots - 1; i >= (ssize_t)p->slots ; i--) {
+ debug(D_POLLFD, "POLLFD: ADD: resetting new slot %zd", i);
+ p->fds[i].fd = -1;
+ p->fds[i].events = 0;
+ p->fds[i].revents = 0;
+
+ p->inf[i].slot = (size_t)i;
+ p->inf[i].flags = 0;
+ p->inf[i].socktype = -1;
+ p->inf[i].client = NULL;
+ p->inf[i].data = NULL;
+ p->inf[i].next = p->first_free;
+ p->first_free = &p->inf[i];
+ }
+
+ p->slots = new_slots;
+ }
+
+ struct pollinfo *pi = p->first_free;
+ p->first_free = p->first_free->next;
+
+ debug(D_POLLFD, "POLLFD: ADD: selected slot %zu, next free is %zd", pi->slot, p->first_free?(ssize_t)p->first_free->slot:(ssize_t)-1);
+
+ struct pollfd *pf = &p->fds[pi->slot];
+ pf->fd = fd;
+ pf->events = events;
+ pf->revents = 0;
+
+ pi->socktype = socktype;
+ pi->flags = flags;
+ pi->next = NULL;
+
+ p->used++;
+ if(unlikely(pi->slot > p->max))
+ p->max = pi->slot;
+
+ if(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET) {
+ pi->data = p->add_callback(fd, &pf->events);
+ }
+
+ if(pi->flags & POLLINFO_FLAG_SERVER_SOCKET) {
+ p->min = pi->slot;
+ }
+
+ debug(D_POLLFD, "POLLFD: ADD: completed, slots = %zu, used = %zu, min = %zu, max = %zu, next free = %zd", p->slots, p->used, p->min, p->max, p->first_free?(ssize_t)p->first_free->slot:(ssize_t)-1);
+
+ return pi;
+}
+
+static inline void poll_close_fd(struct poll *p, struct pollinfo *pi) {
+ struct pollfd *pf = &p->fds[pi->slot];
+ debug(D_POLLFD, "POLLFD: DEL: request to clear slot %zu (fd %d), old next free was %zd", pi->slot, pf->fd, p->first_free?(ssize_t)p->first_free->slot:(ssize_t)-1);
+
+ if(unlikely(pf->fd == -1)) return;
+
+ if(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET) {
+ p->del_callback(pf->fd, pi->data);
+ }
+
+ close(pf->fd);
+ pf->fd = -1;
+ pf->events = 0;
+ pf->revents = 0;
+
+ pi->socktype = -1;
+ pi->flags = 0;
+ pi->data = NULL;
+
+ freez(pi->client);
+ pi->client = NULL;
+
+ pi->next = p->first_free;
+ p->first_free = pi;
+
+ p->used--;
+ if(p->max == pi->slot) {
+ p->max = p->min;
+ ssize_t i;
+ for(i = (ssize_t)pi->slot; i > (ssize_t)p->min ;i--) {
+ if (unlikely(p->fds[i].fd != -1)) {
+ p->max = (size_t)i;
+ break;
+ }
+ }
+ }
+
+ debug(D_POLLFD, "POLLFD: DEL: completed, slots = %zu, used = %zu, min = %zu, max = %zu, next free = %zd", p->slots, p->used, p->min, p->max, p->first_free?(ssize_t)p->first_free->slot:(ssize_t)-1);
+}
+
+static void *add_callback_default(int fd, short int *events) {
+ (void)fd;
+ (void)events;
+
+ return NULL;
+}
+static void del_callback_default(int fd, void *data) {
+ (void)fd;
+ (void)data;
+
+ if(data)
+ error("POLLFD: internal error: del_callback_default() called with data pointer - possible memory leak");
+}
+
+static int rcv_callback_default(int fd, int socktype, void *data, short int *events) {
+ (void)socktype;
+ (void)data;
+ (void)events;
+
+ char buffer[1024 + 1];
+
+ ssize_t rc;
+ do {
+ rc = recv(fd, buffer, 1024, MSG_DONTWAIT);
+ if (rc < 0) {
+ // read failed
+ if (errno != EWOULDBLOCK && errno != EAGAIN) {
+ error("POLLFD: recv() failed.");
+ return -1;
+ }
+ } else if (rc) {
+ // data received
+ info("POLLFD: internal error: discarding %zd bytes received on socket %d", rc, fd);
+ }
+ } while (rc != -1);
+
+ return 0;
+}
+
+static int snd_callback_default(int fd, int socktype, void *data, short int *events) {
+ (void)socktype;
+ (void)data;
+ (void)events;
+
+ *events &= ~POLLOUT;
+
+ info("POLLFD: internal error: nothing to send on socket %d", fd);
+ return 0;
+}
+
+void poll_events_cleanup(void *data) {
+ struct poll *p = (struct poll *)data;
+
+ size_t i;
+ for(i = 0 ; i <= p->max ; i++) {
+ struct pollinfo *pi = &p->inf[i];
+ poll_close_fd(p, pi);
+ }
+
+ freez(p->fds);
+ freez(p->inf);
+}
+
+void poll_events(LISTEN_SOCKETS *sockets
+ , void *(*add_callback)(int fd, short int *events)
+ , void (*del_callback)(int fd, void *data)
+ , int (*rcv_callback)(int fd, int socktype, void *data, short int *events)
+ , int (*snd_callback)(int fd, int socktype, void *data, short int *events)
+ , void *data
+) {
+ int retval;
+
+ struct poll p = {
+ .slots = 0,
+ .used = 0,
+ .max = 0,
+ .fds = NULL,
+ .inf = NULL,
+ .first_free = NULL,
+
+ .add_callback = add_callback?add_callback:add_callback_default,
+ .del_callback = del_callback?del_callback:del_callback_default,
+ .rcv_callback = rcv_callback?rcv_callback:rcv_callback_default,
+ .snd_callback = snd_callback?snd_callback:snd_callback_default
+ };
+
+ size_t i;
+ for(i = 0; i < sockets->opened ;i++) {
+ struct pollinfo *pi = poll_add_fd(&p, sockets->fds[i], sockets->fds_types[i], POLLIN, POLLINFO_FLAG_SERVER_SOCKET);
+ pi->data = data;
+ info("POLLFD: LISTENER: listening on '%s'", (sockets->fds_names[i])?sockets->fds_names[i]:"UNKNOWN");
+ }
+
+ int timeout = -1; // wait forever
+
+ pthread_cleanup_push(poll_events_cleanup, &p);
+
+ for(;;) {
+ if(unlikely(netdata_exit)) break;
+
+ debug(D_POLLFD, "POLLFD: LISTENER: Waiting on %zu sockets...", p.max + 1);
+ retval = poll(p.fds, p.max + 1, timeout);
+
+ if(unlikely(retval == -1)) {
+ error("POLLFD: LISTENER: poll() failed.");
+ continue;
+ }
+ else if(unlikely(!retval)) {
+ debug(D_POLLFD, "POLLFD: LISTENER: poll() timeout.");
+ continue;
+ }
+
+ if(unlikely(netdata_exit)) break;
+
+ for(i = 0 ; i <= p.max ; i++) {
+ struct pollfd *pf = &p.fds[i];
+ struct pollinfo *pi = &p.inf[i];
+ int fd = pf->fd;
+ short int revents = pf->revents;
+ pf->revents = 0;
+
+ if(unlikely(fd == -1)) {
+ debug(D_POLLFD, "POLLFD: LISTENER: ignoring slot %zu, it does not have an fd", i);
+ continue;
+ }
+
+ debug(D_POLLFD, "POLLFD: LISTENER: processing events for slot %zu (events = %d, revents = %d)", i, pf->events, revents);
+
+ if(revents & POLLIN || revents & POLLPRI) {
+ // receiving data
+
+ if(likely(pi->flags & POLLINFO_FLAG_SERVER_SOCKET)) {
+ // new connection
+ // debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", i, fd);
+
+ switch(pi->socktype) {
+ case SOCK_STREAM: {
+ // a TCP socket
+ // we accept the connection
+
+ int nfd;
+ do {
+ char client_ip[NI_MAXHOST + 1];
+ char client_port[NI_MAXSERV + 1];
+
+ debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", i, fd);
+ nfd = accept_socket(fd, SOCK_NONBLOCK, client_ip, NI_MAXHOST + 1, client_port, NI_MAXSERV + 1);
+ if (nfd < 0) {
+ // accept failed
+
+ debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", i, fd);
+
+ if(errno != EWOULDBLOCK && errno != EAGAIN)
+ error("POLLFD: LISTENER: accept() failed.");
+
+ break;
+ }
+ else {
+ // accept ok
+ info("POLLFD: LISTENER: client '[%s]:%s' connected to '%s'", client_ip, client_port, sockets->fds_names[i]);
+ poll_add_fd(&p, nfd, SOCK_STREAM, POLLIN, POLLINFO_FLAG_CLIENT_SOCKET);
+
+ // it may have realloced them, so refresh our pointers
+ pf = &p.fds[i];
+ pi = &p.inf[i];
+ }
+ } while (nfd != -1);
+ break;
+ }
+
+ case SOCK_DGRAM: {
+ // a UDP socket
+ // we read data from the server socket
+
+ debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", i, fd);
+
+ p.rcv_callback(fd, pi->socktype, pi->data, &pf->events);
+ break;
+ }
+
+ default: {
+ error("POLLFD: LISTENER: Unknown socktype %d on slot %zu", pi->socktype, pi->slot);
+ break;
+ }
+ }
+ }
+
+ if(likely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET)) {
+ // read data from client TCP socket
+ debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", i, fd);
+
+ if (p.rcv_callback(fd, pi->socktype, pi->data, &pf->events) == -1) {
+ poll_close_fd(&p, pi);
+ continue;
+ }
+ }
+ }
+
+ if(unlikely(revents & POLLOUT)) {
+ // sending data
+ debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", i, fd);
+
+ if (p.snd_callback(fd, pi->socktype, pi->data, &pf->events) == -1) {
+ poll_close_fd(&p, pi);
+ continue;
+ }
+ }
+
+ if(unlikely(revents & POLLERR)) {
+ error("POLLFD: LISTENER: processing POLLERR events for slot %zu (events = %d, revents = %d)", i, pf->events, revents);
+ poll_close_fd(&p, pi);
+ continue;
+ }
+
+ if(unlikely(revents & POLLHUP)) {
+ error("POLLFD: LISTENER: processing POLLHUP events for slot %zu (events = %d, revents = %d)", i, pf->events, pf->revents);
+ poll_close_fd(&p, pi);
+ continue;
+ }
+
+ if(unlikely(revents & POLLNVAL)) {
+ error("POLLFD: LISTENER: processing POLLNVAP events for slot %zu (events = %d, revents = %d)", i, pf->events, revents);
+ poll_close_fd(&p, pi);
+ continue;
+ }
+ }
+ }
+
+ pthread_cleanup_pop(1);
+ debug(D_POLLFD, "POLLFD: LISTENER: cleanup completed");
+}