From d4dd00f58a502c9ca4b63e36ce6bc7a9945dc63c Mon Sep 17 00:00:00 2001 From: Federico Ceratto Date: Tue, 27 Mar 2018 22:28:21 +0100 Subject: New upstream version 1.10.0+dfsg --- src/web_server.c | 1184 ++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 979 insertions(+), 205 deletions(-) (limited to 'src/web_server.c') diff --git a/src/web_server.c b/src/web_server.c index d231cbb5c..31c546411 100644 --- a/src/web_server.c +++ b/src/web_server.c @@ -1,50 +1,12 @@ #include "common.h" -static LISTEN_SOCKETS api_sockets = { - .config_section = CONFIG_SECTION_WEB, - .default_bind_to = "*", - .default_port = API_LISTEN_PORT, - .backlog = API_LISTEN_BACKLOG -}; +// this file includes 3 web servers: +// +// 1. single-threaded, based on select() +// 2. multi-threaded, based on poll() that spawns threads to handle the requests, based on select() +// 3. static-threaded, based on poll() using a fixed number of threads (configured at netdata.conf) -WEB_SERVER_MODE web_server_mode = WEB_SERVER_MODE_MULTI_THREADED; - -#ifdef NETDATA_INTERNAL_CHECKS -static void log_allocations(void) -{ -#ifdef HAVE_C_MALLINFO - static int heap = 0, used = 0, mmap = 0; - - struct mallinfo mi; - - mi = mallinfo(); - if(mi.uordblks > used) { - int clients = 0; - struct web_client *w; - for(w = web_clients; w ; w = w->next) clients++; - - info("Allocated memory: used %d KB (+%d B), mmap %d KB (+%d B), heap %d KB (+%d B). %d web clients connected.", - mi.uordblks / 1024, - mi.uordblks - used, - mi.hblkhd / 1024, - mi.hblkhd - mmap, - mi.arena / 1024, - mi.arena - heap, - clients); - - used = mi.uordblks; - heap = mi.arena; - mmap = mi.hblkhd; - } -#else /* ! HAVE_C_MALLINFO */ - ; -#endif /* ! HAVE_C_MALLINFO */ - -#ifdef has_jemalloc - malloc_stats_print(NULL, NULL, NULL); -#endif -} -#endif /* NETDATA_INTERNAL_CHECKS */ +WEB_SERVER_MODE web_server_mode = WEB_SERVER_MODE_STATIC_THREADED; // -------------------------------------------------------------------------------------- @@ -53,6 +15,8 @@ WEB_SERVER_MODE web_server_mode_id(const char *mode) { return WEB_SERVER_MODE_NONE; else if(!strcmp(mode, "single") || !strcmp(mode, "single-threaded")) return WEB_SERVER_MODE_SINGLE_THREADED; + else if(!strcmp(mode, "static") || !strcmp(mode, "static-threaded")) + return WEB_SERVER_MODE_STATIC_THREADED; else // if(!strcmp(mode, "multi") || !strcmp(mode, "multi-threaded")) return WEB_SERVER_MODE_MULTI_THREADED; } @@ -65,6 +29,9 @@ const char *web_server_mode_name(WEB_SERVER_MODE id) { case WEB_SERVER_MODE_SINGLE_THREADED: return "single-threaded"; + case WEB_SERVER_MODE_STATIC_THREADED: + return "static-threaded"; + default: case WEB_SERVER_MODE_MULTI_THREADED: return "multi-threaded"; @@ -72,6 +39,14 @@ const char *web_server_mode_name(WEB_SERVER_MODE id) { } // -------------------------------------------------------------------------------------- +// API sockets + +static LISTEN_SOCKETS api_sockets = { + .config_section = CONFIG_SECTION_WEB, + .default_bind_to = "*", + .default_port = API_LISTEN_PORT, + .backlog = API_LISTEN_BACKLOG +}; int api_listen_sockets_setup(void) { int socks = listen_sockets_setup(&api_sockets); @@ -82,89 +57,622 @@ int api_listen_sockets_setup(void) { return socks; } + // -------------------------------------------------------------------------------------- -// the main socket listener +// access lists -static inline void cleanup_web_clients(void) { - struct web_client *w; +SIMPLE_PATTERN *web_allow_connections_from = NULL; +SIMPLE_PATTERN *web_allow_streaming_from = NULL; +SIMPLE_PATTERN *web_allow_netdataconf_from = NULL; + +// WEB_CLIENT_ACL +SIMPLE_PATTERN *web_allow_dashboard_from = NULL; +SIMPLE_PATTERN *web_allow_registry_from = NULL; +SIMPLE_PATTERN *web_allow_badges_from = NULL; + +static void web_client_update_acl_matches(struct web_client *w) { + w->acl = WEB_CLIENT_ACL_NONE; + + if(!web_allow_dashboard_from || simple_pattern_matches(web_allow_dashboard_from, w->client_ip)) + w->acl |= WEB_CLIENT_ACL_DASHBOARD; + + if(!web_allow_registry_from || simple_pattern_matches(web_allow_registry_from, w->client_ip)) + w->acl |= WEB_CLIENT_ACL_REGISTRY; + + if(!web_allow_badges_from || simple_pattern_matches(web_allow_badges_from, w->client_ip)) + w->acl |= WEB_CLIENT_ACL_BADGE; +} + + +// -------------------------------------------------------------------------------------- + +static void log_connection(struct web_client *w, const char *msg) { + log_access("%llu: %d '[%s]:%s' '%s'", w->id, gettid(), w->client_ip, w->client_port, msg); +} + +// ---------------------------------------------------------------------------- +// allocate and free web_clients + +static void web_client_zero(struct web_client *w) { + // zero everything about it - but keep the buffers + + // remember the pointers to the buffers + BUFFER *b1 = w->response.data; + BUFFER *b2 = w->response.header; + BUFFER *b3 = w->response.header_output; + + // empty the buffers + buffer_flush(b1); + buffer_flush(b2); + buffer_flush(b3); + + freez(w->user_agent); + + // zero everything + memset(w, 0, sizeof(struct web_client)); + + // restore the pointers of the buffers + w->response.data = b1; + w->response.header = b2; + w->response.header_output = b3; +} + +static void web_client_free(struct web_client *w) { + buffer_free(w->response.header_output); + buffer_free(w->response.header); + buffer_free(w->response.data); + freez(w->user_agent); + freez(w); +} + +static struct web_client *web_client_alloc(void) { + struct web_client *w = callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + return w; +} + +// ---------------------------------------------------------------------------- +// web clients caching + +// When clients connect and disconnect, avoid allocating and releasing memory. +// Instead, when new clients get connected, reuse any memory previously allocated +// for serving web clients that are now disconnected. + +// The size of the cache is adaptive. It caches the structures of 2x +// the number of currently connected clients. + +// Comments per server: +// SINGLE-THREADED : 1 cache is maintained +// MULTI-THREADED : 1 cache is maintained +// STATIC-THREADED : 1 cache for each thred of the web server + +struct clients_cache { + pid_t pid; + + struct web_client *used; // the structures of the currently connected clients + size_t used_count; // the count the currently connected clients + + struct web_client *avail; // the cached structures, available for future clients + size_t avail_count; // the number of cached structures + + size_t reused; // the number of re-uses + size_t allocated; // the number of allocations +}; + +static __thread struct clients_cache web_clients_cache = { + .pid = 0, + .used = NULL, + .used_count = 0, + .avail = NULL, + .avail_count = 0, + .allocated = 0, + .reused = 0 +}; + +static inline void web_client_cache_verify(int force) { +#ifdef NETDATA_INTERNAL_CHECKS + static __thread size_t count = 0; + count++; + + if(unlikely(force || count > 1000)) { + count = 0; + + struct web_client *w; + size_t used = 0, avail = 0; + for(w = web_clients_cache.used; w ; w = w->next) used++; + for(w = web_clients_cache.avail; w ; w = w->next) avail++; + + info("web_client_cache has %zu (%zu) used and %zu (%zu) available clients, allocated %zu, reused %zu (hit %zu%%)." + , used, web_clients_cache.used_count + , avail, web_clients_cache.avail_count + , web_clients_cache.allocated + , web_clients_cache.reused + , (web_clients_cache.allocated + web_clients_cache.reused)?(web_clients_cache.reused * 100 / (web_clients_cache.allocated + web_clients_cache.reused)):0 + ); + } +#else + if(unlikely(force)) { + info("web_client_cache has %zu used and %zu available clients, allocated %zu, reused %zu (hit %zu%%)." + , web_clients_cache.used_count + , web_clients_cache.avail_count + , web_clients_cache.allocated + , web_clients_cache.reused + , (web_clients_cache.allocated + web_clients_cache.reused)?(web_clients_cache.reused * 100 / (web_clients_cache.allocated + web_clients_cache.reused)):0 + ); + } +#endif +} + +// destroy the cache and free all the memory it uses +static void web_client_cache_destroy(void) { +#ifdef NETDATA_INTERNAL_CHECKS + if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid())) + error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid()); + + web_client_cache_verify(1); +#endif + + netdata_thread_disable_cancelability(); + + struct web_client *w, *t; + + w = web_clients_cache.used; + while(w) { + t = w; + w = w->next; + web_client_free(t); + } + web_clients_cache.used = NULL; + web_clients_cache.used_count = 0; + + w = web_clients_cache.avail; + while(w) { + t = w; + w = w->next; + web_client_free(t); + } + web_clients_cache.avail = NULL; + web_clients_cache.avail_count = 0; + + netdata_thread_enable_cancelability(); +} + +static struct web_client *web_client_get_from_cache_or_allocate() { - for (w = web_clients; w;) { - if (web_client_check_obsolete(w)) { - debug(D_WEB_CLIENT, "%llu: Removing client.", w->id); - // pthread_cancel(w->thread); - // pthread_join(w->thread, NULL); - w = web_client_free(w); #ifdef NETDATA_INTERNAL_CHECKS - log_allocations(); + if(unlikely(web_clients_cache.pid == 0)) + web_clients_cache.pid = gettid(); + + if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid())) + error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid()); #endif + + netdata_thread_disable_cancelability(); + + struct web_client *w = web_clients_cache.avail; + + if(w) { + // get it from avail + if (w == web_clients_cache.avail) web_clients_cache.avail = w->next; + if(w->prev) w->prev->next = w->next; + if(w->next) w->next->prev = w->prev; + web_clients_cache.avail_count--; + web_client_zero(w); + web_clients_cache.reused++; + } + else { + // allocate it + w = web_client_alloc(); + web_clients_cache.allocated++; + } + + // link it to used web clients + if (web_clients_cache.used) web_clients_cache.used->prev = w; + w->next = web_clients_cache.used; + w->prev = NULL; + web_clients_cache.used = w; + web_clients_cache.used_count++; + + // initialize it + w->id = web_client_connected(); + w->mode = WEB_CLIENT_MODE_NORMAL; + + netdata_thread_enable_cancelability(); + + return w; +} + +static void web_client_release(struct web_client *w) { +#ifdef NETDATA_INTERNAL_CHECKS + if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid())) + error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid()); + + if(unlikely(w->running)) + error("%llu: releasing web client from %s port %s, but it still running.", w->id, w->client_ip, w->client_port); +#endif + + debug(D_WEB_CLIENT_ACCESS, "%llu: Closing web client from %s port %s.", w->id, w->client_ip, w->client_port); + + log_connection(w, "DISCONNECTED"); + web_client_request_done(w); + web_client_disconnected(); + + netdata_thread_disable_cancelability(); + + if(web_server_mode != WEB_SERVER_MODE_STATIC_THREADED) { + if (w->ifd != -1) close(w->ifd); + if (w->ofd != -1 && w->ofd != w->ifd) close(w->ofd); + w->ifd = w->ofd = -1; + } + + // unlink it from the used + if (w == web_clients_cache.used) web_clients_cache.used = w->next; + if(w->prev) w->prev->next = w->next; + if(w->next) w->next->prev = w->prev; + web_clients_cache.used_count--; + + if(web_clients_cache.avail_count >= 2 * web_clients_cache.used_count) { + // we have too many of them - free it + web_client_free(w); + } + else { + // link it to the avail + if (web_clients_cache.avail) web_clients_cache.avail->prev = w; + w->next = web_clients_cache.avail; + w->prev = NULL; + web_clients_cache.avail = w; + web_clients_cache.avail_count++; + } + + netdata_thread_enable_cancelability(); +} + + +// ---------------------------------------------------------------------------- +// high level web clients connection management + +static void web_client_initialize_connection(struct web_client *w) { + int flag = 1; + if(setsockopt(w->ifd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0) + error("%llu: failed to enable TCP_NODELAY on socket fd %d.", w->id, w->ifd); + + flag = 1; + if(setsockopt(w->ifd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, sizeof(int)) != 0) + error("%llu: failed to enable SO_KEEPALIVE on socket fd %d.", w->id, w->ifd); + + web_client_update_acl_matches(w); + + w->origin[0] = '*'; w->origin[1] = '\0'; + w->cookie1[0] = '\0'; w->cookie2[0] = '\0'; + freez(w->user_agent); w->user_agent = NULL; + + web_client_enable_wait_receive(w); + + log_connection(w, "CONNECTED"); + + web_client_cache_verify(0); +} + +static struct web_client *web_client_create_on_fd(int fd, const char *client_ip, const char *client_port) { + struct web_client *w; + + w = web_client_get_from_cache_or_allocate(); + w->ifd = w->ofd = fd; + + strncpyz(w->client_ip, client_ip, sizeof(w->client_ip) - 1); + strncpyz(w->client_port, client_port, sizeof(w->client_port) - 1); + + if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-"); + if(unlikely(!*w->client_port)) strcpy(w->client_port, "-"); + + web_client_initialize_connection(w); + return(w); +} + +static struct web_client *web_client_create_on_listenfd(int listener) { + struct web_client *w; + + w = web_client_get_from_cache_or_allocate(); + w->ifd = w->ofd = accept_socket(listener, SOCK_NONBLOCK, w->client_ip, sizeof(w->client_ip), w->client_port, sizeof(w->client_port), web_allow_connections_from); + + if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-"); + if(unlikely(!*w->client_port)) strcpy(w->client_port, "-"); + + if (w->ifd == -1) { + if(errno == EPERM) + log_connection(w, "ACCESS DENIED"); + else { + log_connection(w, "CONNECTION FAILED"); + error("%llu: Failed to accept new incoming connection.", w->id); } - else w = w->next; + + web_client_release(w); + return NULL; } + + web_client_initialize_connection(w); + return(w); +} + + +// -------------------------------------------------------------------------------------- +// the thread of a single client - for the MULTI-THREADED web server + +// 1. waits for input and output, using async I/O +// 2. it processes HTTP requests +// 3. it generates HTTP responses +// 4. it copies data from input to output if mode is FILECOPY + +int web_client_timeout = DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS; +int web_client_first_request_timeout = DEFAULT_TIMEOUT_TO_RECEIVE_FIRST_WEB_REQUEST; + +static void multi_threaded_web_client_worker_main_cleanup(void *ptr) { + struct web_client *w = ptr; + WEB_CLIENT_IS_DEAD(w); + w->running = 0; +} + +static void *multi_threaded_web_client_worker_main(void *ptr) { + netdata_thread_cleanup_push(multi_threaded_web_client_worker_main_cleanup, ptr); + + struct web_client *w = ptr; + w->running = 1; + + struct pollfd fds[2], *ifd, *ofd; + int retval, timeout_ms; + nfds_t fdmax = 0; + + while(!netdata_exit) { + if(unlikely(web_client_check_dead(w))) { + debug(D_WEB_CLIENT, "%llu: client is dead.", w->id); + break; + } + else if(unlikely(!web_client_has_wait_receive(w) && !web_client_has_wait_send(w))) { + debug(D_WEB_CLIENT, "%llu: client is not set for neither receiving nor sending data.", w->id); + break; + } + + if(unlikely(w->ifd < 0 || w->ofd < 0)) { + error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd", w->id, w->ifd, w->ofd); + break; + } + + if(w->ifd == w->ofd) { + fds[0].fd = w->ifd; + fds[0].events = 0; + fds[0].revents = 0; + + if(web_client_has_wait_receive(w)) fds[0].events |= POLLIN; + if(web_client_has_wait_send(w)) fds[0].events |= POLLOUT; + + fds[1].fd = -1; + fds[1].events = 0; + fds[1].revents = 0; + + ifd = ofd = &fds[0]; + + fdmax = 1; + } + else { + fds[0].fd = w->ifd; + fds[0].events = 0; + fds[0].revents = 0; + if(web_client_has_wait_receive(w)) fds[0].events |= POLLIN; + ifd = &fds[0]; + + fds[1].fd = w->ofd; + fds[1].events = 0; + fds[1].revents = 0; + if(web_client_has_wait_send(w)) fds[1].events |= POLLOUT; + ofd = &fds[1]; + + fdmax = 2; + } + + debug(D_WEB_CLIENT, "%llu: Waiting socket async I/O for %s %s", w->id, web_client_has_wait_receive(w)?"INPUT":"", web_client_has_wait_send(w)?"OUTPUT":""); + errno = 0; + timeout_ms = web_client_timeout * 1000; + retval = poll(fds, fdmax, timeout_ms); + + if(unlikely(netdata_exit)) break; + + if(unlikely(retval == -1)) { + if(errno == EAGAIN || errno == EINTR) { + debug(D_WEB_CLIENT, "%llu: EAGAIN received.", w->id); + continue; + } + + debug(D_WEB_CLIENT, "%llu: LISTENER: poll() failed (input fd = %d, output fd = %d). Closing client.", w->id, w->ifd, w->ofd); + break; + } + else if(unlikely(!retval)) { + debug(D_WEB_CLIENT, "%llu: Timeout while waiting socket async I/O for %s %s", w->id, web_client_has_wait_receive(w)?"INPUT":"", web_client_has_wait_send(w)?"OUTPUT":""); + break; + } + + if(unlikely(netdata_exit)) break; + + int used = 0; + if(web_client_has_wait_send(w) && ofd->revents & POLLOUT) { + used++; + if(web_client_send(w) < 0) { + debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id); + break; + } + } + + if(unlikely(netdata_exit)) break; + + if(web_client_has_wait_receive(w) && (ifd->revents & POLLIN || ifd->revents & POLLPRI)) { + used++; + if(web_client_receive(w) < 0) { + debug(D_WEB_CLIENT, "%llu: Cannot receive data from client. Closing client.", w->id); + break; + } + + if(w->mode == WEB_CLIENT_MODE_NORMAL) { + debug(D_WEB_CLIENT, "%llu: Attempting to process received data.", w->id); + web_client_process_request(w); + + // if the sockets are closed, may have transferred this client + // to plugins.d + if(unlikely(w->mode == WEB_CLIENT_MODE_STREAM)) + break; + } + } + + if(unlikely(!used)) { + debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on socket.", w->id); + break; + } + } + + if(w->mode != WEB_CLIENT_MODE_STREAM) + log_connection(w, "DISCONNECTED"); + + web_client_request_done(w); + + debug(D_WEB_CLIENT, "%llu: done...", w->id); + + // close the sockets/files now + // to free file descriptors + if(w->ifd == w->ofd) { + if(w->ifd != -1) close(w->ifd); + } + else { + if(w->ifd != -1) close(w->ifd); + if(w->ofd != -1) close(w->ofd); + } + w->ifd = -1; + w->ofd = -1; + + netdata_thread_cleanup_pop(1); + return NULL; } +// -------------------------------------------------------------------------------------- +// the main socket listener - MULTI-THREADED + // 1. it accepts new incoming requests on our port // 2. creates a new web_client for each connection received -// 3. spawns a new pthread to serve the client (this is optimal for keep-alive clients) -// 4. cleans up old web_clients that their pthreads have been exited +// 3. spawns a new netdata_thread to serve the client (this is optimal for keep-alive clients) +// 4. cleans up old web_clients that their netdata_threads have been exited + +static void web_client_multi_threaded_web_server_release_clients(void) { + struct web_client *w; + for(w = web_clients_cache.used; w ; ) { + if(unlikely(!w->running && web_client_check_dead(w))) { + struct web_client *t = w->next; + web_client_release(w); + w = t; + } + else + w = w->next; + } +} + +static void web_client_multi_threaded_web_server_stop_all_threads(void) { + struct web_client *w; + + int found = 1, max = 2 * USEC_PER_SEC, step = 50000; + for(w = web_clients_cache.used; w ; w = w->next) { + if(w->running) { + found++; + info("stopping web client %s, id %llu", w->client_ip, w->id); + netdata_thread_cancel(w->thread); + } + } + + while(found && max > 0) { + max -= step; + info("Waiting %d web threads to finish...", found); + sleep_usec(step); + found = 0; + for(w = web_clients_cache.used; w ; w = w->next) + if(w->running) found++; + } + + if(found) + error("%d web threads are taking too long to finish. Giving up.", found); +} + +static struct pollfd *socket_listen_main_multi_threaded_fds = NULL; + +static void socket_listen_main_multi_threaded_cleanup(void *data) { + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + info("cleaning up..."); -#define CLEANUP_EVERY_EVENTS 100 + info("releasing allocated memory..."); + freez(socket_listen_main_multi_threaded_fds); + info("closing all sockets..."); + listen_sockets_close(&api_sockets); + + info("stopping all running web server threads..."); + web_client_multi_threaded_web_server_stop_all_threads(); + + info("freeing web clients cache..."); + web_client_cache_destroy(); + + info("cleanup completed."); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +#define CLEANUP_EVERY_EVENTS 60 void *socket_listen_main_multi_threaded(void *ptr) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + netdata_thread_cleanup_push(socket_listen_main_multi_threaded_cleanup, ptr); web_server_mode = WEB_SERVER_MODE_MULTI_THREADED; - info("Multi-threaded WEB SERVER thread created with task id %d", gettid()); + web_server_is_multithreaded = 1; struct web_client *w; int retval, counter = 0; - if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0) - error("Cannot set pthread cancel type to DEFERRED."); - - if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) - error("Cannot set pthread cancel state to ENABLE."); - if(!api_sockets.opened) fatal("LISTENER: No sockets to listen to."); - struct pollfd *fds = callocz(sizeof(struct pollfd), api_sockets.opened); + socket_listen_main_multi_threaded_fds = callocz(sizeof(struct pollfd), api_sockets.opened); size_t i; for(i = 0; i < api_sockets.opened ;i++) { - fds[i].fd = api_sockets.fds[i]; - fds[i].events = POLLIN; - fds[i].revents = 0; + socket_listen_main_multi_threaded_fds[i].fd = api_sockets.fds[i]; + socket_listen_main_multi_threaded_fds[i].events = POLLIN; + socket_listen_main_multi_threaded_fds[i].revents = 0; info("Listening on '%s'", (api_sockets.fds_names[i])?api_sockets.fds_names[i]:"UNKNOWN"); } - int timeout = 10 * 1000; + int timeout_ms = 1 * 1000; + + while(!netdata_exit) { - for(;;) { // debug(D_WEB_CLIENT, "LISTENER: Waiting..."); - retval = poll(fds, api_sockets.opened, timeout); + retval = poll(socket_listen_main_multi_threaded_fds, api_sockets.opened, timeout_ms); if(unlikely(retval == -1)) { error("LISTENER: poll() failed."); continue; } else if(unlikely(!retval)) { - debug(D_WEB_CLIENT, "LISTENER: select() timeout."); - counter = 0; - cleanup_web_clients(); + debug(D_WEB_CLIENT, "LISTENER: poll() timeout."); + counter++; continue; } for(i = 0 ; i < api_sockets.opened ; i++) { - short int revents = fds[i].revents; + short int revents = socket_listen_main_multi_threaded_fds[i].revents; // check for new incoming connections if(revents & POLLIN || revents & POLLPRI) { - fds[i].revents = 0; + socket_listen_main_multi_threaded_fds[i].revents = 0; - w = web_client_create(fds[i].fd); + w = web_client_create_on_listenfd(socket_listen_main_multi_threaded_fds[i].fd); if(unlikely(!w)) { - // no need for error log - web_client_create already logged the error + // no need for error log - web_client_create_on_listenfd already logged the error continue; } @@ -173,40 +681,38 @@ void *socket_listen_main_multi_threaded(void *ptr) { else web_client_set_tcp(w); - if(pthread_create(&w->thread, NULL, web_client_main, w) != 0) { - error("%llu: failed to create new thread for web client.", w->id); - WEB_CLIENT_IS_OBSOLETE(w); - } - else if(pthread_detach(w->thread) != 0) { - error("%llu: Cannot request detach of newly created web client thread.", w->id); - WEB_CLIENT_IS_OBSOLETE(w); + char tag[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "WEB_CLIENT[%llu,[%s]:%s]", w->id, w->client_ip, w->client_port); + + w->running = 1; + if(netdata_thread_create(&w->thread, tag, NETDATA_THREAD_OPTION_DONT_LOG, multi_threaded_web_client_worker_main, w) != 0) { + w->running = 0; + web_client_release(w); } } } - // cleanup unused clients counter++; - if(counter >= CLEANUP_EVERY_EVENTS) { + if(counter > CLEANUP_EVERY_EVENTS) { counter = 0; - cleanup_web_clients(); + web_client_multi_threaded_web_server_release_clients(); } } - debug(D_WEB_CLIENT, "LISTENER: exit!"); - listen_sockets_close(&api_sockets); - - freez(fds); - - static_thread->enabled = 0; - pthread_exit(NULL); + netdata_thread_cleanup_pop(1); return NULL; } + +// -------------------------------------------------------------------------------------- +// the main socket listener - SINGLE-THREADED + struct web_client *single_threaded_clients[FD_SETSIZE]; static inline int single_threaded_link_client(struct web_client *w, fd_set *ifds, fd_set *ofds, fd_set *efds, int *max) { - if(unlikely(web_client_check_obsolete(w) || web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) + if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) { return 1; + } if(unlikely(w->ifd < 0 || w->ifd >= (int)FD_SETSIZE || w->ofd < 0 || w->ofd >= (int)FD_SETSIZE)) { error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd < FD_SETSIZE (%d)", w->id, w->ifd, w->ofd, (int)FD_SETSIZE); @@ -240,27 +746,33 @@ static inline int single_threaded_unlink_client(struct web_client *w, fd_set *if single_threaded_clients[w->ifd] = NULL; single_threaded_clients[w->ofd] = NULL; - if(unlikely(web_client_check_obsolete(w) || web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) + if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) { return 1; + } return 0; } -void *socket_listen_main_single_threaded(void *ptr) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; +static void socket_listen_main_single_threaded_cleanup(void *data) { + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - web_server_mode = WEB_SERVER_MODE_SINGLE_THREADED; + info("closing all sockets..."); + listen_sockets_close(&api_sockets); - info("Single-threaded WEB SERVER thread created with task id %d", gettid()); + info("freeing web clients cache..."); + web_client_cache_destroy(); - struct web_client *w; - int retval; + info("cleanup completed."); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} - if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0) - error("Cannot set pthread cancel type to DEFERRED."); +void *socket_listen_main_single_threaded(void *ptr) { + netdata_thread_cleanup_push(socket_listen_main_single_threaded_cleanup, ptr); + web_server_mode = WEB_SERVER_MODE_SINGLE_THREADED; + web_server_is_multithreaded = 0; - if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) - error("Cannot set pthread cancel state to ENABLE."); + struct web_client *w; if(!api_sockets.opened) fatal("LISTENER: no listen sockets available."); @@ -287,14 +799,14 @@ void *socket_listen_main_single_threaded(void *ptr) { fdmax = api_sockets.fds[i]; } - for(;;) { + while(!netdata_exit) { debug(D_WEB_CLIENT_ACCESS, "LISTENER: single threaded web server waiting (fdmax = %d)...", fdmax); struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; rifds = ifds; rofds = ofds; refds = efds; - retval = select(fdmax+1, &rifds, &rofds, &refds, &tv); + int retval = select(fdmax+1, &rifds, &rofds, &refds, &tv); if(unlikely(retval == -1)) { error("LISTENER: select() failed."); @@ -306,7 +818,9 @@ void *socket_listen_main_single_threaded(void *ptr) { for(i = 0; i < api_sockets.opened ; i++) { if (FD_ISSET(api_sockets.fds[i], &rifds)) { debug(D_WEB_CLIENT_ACCESS, "LISTENER: new connection."); - w = web_client_create(api_sockets.fds[i]); + w = web_client_create_on_listenfd(api_sockets.fds[i]); + if(unlikely(!w)) + continue; if(api_sockets.fds_families[i] == AF_UNIX) web_client_set_unix(w); @@ -314,7 +828,7 @@ void *socket_listen_main_single_threaded(void *ptr) { web_client_set_tcp(w); if (single_threaded_link_client(w, &ifds, &ofds, &ifds, &fdmax) != 0) { - web_client_free(w); + web_client_release(w); } } } @@ -324,22 +838,27 @@ void *socket_listen_main_single_threaded(void *ptr) { continue; w = single_threaded_clients[i]; - if(unlikely(!w)) + if(unlikely(!w)) { + // error("no client on slot %zu", i); continue; + } if(unlikely(single_threaded_unlink_client(w, &ifds, &ofds, &efds) != 0)) { - web_client_free(w); + // error("failed to unlink client %zu", i); + web_client_release(w); continue; } if (unlikely(FD_ISSET(w->ifd, &refds) || FD_ISSET(w->ofd, &refds))) { - web_client_free(w); + // error("no input on client %zu", i); + web_client_release(w); continue; } if (unlikely(web_client_has_wait_receive(w) && FD_ISSET(w->ifd, &rifds))) { if (unlikely(web_client_receive(w) < 0)) { - web_client_free(w); + // error("cannot read from client %zu", i); + web_client_release(w); continue; } @@ -351,122 +870,243 @@ void *socket_listen_main_single_threaded(void *ptr) { if (unlikely(web_client_has_wait_send(w) && FD_ISSET(w->ofd, &rofds))) { if (unlikely(web_client_send(w) < 0)) { + // error("cannot send data to client %zu", i); debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id); - web_client_free(w); + web_client_release(w); continue; } } if(unlikely(single_threaded_link_client(w, &ifds, &ofds, &efds, &fdmax) != 0)) { - web_client_free(w); + // error("failed to link client %zu", i); + web_client_release(w); } } } else { debug(D_WEB_CLIENT_ACCESS, "LISTENER: single threaded web server timeout."); -#ifdef NETDATA_INTERNAL_CHECKS - log_allocations(); -#endif } } - debug(D_WEB_CLIENT, "LISTENER: exit!"); - listen_sockets_close(&api_sockets); - - static_thread->enabled = 0; - pthread_exit(NULL); + netdata_thread_cleanup_pop(1); return NULL; } -#if 0 -// new TCP client connected -static void *web_server_add_callback(int fd, int socktype, short int *events) { - (void)fd; - (void)socktype; +// -------------------------------------------------------------------------------------- +// the main socket listener - STATIC-THREADED + +struct web_server_static_threaded_worker { + netdata_thread_t thread; + + int id; + int running; + + size_t max_sockets; + + volatile size_t connected; + volatile size_t disconnected; + volatile size_t receptions; + volatile size_t sends; + volatile size_t max_concurrent; + + volatile size_t files_read; + volatile size_t file_reads; +}; + +static long long static_threaded_workers_count = 1; +static struct web_server_static_threaded_worker *static_workers_private_data = NULL; +static __thread struct web_server_static_threaded_worker *worker_private = NULL; + +// ---------------------------------------------------------------------------- + +static inline int web_server_check_client_status(struct web_client *w) { + if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) + return -1; + + return 0; +} + +// ---------------------------------------------------------------------------- +// web server files + +static void *web_server_file_add_callback(POLLINFO *pi, short int *events, void *data) { + struct web_client *w = (struct web_client *)data; + + worker_private->files_read++; + debug(D_WEB_CLIENT, "%llu: ADDED FILE READ ON FD %d", w->id, pi->fd); *events = POLLIN; + pi->data = w; + return w; +} + +static void web_werver_file_del_callback(POLLINFO *pi) { + struct web_client *w = (struct web_client *)pi->data; + debug(D_WEB_CLIENT, "%llu: RELEASE FILE READ ON FD %d", w->id, pi->fd); + + w->pollinfo_filecopy_slot = 0; + + if(unlikely(!w->pollinfo_slot)) { + debug(D_WEB_CLIENT, "%llu: CROSS WEB CLIENT CLEANUP (iFD %d, oFD %d)", w->id, pi->fd, w->ofd); + web_client_release(w); + } +} + +static int web_server_file_read_callback(POLLINFO *pi, short int *events) { + struct web_client *w = (struct web_client *)pi->data; + + // if there is no POLLINFO linked to this, it means the client disconnected + // stop the file reading too + if(unlikely(!w->pollinfo_slot)) { + debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON CLOSED WEB CLIENT", w->id, pi->fd); + return -1; + } + + if(unlikely(w->mode != WEB_CLIENT_MODE_FILECOPY || w->ifd == w->ofd)) { + debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON NON-FILECOPY WEB CLIENT", w->id, pi->fd); + return -1; + } + + debug(D_WEB_CLIENT, "%llu: READING FILE ON FD %d", w->id, pi->fd); - debug(D_WEB_CLIENT_ACCESS, "LISTENER on %d: new connection.", fd); - struct web_client *w = web_client_create(fd); + worker_private->file_reads++; + ssize_t ret = unlikely(web_client_read_file(w)); - if(unlikely(socktype == AF_UNIX)) + if(likely(web_client_has_wait_send(w))) { + POLLJOB *p = pi->p; // our POLLJOB + POLLINFO *wpi = pollinfo_from_slot(p, w->pollinfo_slot); // POLLINFO of the client socket + + debug(D_WEB_CLIENT, "%llu: SIGNALING W TO SEND (iFD %d, oFD %d)", w->id, pi->fd, wpi->fd); + p->fds[wpi->slot].events |= POLLOUT; + } + + if(unlikely(ret <= 0 || w->ifd == w->ofd)) { + debug(D_WEB_CLIENT, "%llu: DONE READING FILE ON FD %d", w->id, pi->fd); + return -1; + } + + *events = POLLIN; + return 0; +} + +static int web_server_file_write_callback(POLLINFO *pi, short int *events) { + (void)pi; + (void)events; + + error("Writing to web files is not supported!"); + + return -1; +} + +// ---------------------------------------------------------------------------- +// web server clients + +static void *web_server_add_callback(POLLINFO *pi, short int *events, void *data) { + (void)data; + + worker_private->connected++; + + size_t concurrent = worker_private->connected - worker_private->disconnected; + if(unlikely(concurrent > worker_private->max_concurrent)) + worker_private->max_concurrent = concurrent; + + *events = POLLIN; + + debug(D_WEB_CLIENT_ACCESS, "LISTENER on %d: new connection.", pi->fd); + struct web_client *w = web_client_create_on_fd(pi->fd, pi->client_ip, pi->client_port); + w->pollinfo_slot = pi->slot; + + if(unlikely(pi->socktype == AF_UNIX)) web_client_set_unix(w); else web_client_set_tcp(w); - return (void *)w; + debug(D_WEB_CLIENT, "%llu: ADDED CLIENT FD %d", w->id, pi->fd); + return w; } // TCP client disconnected -static void web_server_del_callback(int fd, int socktype, void *data) { - (void)fd; - (void)socktype; +static void web_server_del_callback(POLLINFO *pi) { + worker_private->disconnected++; - struct web_client *w = (struct web_client *)data; + struct web_client *w = (struct web_client *)pi->data; - if(w) { - if(w->ofd == -1 || fd == w->ofd) { - // we free the client, only if the closing fd - // is the client socket - web_client_free(w); - } + w->pollinfo_slot = 0; + if(unlikely(w->pollinfo_filecopy_slot)) { + POLLINFO *fpi = pollinfo_from_slot(pi->p, w->pollinfo_filecopy_slot); // POLLINFO of the client socket + debug(D_WEB_CLIENT, "%llu: THE CLIENT WILL BE FRED BY READING FILE JOB ON FD %d", w->id, fpi->fd); } + else { + if(web_client_flag_check(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET)) + pi->flags |= POLLINFO_FLAG_DONT_CLOSE; - return; + debug(D_WEB_CLIENT, "%llu: CLOSING CLIENT FD %d", w->id, pi->fd); + web_client_release(w); + } } -// Receive data -static int web_server_rcv_callback(int fd, int socktype, void *data, short int *events) { - (void)fd; - (void)socktype; - - *events = 0; - - struct web_client *w = (struct web_client *)data; +static int web_server_rcv_callback(POLLINFO *pi, short int *events) { + worker_private->receptions++; - if(unlikely(!web_client_has_wait_receive(w))) - return -1; + struct web_client *w = (struct web_client *)pi->data; + int fd = pi->fd; if(unlikely(web_client_receive(w) < 0)) return -1; + debug(D_WEB_CLIENT, "%llu: processing received data on fd %d.", w->id, fd); + web_client_process_request(w); + if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY)) { - if(unlikely(w->ifd != -1 && w->ifd != fd)) { - // FIXME: we switched input fd - // add a new socket to poll_events, with the same - } - else if(unlikely(w->ifd == -1)) { - // FIXME: we closed input fd - // instruct poll_events() to close fd - return -1; + if(w->pollinfo_filecopy_slot == 0) { + debug(D_WEB_CLIENT, "%llu: FILECOPY DETECTED ON FD %d", w->id, pi->fd); + + if (unlikely(w->ifd != -1 && w->ifd != w->ofd && w->ifd != fd)) { + // add a new socket to poll_events, with the same + debug(D_WEB_CLIENT, "%llu: CREATING FILECOPY SLOT ON FD %d", w->id, pi->fd); + + POLLINFO *fpi = poll_add_fd( + pi->p + , w->ifd + , 0 + , POLLINFO_FLAG_CLIENT_SOCKET + , "FILENAME" + , "" + , web_server_file_add_callback + , web_werver_file_del_callback + , web_server_file_read_callback + , web_server_file_write_callback + , (void *) w + ); + + if(fpi) + w->pollinfo_filecopy_slot = fpi->slot; + else { + error("Failed to add filecopy fd. Closing client."); + return -1; + } + } } } else { - debug(D_WEB_CLIENT, "%llu: Processing received data.", w->id); - web_client_process_request(w); + if(unlikely(w->ifd == fd && web_client_has_wait_receive(w))) + *events |= POLLIN; } - if(unlikely(w->ifd == fd && web_client_has_wait_receive(w))) - *events |= POLLIN; - if(unlikely(w->ofd == fd && web_client_has_wait_send(w))) *events |= POLLOUT; - if(unlikely(*events == 0)) - return -1; - - return 0; + return web_server_check_client_status(w); } -static int web_server_snd_callback(int fd, int socktype, void *data, short int *events) { - (void)fd; - (void)socktype; +static int web_server_snd_callback(POLLINFO *pi, short int *events) { + worker_private->sends++; - struct web_client *w = (struct web_client *)data; + struct web_client *w = (struct web_client *)pi->data; + int fd = pi->fd; - if(unlikely(!web_client_has_wait_send(w))) - return -1; + debug(D_WEB_CLIENT, "%llu: sending data on fd %d.", w->id, fd); if(unlikely(web_client_send(w) < 0)) return -1; @@ -477,42 +1117,176 @@ static int web_server_snd_callback(int fd, int socktype, void *data, short int * if(unlikely(w->ofd == fd && web_client_has_wait_send(w))) *events |= POLLOUT; - if(unlikely(*events == 0)) - return -1; - - return 0; + return web_server_check_client_status(w); } -void *socket_listen_main_single_threaded(void *ptr) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; +static void web_server_tmr_callback(void *timer_data) { + worker_private = (struct web_server_static_threaded_worker *)timer_data; - web_server_mode = WEB_SERVER_MODE_SINGLE_THREADED; + static __thread RRDSET *st = NULL; + static __thread RRDDIM *rd_user = NULL, *rd_system = NULL; - info("Single-threaded WEB SERVER thread created with task id %d", gettid()); + if(unlikely(!st)) { + char id[100 + 1]; + char title[100 + 1]; - if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0) - error("Cannot set pthread cancel type to DEFERRED."); + snprintfz(id, 100, "web_thread%d_cpu", worker_private->id + 1); + snprintfz(title, 100, "NetData web server thread No %d CPU usage", worker_private->id + 1); - if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) - error("Cannot set pthread cancel state to ENABLE."); + st = rrdset_create_localhost( + "netdata" + , id + , NULL + , "web" + , "netdata.web_cpu" + , title + , "milliseconds/s" + , "web" + , "stats" + , 132000 + worker_private->id + , default_rrd_update_every + , RRDSET_TYPE_STACKED + ); + + rd_user = rrddim_add(st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + rd_system = rrddim_add(st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + } + else + rrdset_next(st); - if(!api_sockets.opened) - fatal("LISTENER: no listen sockets available."); + struct rusage rusage; + getrusage(RUSAGE_THREAD, &rusage); + rrddim_set_by_pointer(st, rd_user, rusage.ru_utime.tv_sec * 1000000ULL + rusage.ru_utime.tv_usec); + rrddim_set_by_pointer(st, rd_system, rusage.ru_stime.tv_sec * 1000000ULL + rusage.ru_stime.tv_usec); + rrdset_done(st); +} - poll_events(&api_sockets - , web_server_add_callback - , web_server_del_callback - , web_server_rcv_callback - , web_server_snd_callback - , web_allow_connections_from - , NULL +// ---------------------------------------------------------------------------- +// web server worker thread + +static void socket_listen_main_static_threaded_worker_cleanup(void *ptr) { + worker_private = (struct web_server_static_threaded_worker *)ptr; + + info("freeing local web clients cache..."); + web_client_cache_destroy(); + + info("stopped after %zu connects, %zu disconnects (max concurrent %zu), %zu receptions and %zu sends", + worker_private->connected, + worker_private->disconnected, + worker_private->max_concurrent, + worker_private->receptions, + worker_private->sends ); - debug(D_WEB_CLIENT, "LISTENER: exit!"); + worker_private->running = 0; +} + +void *socket_listen_main_static_threaded_worker(void *ptr) { + worker_private = (struct web_server_static_threaded_worker *)ptr; + worker_private->running = 1; + + netdata_thread_cleanup_push(socket_listen_main_static_threaded_worker_cleanup, ptr); + + poll_events(&api_sockets + , web_server_add_callback + , web_server_del_callback + , web_server_rcv_callback + , web_server_snd_callback + , web_server_tmr_callback + , web_allow_connections_from + , NULL + , web_client_first_request_timeout + , web_client_timeout + , default_rrd_update_every * 1000 // timer_milliseconds + , ptr // timer_data + , worker_private->max_sockets + ); + + netdata_thread_cleanup_pop(1); + return NULL; +} + + +// ---------------------------------------------------------------------------- +// web server main thread - also becomes a worker + +static void socket_listen_main_static_threaded_cleanup(void *ptr) { + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + int i, found = 0, max = 2 * USEC_PER_SEC, step = 50000; + + // we start from 1, - 0 is self + for(i = 1; i < static_threaded_workers_count; i++) { + if(static_workers_private_data[i].running) { + found++; + info("stopping worker %d", i + 1); + netdata_thread_cancel(static_workers_private_data[i].thread); + } + else + info("found stopped worker %d", i + 1); + } + + while(found && max > 0) { + max -= step; + info("Waiting %d static web threads to finish...", found); + sleep_usec(step); + found = 0; + + // we start from 1, - 0 is self + for(i = 1; i < static_threaded_workers_count; i++) { + if (static_workers_private_data[i].running) + found++; + } + } + + if(found) + error("%d static web threads are taking too long to finish. Giving up.", found); + + info("closing all web server sockets..."); listen_sockets_close(&api_sockets); - static_thread->enabled = 0; - pthread_exit(NULL); + info("all static web threads stopped."); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +void *socket_listen_main_static_threaded(void *ptr) { + netdata_thread_cleanup_push(socket_listen_main_static_threaded_cleanup, ptr); + web_server_mode = WEB_SERVER_MODE_STATIC_THREADED; + + if(!api_sockets.opened) + fatal("LISTENER: no listen sockets available."); + + // 6 threads is the optimal value + // since 6 are the parallel connections browsers will do + // so, if the machine has more CPUs, avoid using resources unnecessarily + int def_thread_count = (processors > 6)?6:processors; + + static_threaded_workers_count = config_get_number(CONFIG_SECTION_WEB, "web server threads", def_thread_count); + if(static_threaded_workers_count < 1) static_threaded_workers_count = 1; + + size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets", (long long int)(rlimit_nofile.rlim_cur / 2)); + + static_workers_private_data = callocz((size_t)static_threaded_workers_count, sizeof(struct web_server_static_threaded_worker)); + + web_server_is_multithreaded = (static_threaded_workers_count > 1); + + int i; + for(i = 1; i < static_threaded_workers_count; i++) { + static_workers_private_data[i].id = i; + static_workers_private_data[i].max_sockets = max_sockets / static_threaded_workers_count; + + char tag[50 + 1]; + snprintfz(tag, 50, "WEB_SERVER[static%d]", i+1); + + info("starting worker %d", i+1); + netdata_thread_create(&static_workers_private_data[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT, socket_listen_main_static_threaded_worker, (void *)&static_workers_private_data[i]); + } + + // and the main one + static_workers_private_data[0].max_sockets = max_sockets / static_threaded_workers_count; + socket_listen_main_static_threaded_worker((void *)&static_workers_private_data[0]); + + netdata_thread_cleanup_pop(1); return NULL; } -#endif -- cgit v1.2.3