diff options
Diffstat (limited to 'src/web_server.c')
-rw-r--r-- | src/web_server.c | 1292 |
1 files changed, 0 insertions, 1292 deletions
diff --git a/src/web_server.c b/src/web_server.c deleted file mode 100644 index 31c546411..000000000 --- a/src/web_server.c +++ /dev/null @@ -1,1292 +0,0 @@ -#include "common.h" - -// this file includes 3 web servers: -// -// 1. single-threaded, based on select() -// 2. multi-threaded, based on poll() that spawns threads to handle the requests, based on select() -// 3. static-threaded, based on poll() using a fixed number of threads (configured at netdata.conf) - -WEB_SERVER_MODE web_server_mode = WEB_SERVER_MODE_STATIC_THREADED; - -// -------------------------------------------------------------------------------------- - -WEB_SERVER_MODE web_server_mode_id(const char *mode) { - if(!strcmp(mode, "none")) - return WEB_SERVER_MODE_NONE; - else if(!strcmp(mode, "single") || !strcmp(mode, "single-threaded")) - return WEB_SERVER_MODE_SINGLE_THREADED; - else if(!strcmp(mode, "static") || !strcmp(mode, "static-threaded")) - return WEB_SERVER_MODE_STATIC_THREADED; - else // if(!strcmp(mode, "multi") || !strcmp(mode, "multi-threaded")) - return WEB_SERVER_MODE_MULTI_THREADED; -} - -const char *web_server_mode_name(WEB_SERVER_MODE id) { - switch(id) { - case WEB_SERVER_MODE_NONE: - return "none"; - - case WEB_SERVER_MODE_SINGLE_THREADED: - return "single-threaded"; - - case WEB_SERVER_MODE_STATIC_THREADED: - return "static-threaded"; - - default: - case WEB_SERVER_MODE_MULTI_THREADED: - return "multi-threaded"; - } -} - -// -------------------------------------------------------------------------------------- -// API sockets - -static LISTEN_SOCKETS api_sockets = { - .config_section = CONFIG_SECTION_WEB, - .default_bind_to = "*", - .default_port = API_LISTEN_PORT, - .backlog = API_LISTEN_BACKLOG -}; - -int api_listen_sockets_setup(void) { - int socks = listen_sockets_setup(&api_sockets); - - if(!socks) - fatal("LISTENER: Cannot listen on any API socket. Exiting..."); - - return socks; -} - - -// -------------------------------------------------------------------------------------- -// access lists - -SIMPLE_PATTERN *web_allow_connections_from = NULL; -SIMPLE_PATTERN *web_allow_streaming_from = NULL; -SIMPLE_PATTERN *web_allow_netdataconf_from = NULL; - -// WEB_CLIENT_ACL -SIMPLE_PATTERN *web_allow_dashboard_from = NULL; -SIMPLE_PATTERN *web_allow_registry_from = NULL; -SIMPLE_PATTERN *web_allow_badges_from = NULL; - -static void web_client_update_acl_matches(struct web_client *w) { - w->acl = WEB_CLIENT_ACL_NONE; - - if(!web_allow_dashboard_from || simple_pattern_matches(web_allow_dashboard_from, w->client_ip)) - w->acl |= WEB_CLIENT_ACL_DASHBOARD; - - if(!web_allow_registry_from || simple_pattern_matches(web_allow_registry_from, w->client_ip)) - w->acl |= WEB_CLIENT_ACL_REGISTRY; - - if(!web_allow_badges_from || simple_pattern_matches(web_allow_badges_from, w->client_ip)) - w->acl |= WEB_CLIENT_ACL_BADGE; -} - - -// -------------------------------------------------------------------------------------- - -static void log_connection(struct web_client *w, const char *msg) { - log_access("%llu: %d '[%s]:%s' '%s'", w->id, gettid(), w->client_ip, w->client_port, msg); -} - -// ---------------------------------------------------------------------------- -// allocate and free web_clients - -static void web_client_zero(struct web_client *w) { - // zero everything about it - but keep the buffers - - // remember the pointers to the buffers - BUFFER *b1 = w->response.data; - BUFFER *b2 = w->response.header; - BUFFER *b3 = w->response.header_output; - - // empty the buffers - buffer_flush(b1); - buffer_flush(b2); - buffer_flush(b3); - - freez(w->user_agent); - - // zero everything - memset(w, 0, sizeof(struct web_client)); - - // restore the pointers of the buffers - w->response.data = b1; - w->response.header = b2; - w->response.header_output = b3; -} - -static void web_client_free(struct web_client *w) { - buffer_free(w->response.header_output); - buffer_free(w->response.header); - buffer_free(w->response.data); - freez(w->user_agent); - freez(w); -} - -static struct web_client *web_client_alloc(void) { - struct web_client *w = callocz(1, sizeof(struct web_client)); - w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - return w; -} - -// ---------------------------------------------------------------------------- -// web clients caching - -// When clients connect and disconnect, avoid allocating and releasing memory. -// Instead, when new clients get connected, reuse any memory previously allocated -// for serving web clients that are now disconnected. - -// The size of the cache is adaptive. It caches the structures of 2x -// the number of currently connected clients. - -// Comments per server: -// SINGLE-THREADED : 1 cache is maintained -// MULTI-THREADED : 1 cache is maintained -// STATIC-THREADED : 1 cache for each thred of the web server - -struct clients_cache { - pid_t pid; - - struct web_client *used; // the structures of the currently connected clients - size_t used_count; // the count the currently connected clients - - struct web_client *avail; // the cached structures, available for future clients - size_t avail_count; // the number of cached structures - - size_t reused; // the number of re-uses - size_t allocated; // the number of allocations -}; - -static __thread struct clients_cache web_clients_cache = { - .pid = 0, - .used = NULL, - .used_count = 0, - .avail = NULL, - .avail_count = 0, - .allocated = 0, - .reused = 0 -}; - -static inline void web_client_cache_verify(int force) { -#ifdef NETDATA_INTERNAL_CHECKS - static __thread size_t count = 0; - count++; - - if(unlikely(force || count > 1000)) { - count = 0; - - struct web_client *w; - size_t used = 0, avail = 0; - for(w = web_clients_cache.used; w ; w = w->next) used++; - for(w = web_clients_cache.avail; w ; w = w->next) avail++; - - info("web_client_cache has %zu (%zu) used and %zu (%zu) available clients, allocated %zu, reused %zu (hit %zu%%)." - , used, web_clients_cache.used_count - , avail, web_clients_cache.avail_count - , web_clients_cache.allocated - , web_clients_cache.reused - , (web_clients_cache.allocated + web_clients_cache.reused)?(web_clients_cache.reused * 100 / (web_clients_cache.allocated + web_clients_cache.reused)):0 - ); - } -#else - if(unlikely(force)) { - info("web_client_cache has %zu used and %zu available clients, allocated %zu, reused %zu (hit %zu%%)." - , web_clients_cache.used_count - , web_clients_cache.avail_count - , web_clients_cache.allocated - , web_clients_cache.reused - , (web_clients_cache.allocated + web_clients_cache.reused)?(web_clients_cache.reused * 100 / (web_clients_cache.allocated + web_clients_cache.reused)):0 - ); - } -#endif -} - -// destroy the cache and free all the memory it uses -static void web_client_cache_destroy(void) { -#ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid())) - error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid()); - - web_client_cache_verify(1); -#endif - - netdata_thread_disable_cancelability(); - - struct web_client *w, *t; - - w = web_clients_cache.used; - while(w) { - t = w; - w = w->next; - web_client_free(t); - } - web_clients_cache.used = NULL; - web_clients_cache.used_count = 0; - - w = web_clients_cache.avail; - while(w) { - t = w; - w = w->next; - web_client_free(t); - } - web_clients_cache.avail = NULL; - web_clients_cache.avail_count = 0; - - netdata_thread_enable_cancelability(); -} - -static struct web_client *web_client_get_from_cache_or_allocate() { - -#ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(web_clients_cache.pid == 0)) - web_clients_cache.pid = gettid(); - - if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid())) - error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid()); -#endif - - netdata_thread_disable_cancelability(); - - struct web_client *w = web_clients_cache.avail; - - if(w) { - // get it from avail - if (w == web_clients_cache.avail) web_clients_cache.avail = w->next; - if(w->prev) w->prev->next = w->next; - if(w->next) w->next->prev = w->prev; - web_clients_cache.avail_count--; - web_client_zero(w); - web_clients_cache.reused++; - } - else { - // allocate it - w = web_client_alloc(); - web_clients_cache.allocated++; - } - - // link it to used web clients - if (web_clients_cache.used) web_clients_cache.used->prev = w; - w->next = web_clients_cache.used; - w->prev = NULL; - web_clients_cache.used = w; - web_clients_cache.used_count++; - - // initialize it - w->id = web_client_connected(); - w->mode = WEB_CLIENT_MODE_NORMAL; - - netdata_thread_enable_cancelability(); - - return w; -} - -static void web_client_release(struct web_client *w) { -#ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid())) - error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid()); - - if(unlikely(w->running)) - error("%llu: releasing web client from %s port %s, but it still running.", w->id, w->client_ip, w->client_port); -#endif - - debug(D_WEB_CLIENT_ACCESS, "%llu: Closing web client from %s port %s.", w->id, w->client_ip, w->client_port); - - log_connection(w, "DISCONNECTED"); - web_client_request_done(w); - web_client_disconnected(); - - netdata_thread_disable_cancelability(); - - if(web_server_mode != WEB_SERVER_MODE_STATIC_THREADED) { - if (w->ifd != -1) close(w->ifd); - if (w->ofd != -1 && w->ofd != w->ifd) close(w->ofd); - w->ifd = w->ofd = -1; - } - - // unlink it from the used - if (w == web_clients_cache.used) web_clients_cache.used = w->next; - if(w->prev) w->prev->next = w->next; - if(w->next) w->next->prev = w->prev; - web_clients_cache.used_count--; - - if(web_clients_cache.avail_count >= 2 * web_clients_cache.used_count) { - // we have too many of them - free it - web_client_free(w); - } - else { - // link it to the avail - if (web_clients_cache.avail) web_clients_cache.avail->prev = w; - w->next = web_clients_cache.avail; - w->prev = NULL; - web_clients_cache.avail = w; - web_clients_cache.avail_count++; - } - - netdata_thread_enable_cancelability(); -} - - -// ---------------------------------------------------------------------------- -// high level web clients connection management - -static void web_client_initialize_connection(struct web_client *w) { - int flag = 1; - if(setsockopt(w->ifd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0) - error("%llu: failed to enable TCP_NODELAY on socket fd %d.", w->id, w->ifd); - - flag = 1; - if(setsockopt(w->ifd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, sizeof(int)) != 0) - error("%llu: failed to enable SO_KEEPALIVE on socket fd %d.", w->id, w->ifd); - - web_client_update_acl_matches(w); - - w->origin[0] = '*'; w->origin[1] = '\0'; - w->cookie1[0] = '\0'; w->cookie2[0] = '\0'; - freez(w->user_agent); w->user_agent = NULL; - - web_client_enable_wait_receive(w); - - log_connection(w, "CONNECTED"); - - web_client_cache_verify(0); -} - -static struct web_client *web_client_create_on_fd(int fd, const char *client_ip, const char *client_port) { - struct web_client *w; - - w = web_client_get_from_cache_or_allocate(); - w->ifd = w->ofd = fd; - - strncpyz(w->client_ip, client_ip, sizeof(w->client_ip) - 1); - strncpyz(w->client_port, client_port, sizeof(w->client_port) - 1); - - if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-"); - if(unlikely(!*w->client_port)) strcpy(w->client_port, "-"); - - web_client_initialize_connection(w); - return(w); -} - -static struct web_client *web_client_create_on_listenfd(int listener) { - struct web_client *w; - - w = web_client_get_from_cache_or_allocate(); - w->ifd = w->ofd = accept_socket(listener, SOCK_NONBLOCK, w->client_ip, sizeof(w->client_ip), w->client_port, sizeof(w->client_port), web_allow_connections_from); - - if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-"); - if(unlikely(!*w->client_port)) strcpy(w->client_port, "-"); - - if (w->ifd == -1) { - if(errno == EPERM) - log_connection(w, "ACCESS DENIED"); - else { - log_connection(w, "CONNECTION FAILED"); - error("%llu: Failed to accept new incoming connection.", w->id); - } - - web_client_release(w); - return NULL; - } - - web_client_initialize_connection(w); - return(w); -} - - -// -------------------------------------------------------------------------------------- -// the thread of a single client - for the MULTI-THREADED web server - -// 1. waits for input and output, using async I/O -// 2. it processes HTTP requests -// 3. it generates HTTP responses -// 4. it copies data from input to output if mode is FILECOPY - -int web_client_timeout = DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS; -int web_client_first_request_timeout = DEFAULT_TIMEOUT_TO_RECEIVE_FIRST_WEB_REQUEST; - -static void multi_threaded_web_client_worker_main_cleanup(void *ptr) { - struct web_client *w = ptr; - WEB_CLIENT_IS_DEAD(w); - w->running = 0; -} - -static void *multi_threaded_web_client_worker_main(void *ptr) { - netdata_thread_cleanup_push(multi_threaded_web_client_worker_main_cleanup, ptr); - - struct web_client *w = ptr; - w->running = 1; - - struct pollfd fds[2], *ifd, *ofd; - int retval, timeout_ms; - nfds_t fdmax = 0; - - while(!netdata_exit) { - if(unlikely(web_client_check_dead(w))) { - debug(D_WEB_CLIENT, "%llu: client is dead.", w->id); - break; - } - else if(unlikely(!web_client_has_wait_receive(w) && !web_client_has_wait_send(w))) { - debug(D_WEB_CLIENT, "%llu: client is not set for neither receiving nor sending data.", w->id); - break; - } - - if(unlikely(w->ifd < 0 || w->ofd < 0)) { - error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd", w->id, w->ifd, w->ofd); - break; - } - - if(w->ifd == w->ofd) { - fds[0].fd = w->ifd; - fds[0].events = 0; - fds[0].revents = 0; - - if(web_client_has_wait_receive(w)) fds[0].events |= POLLIN; - if(web_client_has_wait_send(w)) fds[0].events |= POLLOUT; - - fds[1].fd = -1; - fds[1].events = 0; - fds[1].revents = 0; - - ifd = ofd = &fds[0]; - - fdmax = 1; - } - else { - fds[0].fd = w->ifd; - fds[0].events = 0; - fds[0].revents = 0; - if(web_client_has_wait_receive(w)) fds[0].events |= POLLIN; - ifd = &fds[0]; - - fds[1].fd = w->ofd; - fds[1].events = 0; - fds[1].revents = 0; - if(web_client_has_wait_send(w)) fds[1].events |= POLLOUT; - ofd = &fds[1]; - - fdmax = 2; - } - - debug(D_WEB_CLIENT, "%llu: Waiting socket async I/O for %s %s", w->id, web_client_has_wait_receive(w)?"INPUT":"", web_client_has_wait_send(w)?"OUTPUT":""); - errno = 0; - timeout_ms = web_client_timeout * 1000; - retval = poll(fds, fdmax, timeout_ms); - - if(unlikely(netdata_exit)) break; - - if(unlikely(retval == -1)) { - if(errno == EAGAIN || errno == EINTR) { - debug(D_WEB_CLIENT, "%llu: EAGAIN received.", w->id); - continue; - } - - debug(D_WEB_CLIENT, "%llu: LISTENER: poll() failed (input fd = %d, output fd = %d). Closing client.", w->id, w->ifd, w->ofd); - break; - } - else if(unlikely(!retval)) { - debug(D_WEB_CLIENT, "%llu: Timeout while waiting socket async I/O for %s %s", w->id, web_client_has_wait_receive(w)?"INPUT":"", web_client_has_wait_send(w)?"OUTPUT":""); - break; - } - - if(unlikely(netdata_exit)) break; - - int used = 0; - if(web_client_has_wait_send(w) && ofd->revents & POLLOUT) { - used++; - if(web_client_send(w) < 0) { - debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id); - break; - } - } - - if(unlikely(netdata_exit)) break; - - if(web_client_has_wait_receive(w) && (ifd->revents & POLLIN || ifd->revents & POLLPRI)) { - used++; - if(web_client_receive(w) < 0) { - debug(D_WEB_CLIENT, "%llu: Cannot receive data from client. Closing client.", w->id); - break; - } - - if(w->mode == WEB_CLIENT_MODE_NORMAL) { - debug(D_WEB_CLIENT, "%llu: Attempting to process received data.", w->id); - web_client_process_request(w); - - // if the sockets are closed, may have transferred this client - // to plugins.d - if(unlikely(w->mode == WEB_CLIENT_MODE_STREAM)) - break; - } - } - - if(unlikely(!used)) { - debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on socket.", w->id); - break; - } - } - - if(w->mode != WEB_CLIENT_MODE_STREAM) - log_connection(w, "DISCONNECTED"); - - web_client_request_done(w); - - debug(D_WEB_CLIENT, "%llu: done...", w->id); - - // close the sockets/files now - // to free file descriptors - if(w->ifd == w->ofd) { - if(w->ifd != -1) close(w->ifd); - } - else { - if(w->ifd != -1) close(w->ifd); - if(w->ofd != -1) close(w->ofd); - } - w->ifd = -1; - w->ofd = -1; - - netdata_thread_cleanup_pop(1); - return NULL; -} - -// -------------------------------------------------------------------------------------- -// the main socket listener - MULTI-THREADED - -// 1. it accepts new incoming requests on our port -// 2. creates a new web_client for each connection received -// 3. spawns a new netdata_thread to serve the client (this is optimal for keep-alive clients) -// 4. cleans up old web_clients that their netdata_threads have been exited - -static void web_client_multi_threaded_web_server_release_clients(void) { - struct web_client *w; - for(w = web_clients_cache.used; w ; ) { - if(unlikely(!w->running && web_client_check_dead(w))) { - struct web_client *t = w->next; - web_client_release(w); - w = t; - } - else - w = w->next; - } -} - -static void web_client_multi_threaded_web_server_stop_all_threads(void) { - struct web_client *w; - - int found = 1, max = 2 * USEC_PER_SEC, step = 50000; - for(w = web_clients_cache.used; w ; w = w->next) { - if(w->running) { - found++; - info("stopping web client %s, id %llu", w->client_ip, w->id); - netdata_thread_cancel(w->thread); - } - } - - while(found && max > 0) { - max -= step; - info("Waiting %d web threads to finish...", found); - sleep_usec(step); - found = 0; - for(w = web_clients_cache.used; w ; w = w->next) - if(w->running) found++; - } - - if(found) - error("%d web threads are taking too long to finish. Giving up.", found); -} - -static struct pollfd *socket_listen_main_multi_threaded_fds = NULL; - -static void socket_listen_main_multi_threaded_cleanup(void *data) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data; - static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - - info("cleaning up..."); - - info("releasing allocated memory..."); - freez(socket_listen_main_multi_threaded_fds); - - info("closing all sockets..."); - listen_sockets_close(&api_sockets); - - info("stopping all running web server threads..."); - web_client_multi_threaded_web_server_stop_all_threads(); - - info("freeing web clients cache..."); - web_client_cache_destroy(); - - info("cleanup completed."); - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; -} - -#define CLEANUP_EVERY_EVENTS 60 -void *socket_listen_main_multi_threaded(void *ptr) { - netdata_thread_cleanup_push(socket_listen_main_multi_threaded_cleanup, ptr); - - web_server_mode = WEB_SERVER_MODE_MULTI_THREADED; - web_server_is_multithreaded = 1; - - struct web_client *w; - int retval, counter = 0; - - if(!api_sockets.opened) - fatal("LISTENER: No sockets to listen to."); - - socket_listen_main_multi_threaded_fds = callocz(sizeof(struct pollfd), api_sockets.opened); - - size_t i; - for(i = 0; i < api_sockets.opened ;i++) { - socket_listen_main_multi_threaded_fds[i].fd = api_sockets.fds[i]; - socket_listen_main_multi_threaded_fds[i].events = POLLIN; - socket_listen_main_multi_threaded_fds[i].revents = 0; - - info("Listening on '%s'", (api_sockets.fds_names[i])?api_sockets.fds_names[i]:"UNKNOWN"); - } - - int timeout_ms = 1 * 1000; - - while(!netdata_exit) { - - // debug(D_WEB_CLIENT, "LISTENER: Waiting..."); - retval = poll(socket_listen_main_multi_threaded_fds, api_sockets.opened, timeout_ms); - - if(unlikely(retval == -1)) { - error("LISTENER: poll() failed."); - continue; - } - else if(unlikely(!retval)) { - debug(D_WEB_CLIENT, "LISTENER: poll() timeout."); - counter++; - continue; - } - - for(i = 0 ; i < api_sockets.opened ; i++) { - short int revents = socket_listen_main_multi_threaded_fds[i].revents; - - // check for new incoming connections - if(revents & POLLIN || revents & POLLPRI) { - socket_listen_main_multi_threaded_fds[i].revents = 0; - - w = web_client_create_on_listenfd(socket_listen_main_multi_threaded_fds[i].fd); - if(unlikely(!w)) { - // no need for error log - web_client_create_on_listenfd already logged the error - continue; - } - - if(api_sockets.fds_families[i] == AF_UNIX) - web_client_set_unix(w); - else - web_client_set_tcp(w); - - char tag[NETDATA_THREAD_TAG_MAX + 1]; - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "WEB_CLIENT[%llu,[%s]:%s]", w->id, w->client_ip, w->client_port); - - w->running = 1; - if(netdata_thread_create(&w->thread, tag, NETDATA_THREAD_OPTION_DONT_LOG, multi_threaded_web_client_worker_main, w) != 0) { - w->running = 0; - web_client_release(w); - } - } - } - - counter++; - if(counter > CLEANUP_EVERY_EVENTS) { - counter = 0; - web_client_multi_threaded_web_server_release_clients(); - } - } - - netdata_thread_cleanup_pop(1); - return NULL; -} - - -// -------------------------------------------------------------------------------------- -// the main socket listener - SINGLE-THREADED - -struct web_client *single_threaded_clients[FD_SETSIZE]; - -static inline int single_threaded_link_client(struct web_client *w, fd_set *ifds, fd_set *ofds, fd_set *efds, int *max) { - if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) { - return 1; - } - - if(unlikely(w->ifd < 0 || w->ifd >= (int)FD_SETSIZE || w->ofd < 0 || w->ofd >= (int)FD_SETSIZE)) { - error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd < FD_SETSIZE (%d)", w->id, w->ifd, w->ofd, (int)FD_SETSIZE); - return 1; - } - - FD_SET(w->ifd, efds); - if(unlikely(*max < w->ifd)) *max = w->ifd; - - if(unlikely(w->ifd != w->ofd)) { - if(*max < w->ofd) *max = w->ofd; - FD_SET(w->ofd, efds); - } - - if(web_client_has_wait_receive(w)) FD_SET(w->ifd, ifds); - if(web_client_has_wait_send(w)) FD_SET(w->ofd, ofds); - - single_threaded_clients[w->ifd] = w; - single_threaded_clients[w->ofd] = w; - - return 0; -} - -static inline int single_threaded_unlink_client(struct web_client *w, fd_set *ifds, fd_set *ofds, fd_set *efds) { - FD_CLR(w->ifd, efds); - if(unlikely(w->ifd != w->ofd)) FD_CLR(w->ofd, efds); - - if(web_client_has_wait_receive(w)) FD_CLR(w->ifd, ifds); - if(web_client_has_wait_send(w)) FD_CLR(w->ofd, ofds); - - single_threaded_clients[w->ifd] = NULL; - single_threaded_clients[w->ofd] = NULL; - - if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) { - return 1; - } - - return 0; -} - -static void socket_listen_main_single_threaded_cleanup(void *data) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data; - static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - - info("closing all sockets..."); - listen_sockets_close(&api_sockets); - - info("freeing web clients cache..."); - web_client_cache_destroy(); - - info("cleanup completed."); - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; -} - -void *socket_listen_main_single_threaded(void *ptr) { - netdata_thread_cleanup_push(socket_listen_main_single_threaded_cleanup, ptr); - web_server_mode = WEB_SERVER_MODE_SINGLE_THREADED; - web_server_is_multithreaded = 0; - - struct web_client *w; - - if(!api_sockets.opened) - fatal("LISTENER: no listen sockets available."); - - size_t i; - for(i = 0; i < (size_t)FD_SETSIZE ; i++) - single_threaded_clients[i] = NULL; - - fd_set ifds, ofds, efds, rifds, rofds, refds; - FD_ZERO (&ifds); - FD_ZERO (&ofds); - FD_ZERO (&efds); - int fdmax = 0; - - for(i = 0; i < api_sockets.opened ; i++) { - if (api_sockets.fds[i] < 0 || api_sockets.fds[i] >= (int)FD_SETSIZE) - fatal("LISTENER: Listen socket %d is not ready, or invalid.", api_sockets.fds[i]); - - info("Listening on '%s'", (api_sockets.fds_names[i])?api_sockets.fds_names[i]:"UNKNOWN"); - - FD_SET(api_sockets.fds[i], &ifds); - FD_SET(api_sockets.fds[i], &efds); - if(fdmax < api_sockets.fds[i]) - fdmax = api_sockets.fds[i]; - } - - while(!netdata_exit) { - debug(D_WEB_CLIENT_ACCESS, "LISTENER: single threaded web server waiting (fdmax = %d)...", fdmax); - - struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; - rifds = ifds; - rofds = ofds; - refds = efds; - int retval = select(fdmax+1, &rifds, &rofds, &refds, &tv); - - if(unlikely(retval == -1)) { - error("LISTENER: select() failed."); - continue; - } - else if(likely(retval)) { - debug(D_WEB_CLIENT_ACCESS, "LISTENER: got something."); - - for(i = 0; i < api_sockets.opened ; i++) { - if (FD_ISSET(api_sockets.fds[i], &rifds)) { - debug(D_WEB_CLIENT_ACCESS, "LISTENER: new connection."); - w = web_client_create_on_listenfd(api_sockets.fds[i]); - if(unlikely(!w)) - continue; - - if(api_sockets.fds_families[i] == AF_UNIX) - web_client_set_unix(w); - else - web_client_set_tcp(w); - - if (single_threaded_link_client(w, &ifds, &ofds, &ifds, &fdmax) != 0) { - web_client_release(w); - } - } - } - - for(i = 0 ; i <= (size_t)fdmax ; i++) { - if(likely(!FD_ISSET(i, &rifds) && !FD_ISSET(i, &rofds) && !FD_ISSET(i, &refds))) - continue; - - w = single_threaded_clients[i]; - if(unlikely(!w)) { - // error("no client on slot %zu", i); - continue; - } - - if(unlikely(single_threaded_unlink_client(w, &ifds, &ofds, &efds) != 0)) { - // error("failed to unlink client %zu", i); - web_client_release(w); - continue; - } - - if (unlikely(FD_ISSET(w->ifd, &refds) || FD_ISSET(w->ofd, &refds))) { - // error("no input on client %zu", i); - web_client_release(w); - continue; - } - - if (unlikely(web_client_has_wait_receive(w) && FD_ISSET(w->ifd, &rifds))) { - if (unlikely(web_client_receive(w) < 0)) { - // error("cannot read from client %zu", i); - web_client_release(w); - continue; - } - - if (w->mode != WEB_CLIENT_MODE_FILECOPY) { - debug(D_WEB_CLIENT, "%llu: Processing received data.", w->id); - web_client_process_request(w); - } - } - - if (unlikely(web_client_has_wait_send(w) && FD_ISSET(w->ofd, &rofds))) { - if (unlikely(web_client_send(w) < 0)) { - // error("cannot send data to client %zu", i); - debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id); - web_client_release(w); - continue; - } - } - - if(unlikely(single_threaded_link_client(w, &ifds, &ofds, &efds, &fdmax) != 0)) { - // error("failed to link client %zu", i); - web_client_release(w); - } - } - } - else { - debug(D_WEB_CLIENT_ACCESS, "LISTENER: single threaded web server timeout."); - } - } - - netdata_thread_cleanup_pop(1); - return NULL; -} - - -// -------------------------------------------------------------------------------------- -// the main socket listener - STATIC-THREADED - -struct web_server_static_threaded_worker { - netdata_thread_t thread; - - int id; - int running; - - size_t max_sockets; - - volatile size_t connected; - volatile size_t disconnected; - volatile size_t receptions; - volatile size_t sends; - volatile size_t max_concurrent; - - volatile size_t files_read; - volatile size_t file_reads; -}; - -static long long static_threaded_workers_count = 1; -static struct web_server_static_threaded_worker *static_workers_private_data = NULL; -static __thread struct web_server_static_threaded_worker *worker_private = NULL; - -// ---------------------------------------------------------------------------- - -static inline int web_server_check_client_status(struct web_client *w) { - if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) - return -1; - - return 0; -} - -// ---------------------------------------------------------------------------- -// web server files - -static void *web_server_file_add_callback(POLLINFO *pi, short int *events, void *data) { - struct web_client *w = (struct web_client *)data; - - worker_private->files_read++; - - debug(D_WEB_CLIENT, "%llu: ADDED FILE READ ON FD %d", w->id, pi->fd); - *events = POLLIN; - pi->data = w; - return w; -} - -static void web_werver_file_del_callback(POLLINFO *pi) { - struct web_client *w = (struct web_client *)pi->data; - debug(D_WEB_CLIENT, "%llu: RELEASE FILE READ ON FD %d", w->id, pi->fd); - - w->pollinfo_filecopy_slot = 0; - - if(unlikely(!w->pollinfo_slot)) { - debug(D_WEB_CLIENT, "%llu: CROSS WEB CLIENT CLEANUP (iFD %d, oFD %d)", w->id, pi->fd, w->ofd); - web_client_release(w); - } -} - -static int web_server_file_read_callback(POLLINFO *pi, short int *events) { - struct web_client *w = (struct web_client *)pi->data; - - // if there is no POLLINFO linked to this, it means the client disconnected - // stop the file reading too - if(unlikely(!w->pollinfo_slot)) { - debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON CLOSED WEB CLIENT", w->id, pi->fd); - return -1; - } - - if(unlikely(w->mode != WEB_CLIENT_MODE_FILECOPY || w->ifd == w->ofd)) { - debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON NON-FILECOPY WEB CLIENT", w->id, pi->fd); - return -1; - } - - debug(D_WEB_CLIENT, "%llu: READING FILE ON FD %d", w->id, pi->fd); - - worker_private->file_reads++; - ssize_t ret = unlikely(web_client_read_file(w)); - - if(likely(web_client_has_wait_send(w))) { - POLLJOB *p = pi->p; // our POLLJOB - POLLINFO *wpi = pollinfo_from_slot(p, w->pollinfo_slot); // POLLINFO of the client socket - - debug(D_WEB_CLIENT, "%llu: SIGNALING W TO SEND (iFD %d, oFD %d)", w->id, pi->fd, wpi->fd); - p->fds[wpi->slot].events |= POLLOUT; - } - - if(unlikely(ret <= 0 || w->ifd == w->ofd)) { - debug(D_WEB_CLIENT, "%llu: DONE READING FILE ON FD %d", w->id, pi->fd); - return -1; - } - - *events = POLLIN; - return 0; -} - -static int web_server_file_write_callback(POLLINFO *pi, short int *events) { - (void)pi; - (void)events; - - error("Writing to web files is not supported!"); - - return -1; -} - -// ---------------------------------------------------------------------------- -// web server clients - -static void *web_server_add_callback(POLLINFO *pi, short int *events, void *data) { - (void)data; - - worker_private->connected++; - - size_t concurrent = worker_private->connected - worker_private->disconnected; - if(unlikely(concurrent > worker_private->max_concurrent)) - worker_private->max_concurrent = concurrent; - - *events = POLLIN; - - debug(D_WEB_CLIENT_ACCESS, "LISTENER on %d: new connection.", pi->fd); - struct web_client *w = web_client_create_on_fd(pi->fd, pi->client_ip, pi->client_port); - w->pollinfo_slot = pi->slot; - - if(unlikely(pi->socktype == AF_UNIX)) - web_client_set_unix(w); - else - web_client_set_tcp(w); - - debug(D_WEB_CLIENT, "%llu: ADDED CLIENT FD %d", w->id, pi->fd); - return w; -} - -// TCP client disconnected -static void web_server_del_callback(POLLINFO *pi) { - worker_private->disconnected++; - - struct web_client *w = (struct web_client *)pi->data; - - w->pollinfo_slot = 0; - if(unlikely(w->pollinfo_filecopy_slot)) { - POLLINFO *fpi = pollinfo_from_slot(pi->p, w->pollinfo_filecopy_slot); // POLLINFO of the client socket - debug(D_WEB_CLIENT, "%llu: THE CLIENT WILL BE FRED BY READING FILE JOB ON FD %d", w->id, fpi->fd); - } - else { - if(web_client_flag_check(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET)) - pi->flags |= POLLINFO_FLAG_DONT_CLOSE; - - debug(D_WEB_CLIENT, "%llu: CLOSING CLIENT FD %d", w->id, pi->fd); - web_client_release(w); - } -} - -static int web_server_rcv_callback(POLLINFO *pi, short int *events) { - worker_private->receptions++; - - struct web_client *w = (struct web_client *)pi->data; - int fd = pi->fd; - - if(unlikely(web_client_receive(w) < 0)) - return -1; - - debug(D_WEB_CLIENT, "%llu: processing received data on fd %d.", w->id, fd); - web_client_process_request(w); - - if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY)) { - if(w->pollinfo_filecopy_slot == 0) { - debug(D_WEB_CLIENT, "%llu: FILECOPY DETECTED ON FD %d", w->id, pi->fd); - - if (unlikely(w->ifd != -1 && w->ifd != w->ofd && w->ifd != fd)) { - // add a new socket to poll_events, with the same - debug(D_WEB_CLIENT, "%llu: CREATING FILECOPY SLOT ON FD %d", w->id, pi->fd); - - POLLINFO *fpi = poll_add_fd( - pi->p - , w->ifd - , 0 - , POLLINFO_FLAG_CLIENT_SOCKET - , "FILENAME" - , "" - , web_server_file_add_callback - , web_werver_file_del_callback - , web_server_file_read_callback - , web_server_file_write_callback - , (void *) w - ); - - if(fpi) - w->pollinfo_filecopy_slot = fpi->slot; - else { - error("Failed to add filecopy fd. Closing client."); - return -1; - } - } - } - } - else { - if(unlikely(w->ifd == fd && web_client_has_wait_receive(w))) - *events |= POLLIN; - } - - if(unlikely(w->ofd == fd && web_client_has_wait_send(w))) - *events |= POLLOUT; - - return web_server_check_client_status(w); -} - -static int web_server_snd_callback(POLLINFO *pi, short int *events) { - worker_private->sends++; - - struct web_client *w = (struct web_client *)pi->data; - int fd = pi->fd; - - debug(D_WEB_CLIENT, "%llu: sending data on fd %d.", w->id, fd); - - if(unlikely(web_client_send(w) < 0)) - return -1; - - if(unlikely(w->ifd == fd && web_client_has_wait_receive(w))) - *events |= POLLIN; - - if(unlikely(w->ofd == fd && web_client_has_wait_send(w))) - *events |= POLLOUT; - - return web_server_check_client_status(w); -} - -static void web_server_tmr_callback(void *timer_data) { - worker_private = (struct web_server_static_threaded_worker *)timer_data; - - static __thread RRDSET *st = NULL; - static __thread RRDDIM *rd_user = NULL, *rd_system = NULL; - - if(unlikely(!st)) { - char id[100 + 1]; - char title[100 + 1]; - - snprintfz(id, 100, "web_thread%d_cpu", worker_private->id + 1); - snprintfz(title, 100, "NetData web server thread No %d CPU usage", worker_private->id + 1); - - st = rrdset_create_localhost( - "netdata" - , id - , NULL - , "web" - , "netdata.web_cpu" - , title - , "milliseconds/s" - , "web" - , "stats" - , 132000 + worker_private->id - , default_rrd_update_every - , RRDSET_TYPE_STACKED - ); - - rd_user = rrddim_add(st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - rd_system = rrddim_add(st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - } - else - rrdset_next(st); - - struct rusage rusage; - getrusage(RUSAGE_THREAD, &rusage); - rrddim_set_by_pointer(st, rd_user, rusage.ru_utime.tv_sec * 1000000ULL + rusage.ru_utime.tv_usec); - rrddim_set_by_pointer(st, rd_system, rusage.ru_stime.tv_sec * 1000000ULL + rusage.ru_stime.tv_usec); - rrdset_done(st); -} - -// ---------------------------------------------------------------------------- -// web server worker thread - -static void socket_listen_main_static_threaded_worker_cleanup(void *ptr) { - worker_private = (struct web_server_static_threaded_worker *)ptr; - - info("freeing local web clients cache..."); - web_client_cache_destroy(); - - info("stopped after %zu connects, %zu disconnects (max concurrent %zu), %zu receptions and %zu sends", - worker_private->connected, - worker_private->disconnected, - worker_private->max_concurrent, - worker_private->receptions, - worker_private->sends - ); - - worker_private->running = 0; -} - -void *socket_listen_main_static_threaded_worker(void *ptr) { - worker_private = (struct web_server_static_threaded_worker *)ptr; - worker_private->running = 1; - - netdata_thread_cleanup_push(socket_listen_main_static_threaded_worker_cleanup, ptr); - - poll_events(&api_sockets - , web_server_add_callback - , web_server_del_callback - , web_server_rcv_callback - , web_server_snd_callback - , web_server_tmr_callback - , web_allow_connections_from - , NULL - , web_client_first_request_timeout - , web_client_timeout - , default_rrd_update_every * 1000 // timer_milliseconds - , ptr // timer_data - , worker_private->max_sockets - ); - - netdata_thread_cleanup_pop(1); - return NULL; -} - - -// ---------------------------------------------------------------------------- -// web server main thread - also becomes a worker - -static void socket_listen_main_static_threaded_cleanup(void *ptr) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; - static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - - int i, found = 0, max = 2 * USEC_PER_SEC, step = 50000; - - // we start from 1, - 0 is self - for(i = 1; i < static_threaded_workers_count; i++) { - if(static_workers_private_data[i].running) { - found++; - info("stopping worker %d", i + 1); - netdata_thread_cancel(static_workers_private_data[i].thread); - } - else - info("found stopped worker %d", i + 1); - } - - while(found && max > 0) { - max -= step; - info("Waiting %d static web threads to finish...", found); - sleep_usec(step); - found = 0; - - // we start from 1, - 0 is self - for(i = 1; i < static_threaded_workers_count; i++) { - if (static_workers_private_data[i].running) - found++; - } - } - - if(found) - error("%d static web threads are taking too long to finish. Giving up.", found); - - info("closing all web server sockets..."); - listen_sockets_close(&api_sockets); - - info("all static web threads stopped."); - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; -} - -void *socket_listen_main_static_threaded(void *ptr) { - netdata_thread_cleanup_push(socket_listen_main_static_threaded_cleanup, ptr); - web_server_mode = WEB_SERVER_MODE_STATIC_THREADED; - - if(!api_sockets.opened) - fatal("LISTENER: no listen sockets available."); - - // 6 threads is the optimal value - // since 6 are the parallel connections browsers will do - // so, if the machine has more CPUs, avoid using resources unnecessarily - int def_thread_count = (processors > 6)?6:processors; - - static_threaded_workers_count = config_get_number(CONFIG_SECTION_WEB, "web server threads", def_thread_count); - if(static_threaded_workers_count < 1) static_threaded_workers_count = 1; - - size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets", (long long int)(rlimit_nofile.rlim_cur / 2)); - - static_workers_private_data = callocz((size_t)static_threaded_workers_count, sizeof(struct web_server_static_threaded_worker)); - - web_server_is_multithreaded = (static_threaded_workers_count > 1); - - int i; - for(i = 1; i < static_threaded_workers_count; i++) { - static_workers_private_data[i].id = i; - static_workers_private_data[i].max_sockets = max_sockets / static_threaded_workers_count; - - char tag[50 + 1]; - snprintfz(tag, 50, "WEB_SERVER[static%d]", i+1); - - info("starting worker %d", i+1); - netdata_thread_create(&static_workers_private_data[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT, socket_listen_main_static_threaded_worker, (void *)&static_workers_private_data[i]); - } - - // and the main one - static_workers_private_data[0].max_sockets = max_sockets / static_threaded_workers_count; - socket_listen_main_static_threaded_worker((void *)&static_workers_private_data[0]); - - netdata_thread_cleanup_pop(1); - return NULL; -} |