summaryrefslogtreecommitdiffstats
path: root/src/web_server.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2018-11-07 12:19:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2018-11-07 12:20:17 +0000
commita64a253794ac64cb40befee54db53bde17dd0d49 (patch)
treec1024acc5f6e508814b944d99f112259bb28b1be /src/web_server.c
parentNew upstream version 1.10.0+dfsg (diff)
downloadnetdata-a64a253794ac64cb40befee54db53bde17dd0d49.tar.xz
netdata-a64a253794ac64cb40befee54db53bde17dd0d49.zip
New upstream version 1.11.0+dfsgupstream/1.11.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/web_server.c')
-rw-r--r--src/web_server.c1292
1 files changed, 0 insertions, 1292 deletions
diff --git a/src/web_server.c b/src/web_server.c
deleted file mode 100644
index 31c54641..00000000
--- a/src/web_server.c
+++ /dev/null
@@ -1,1292 +0,0 @@
-#include "common.h"
-
-// this file includes 3 web servers:
-//
-// 1. single-threaded, based on select()
-// 2. multi-threaded, based on poll() that spawns threads to handle the requests, based on select()
-// 3. static-threaded, based on poll() using a fixed number of threads (configured at netdata.conf)
-
-WEB_SERVER_MODE web_server_mode = WEB_SERVER_MODE_STATIC_THREADED;
-
-// --------------------------------------------------------------------------------------
-
-WEB_SERVER_MODE web_server_mode_id(const char *mode) {
- if(!strcmp(mode, "none"))
- return WEB_SERVER_MODE_NONE;
- else if(!strcmp(mode, "single") || !strcmp(mode, "single-threaded"))
- return WEB_SERVER_MODE_SINGLE_THREADED;
- else if(!strcmp(mode, "static") || !strcmp(mode, "static-threaded"))
- return WEB_SERVER_MODE_STATIC_THREADED;
- else // if(!strcmp(mode, "multi") || !strcmp(mode, "multi-threaded"))
- return WEB_SERVER_MODE_MULTI_THREADED;
-}
-
-const char *web_server_mode_name(WEB_SERVER_MODE id) {
- switch(id) {
- case WEB_SERVER_MODE_NONE:
- return "none";
-
- case WEB_SERVER_MODE_SINGLE_THREADED:
- return "single-threaded";
-
- case WEB_SERVER_MODE_STATIC_THREADED:
- return "static-threaded";
-
- default:
- case WEB_SERVER_MODE_MULTI_THREADED:
- return "multi-threaded";
- }
-}
-
-// --------------------------------------------------------------------------------------
-// API sockets
-
-static LISTEN_SOCKETS api_sockets = {
- .config_section = CONFIG_SECTION_WEB,
- .default_bind_to = "*",
- .default_port = API_LISTEN_PORT,
- .backlog = API_LISTEN_BACKLOG
-};
-
-int api_listen_sockets_setup(void) {
- int socks = listen_sockets_setup(&api_sockets);
-
- if(!socks)
- fatal("LISTENER: Cannot listen on any API socket. Exiting...");
-
- return socks;
-}
-
-
-// --------------------------------------------------------------------------------------
-// access lists
-
-SIMPLE_PATTERN *web_allow_connections_from = NULL;
-SIMPLE_PATTERN *web_allow_streaming_from = NULL;
-SIMPLE_PATTERN *web_allow_netdataconf_from = NULL;
-
-// WEB_CLIENT_ACL
-SIMPLE_PATTERN *web_allow_dashboard_from = NULL;
-SIMPLE_PATTERN *web_allow_registry_from = NULL;
-SIMPLE_PATTERN *web_allow_badges_from = NULL;
-
-static void web_client_update_acl_matches(struct web_client *w) {
- w->acl = WEB_CLIENT_ACL_NONE;
-
- if(!web_allow_dashboard_from || simple_pattern_matches(web_allow_dashboard_from, w->client_ip))
- w->acl |= WEB_CLIENT_ACL_DASHBOARD;
-
- if(!web_allow_registry_from || simple_pattern_matches(web_allow_registry_from, w->client_ip))
- w->acl |= WEB_CLIENT_ACL_REGISTRY;
-
- if(!web_allow_badges_from || simple_pattern_matches(web_allow_badges_from, w->client_ip))
- w->acl |= WEB_CLIENT_ACL_BADGE;
-}
-
-
-// --------------------------------------------------------------------------------------
-
-static void log_connection(struct web_client *w, const char *msg) {
- log_access("%llu: %d '[%s]:%s' '%s'", w->id, gettid(), w->client_ip, w->client_port, msg);
-}
-
-// ----------------------------------------------------------------------------
-// allocate and free web_clients
-
-static void web_client_zero(struct web_client *w) {
- // zero everything about it - but keep the buffers
-
- // remember the pointers to the buffers
- BUFFER *b1 = w->response.data;
- BUFFER *b2 = w->response.header;
- BUFFER *b3 = w->response.header_output;
-
- // empty the buffers
- buffer_flush(b1);
- buffer_flush(b2);
- buffer_flush(b3);
-
- freez(w->user_agent);
-
- // zero everything
- memset(w, 0, sizeof(struct web_client));
-
- // restore the pointers of the buffers
- w->response.data = b1;
- w->response.header = b2;
- w->response.header_output = b3;
-}
-
-static void web_client_free(struct web_client *w) {
- buffer_free(w->response.header_output);
- buffer_free(w->response.header);
- buffer_free(w->response.data);
- freez(w->user_agent);
- freez(w);
-}
-
-static struct web_client *web_client_alloc(void) {
- struct web_client *w = callocz(1, sizeof(struct web_client));
- w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
- w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
- return w;
-}
-
-// ----------------------------------------------------------------------------
-// web clients caching
-
-// When clients connect and disconnect, avoid allocating and releasing memory.
-// Instead, when new clients get connected, reuse any memory previously allocated
-// for serving web clients that are now disconnected.
-
-// The size of the cache is adaptive. It caches the structures of 2x
-// the number of currently connected clients.
-
-// Comments per server:
-// SINGLE-THREADED : 1 cache is maintained
-// MULTI-THREADED : 1 cache is maintained
-// STATIC-THREADED : 1 cache for each thred of the web server
-
-struct clients_cache {
- pid_t pid;
-
- struct web_client *used; // the structures of the currently connected clients
- size_t used_count; // the count the currently connected clients
-
- struct web_client *avail; // the cached structures, available for future clients
- size_t avail_count; // the number of cached structures
-
- size_t reused; // the number of re-uses
- size_t allocated; // the number of allocations
-};
-
-static __thread struct clients_cache web_clients_cache = {
- .pid = 0,
- .used = NULL,
- .used_count = 0,
- .avail = NULL,
- .avail_count = 0,
- .allocated = 0,
- .reused = 0
-};
-
-static inline void web_client_cache_verify(int force) {
-#ifdef NETDATA_INTERNAL_CHECKS
- static __thread size_t count = 0;
- count++;
-
- if(unlikely(force || count > 1000)) {
- count = 0;
-
- struct web_client *w;
- size_t used = 0, avail = 0;
- for(w = web_clients_cache.used; w ; w = w->next) used++;
- for(w = web_clients_cache.avail; w ; w = w->next) avail++;
-
- info("web_client_cache has %zu (%zu) used and %zu (%zu) available clients, allocated %zu, reused %zu (hit %zu%%)."
- , used, web_clients_cache.used_count
- , avail, web_clients_cache.avail_count
- , web_clients_cache.allocated
- , web_clients_cache.reused
- , (web_clients_cache.allocated + web_clients_cache.reused)?(web_clients_cache.reused * 100 / (web_clients_cache.allocated + web_clients_cache.reused)):0
- );
- }
-#else
- if(unlikely(force)) {
- info("web_client_cache has %zu used and %zu available clients, allocated %zu, reused %zu (hit %zu%%)."
- , web_clients_cache.used_count
- , web_clients_cache.avail_count
- , web_clients_cache.allocated
- , web_clients_cache.reused
- , (web_clients_cache.allocated + web_clients_cache.reused)?(web_clients_cache.reused * 100 / (web_clients_cache.allocated + web_clients_cache.reused)):0
- );
- }
-#endif
-}
-
-// destroy the cache and free all the memory it uses
-static void web_client_cache_destroy(void) {
-#ifdef NETDATA_INTERNAL_CHECKS
- if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid()))
- error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid());
-
- web_client_cache_verify(1);
-#endif
-
- netdata_thread_disable_cancelability();
-
- struct web_client *w, *t;
-
- w = web_clients_cache.used;
- while(w) {
- t = w;
- w = w->next;
- web_client_free(t);
- }
- web_clients_cache.used = NULL;
- web_clients_cache.used_count = 0;
-
- w = web_clients_cache.avail;
- while(w) {
- t = w;
- w = w->next;
- web_client_free(t);
- }
- web_clients_cache.avail = NULL;
- web_clients_cache.avail_count = 0;
-
- netdata_thread_enable_cancelability();
-}
-
-static struct web_client *web_client_get_from_cache_or_allocate() {
-
-#ifdef NETDATA_INTERNAL_CHECKS
- if(unlikely(web_clients_cache.pid == 0))
- web_clients_cache.pid = gettid();
-
- if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid()))
- error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid());
-#endif
-
- netdata_thread_disable_cancelability();
-
- struct web_client *w = web_clients_cache.avail;
-
- if(w) {
- // get it from avail
- if (w == web_clients_cache.avail) web_clients_cache.avail = w->next;
- if(w->prev) w->prev->next = w->next;
- if(w->next) w->next->prev = w->prev;
- web_clients_cache.avail_count--;
- web_client_zero(w);
- web_clients_cache.reused++;
- }
- else {
- // allocate it
- w = web_client_alloc();
- web_clients_cache.allocated++;
- }
-
- // link it to used web clients
- if (web_clients_cache.used) web_clients_cache.used->prev = w;
- w->next = web_clients_cache.used;
- w->prev = NULL;
- web_clients_cache.used = w;
- web_clients_cache.used_count++;
-
- // initialize it
- w->id = web_client_connected();
- w->mode = WEB_CLIENT_MODE_NORMAL;
-
- netdata_thread_enable_cancelability();
-
- return w;
-}
-
-static void web_client_release(struct web_client *w) {
-#ifdef NETDATA_INTERNAL_CHECKS
- if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid()))
- error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid());
-
- if(unlikely(w->running))
- error("%llu: releasing web client from %s port %s, but it still running.", w->id, w->client_ip, w->client_port);
-#endif
-
- debug(D_WEB_CLIENT_ACCESS, "%llu: Closing web client from %s port %s.", w->id, w->client_ip, w->client_port);
-
- log_connection(w, "DISCONNECTED");
- web_client_request_done(w);
- web_client_disconnected();
-
- netdata_thread_disable_cancelability();
-
- if(web_server_mode != WEB_SERVER_MODE_STATIC_THREADED) {
- if (w->ifd != -1) close(w->ifd);
- if (w->ofd != -1 && w->ofd != w->ifd) close(w->ofd);
- w->ifd = w->ofd = -1;
- }
-
- // unlink it from the used
- if (w == web_clients_cache.used) web_clients_cache.used = w->next;
- if(w->prev) w->prev->next = w->next;
- if(w->next) w->next->prev = w->prev;
- web_clients_cache.used_count--;
-
- if(web_clients_cache.avail_count >= 2 * web_clients_cache.used_count) {
- // we have too many of them - free it
- web_client_free(w);
- }
- else {
- // link it to the avail
- if (web_clients_cache.avail) web_clients_cache.avail->prev = w;
- w->next = web_clients_cache.avail;
- w->prev = NULL;
- web_clients_cache.avail = w;
- web_clients_cache.avail_count++;
- }
-
- netdata_thread_enable_cancelability();
-}
-
-
-// ----------------------------------------------------------------------------
-// high level web clients connection management
-
-static void web_client_initialize_connection(struct web_client *w) {
- int flag = 1;
- if(setsockopt(w->ifd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0)
- error("%llu: failed to enable TCP_NODELAY on socket fd %d.", w->id, w->ifd);
-
- flag = 1;
- if(setsockopt(w->ifd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, sizeof(int)) != 0)
- error("%llu: failed to enable SO_KEEPALIVE on socket fd %d.", w->id, w->ifd);
-
- web_client_update_acl_matches(w);
-
- w->origin[0] = '*'; w->origin[1] = '\0';
- w->cookie1[0] = '\0'; w->cookie2[0] = '\0';
- freez(w->user_agent); w->user_agent = NULL;
-
- web_client_enable_wait_receive(w);
-
- log_connection(w, "CONNECTED");
-
- web_client_cache_verify(0);
-}
-
-static struct web_client *web_client_create_on_fd(int fd, const char *client_ip, const char *client_port) {
- struct web_client *w;
-
- w = web_client_get_from_cache_or_allocate();
- w->ifd = w->ofd = fd;
-
- strncpyz(w->client_ip, client_ip, sizeof(w->client_ip) - 1);
- strncpyz(w->client_port, client_port, sizeof(w->client_port) - 1);
-
- if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-");
- if(unlikely(!*w->client_port)) strcpy(w->client_port, "-");
-
- web_client_initialize_connection(w);
- return(w);
-}
-
-static struct web_client *web_client_create_on_listenfd(int listener) {
- struct web_client *w;
-
- w = web_client_get_from_cache_or_allocate();
- w->ifd = w->ofd = accept_socket(listener, SOCK_NONBLOCK, w->client_ip, sizeof(w->client_ip), w->client_port, sizeof(w->client_port), web_allow_connections_from);
-
- if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-");
- if(unlikely(!*w->client_port)) strcpy(w->client_port, "-");
-
- if (w->ifd == -1) {
- if(errno == EPERM)
- log_connection(w, "ACCESS DENIED");
- else {
- log_connection(w, "CONNECTION FAILED");
- error("%llu: Failed to accept new incoming connection.", w->id);
- }
-
- web_client_release(w);
- return NULL;
- }
-
- web_client_initialize_connection(w);
- return(w);
-}
-
-
-// --------------------------------------------------------------------------------------
-// the thread of a single client - for the MULTI-THREADED web server
-
-// 1. waits for input and output, using async I/O
-// 2. it processes HTTP requests
-// 3. it generates HTTP responses
-// 4. it copies data from input to output if mode is FILECOPY
-
-int web_client_timeout = DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS;
-int web_client_first_request_timeout = DEFAULT_TIMEOUT_TO_RECEIVE_FIRST_WEB_REQUEST;
-
-static void multi_threaded_web_client_worker_main_cleanup(void *ptr) {
- struct web_client *w = ptr;
- WEB_CLIENT_IS_DEAD(w);
- w->running = 0;
-}
-
-static void *multi_threaded_web_client_worker_main(void *ptr) {
- netdata_thread_cleanup_push(multi_threaded_web_client_worker_main_cleanup, ptr);
-
- struct web_client *w = ptr;
- w->running = 1;
-
- struct pollfd fds[2], *ifd, *ofd;
- int retval, timeout_ms;
- nfds_t fdmax = 0;
-
- while(!netdata_exit) {
- if(unlikely(web_client_check_dead(w))) {
- debug(D_WEB_CLIENT, "%llu: client is dead.", w->id);
- break;
- }
- else if(unlikely(!web_client_has_wait_receive(w) && !web_client_has_wait_send(w))) {
- debug(D_WEB_CLIENT, "%llu: client is not set for neither receiving nor sending data.", w->id);
- break;
- }
-
- if(unlikely(w->ifd < 0 || w->ofd < 0)) {
- error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd", w->id, w->ifd, w->ofd);
- break;
- }
-
- if(w->ifd == w->ofd) {
- fds[0].fd = w->ifd;
- fds[0].events = 0;
- fds[0].revents = 0;
-
- if(web_client_has_wait_receive(w)) fds[0].events |= POLLIN;
- if(web_client_has_wait_send(w)) fds[0].events |= POLLOUT;
-
- fds[1].fd = -1;
- fds[1].events = 0;
- fds[1].revents = 0;
-
- ifd = ofd = &fds[0];
-
- fdmax = 1;
- }
- else {
- fds[0].fd = w->ifd;
- fds[0].events = 0;
- fds[0].revents = 0;
- if(web_client_has_wait_receive(w)) fds[0].events |= POLLIN;
- ifd = &fds[0];
-
- fds[1].fd = w->ofd;
- fds[1].events = 0;
- fds[1].revents = 0;
- if(web_client_has_wait_send(w)) fds[1].events |= POLLOUT;
- ofd = &fds[1];
-
- fdmax = 2;
- }
-
- debug(D_WEB_CLIENT, "%llu: Waiting socket async I/O for %s %s", w->id, web_client_has_wait_receive(w)?"INPUT":"", web_client_has_wait_send(w)?"OUTPUT":"");
- errno = 0;
- timeout_ms = web_client_timeout * 1000;
- retval = poll(fds, fdmax, timeout_ms);
-
- if(unlikely(netdata_exit)) break;
-
- if(unlikely(retval == -1)) {
- if(errno == EAGAIN || errno == EINTR) {
- debug(D_WEB_CLIENT, "%llu: EAGAIN received.", w->id);
- continue;
- }
-
- debug(D_WEB_CLIENT, "%llu: LISTENER: poll() failed (input fd = %d, output fd = %d). Closing client.", w->id, w->ifd, w->ofd);
- break;
- }
- else if(unlikely(!retval)) {
- debug(D_WEB_CLIENT, "%llu: Timeout while waiting socket async I/O for %s %s", w->id, web_client_has_wait_receive(w)?"INPUT":"", web_client_has_wait_send(w)?"OUTPUT":"");
- break;
- }
-
- if(unlikely(netdata_exit)) break;
-
- int used = 0;
- if(web_client_has_wait_send(w) && ofd->revents & POLLOUT) {
- used++;
- if(web_client_send(w) < 0) {
- debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id);
- break;
- }
- }
-
- if(unlikely(netdata_exit)) break;
-
- if(web_client_has_wait_receive(w) && (ifd->revents & POLLIN || ifd->revents & POLLPRI)) {
- used++;
- if(web_client_receive(w) < 0) {
- debug(D_WEB_CLIENT, "%llu: Cannot receive data from client. Closing client.", w->id);
- break;
- }
-
- if(w->mode == WEB_CLIENT_MODE_NORMAL) {
- debug(D_WEB_CLIENT, "%llu: Attempting to process received data.", w->id);
- web_client_process_request(w);
-
- // if the sockets are closed, may have transferred this client
- // to plugins.d
- if(unlikely(w->mode == WEB_CLIENT_MODE_STREAM))
- break;
- }
- }
-
- if(unlikely(!used)) {
- debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on socket.", w->id);
- break;
- }
- }
-
- if(w->mode != WEB_CLIENT_MODE_STREAM)
- log_connection(w, "DISCONNECTED");
-
- web_client_request_done(w);
-
- debug(D_WEB_CLIENT, "%llu: done...", w->id);
-
- // close the sockets/files now
- // to free file descriptors
- if(w->ifd == w->ofd) {
- if(w->ifd != -1) close(w->ifd);
- }
- else {
- if(w->ifd != -1) close(w->ifd);
- if(w->ofd != -1) close(w->ofd);
- }
- w->ifd = -1;
- w->ofd = -1;
-
- netdata_thread_cleanup_pop(1);
- return NULL;
-}
-
-// --------------------------------------------------------------------------------------
-// the main socket listener - MULTI-THREADED
-
-// 1. it accepts new incoming requests on our port
-// 2. creates a new web_client for each connection received
-// 3. spawns a new netdata_thread to serve the client (this is optimal for keep-alive clients)
-// 4. cleans up old web_clients that their netdata_threads have been exited
-
-static void web_client_multi_threaded_web_server_release_clients(void) {
- struct web_client *w;
- for(w = web_clients_cache.used; w ; ) {
- if(unlikely(!w->running && web_client_check_dead(w))) {
- struct web_client *t = w->next;
- web_client_release(w);
- w = t;
- }
- else
- w = w->next;
- }
-}
-
-static void web_client_multi_threaded_web_server_stop_all_threads(void) {
- struct web_client *w;
-
- int found = 1, max = 2 * USEC_PER_SEC, step = 50000;
- for(w = web_clients_cache.used; w ; w = w->next) {
- if(w->running) {
- found++;
- info("stopping web client %s, id %llu", w->client_ip, w->id);
- netdata_thread_cancel(w->thread);
- }
- }
-
- while(found && max > 0) {
- max -= step;
- info("Waiting %d web threads to finish...", found);
- sleep_usec(step);
- found = 0;
- for(w = web_clients_cache.used; w ; w = w->next)
- if(w->running) found++;
- }
-
- if(found)
- error("%d web threads are taking too long to finish. Giving up.", found);
-}
-
-static struct pollfd *socket_listen_main_multi_threaded_fds = NULL;
-
-static void socket_listen_main_multi_threaded_cleanup(void *data) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
-
- info("cleaning up...");
-
- info("releasing allocated memory...");
- freez(socket_listen_main_multi_threaded_fds);
-
- info("closing all sockets...");
- listen_sockets_close(&api_sockets);
-
- info("stopping all running web server threads...");
- web_client_multi_threaded_web_server_stop_all_threads();
-
- info("freeing web clients cache...");
- web_client_cache_destroy();
-
- info("cleanup completed.");
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
-}
-
-#define CLEANUP_EVERY_EVENTS 60
-void *socket_listen_main_multi_threaded(void *ptr) {
- netdata_thread_cleanup_push(socket_listen_main_multi_threaded_cleanup, ptr);
-
- web_server_mode = WEB_SERVER_MODE_MULTI_THREADED;
- web_server_is_multithreaded = 1;
-
- struct web_client *w;
- int retval, counter = 0;
-
- if(!api_sockets.opened)
- fatal("LISTENER: No sockets to listen to.");
-
- socket_listen_main_multi_threaded_fds = callocz(sizeof(struct pollfd), api_sockets.opened);
-
- size_t i;
- for(i = 0; i < api_sockets.opened ;i++) {
- socket_listen_main_multi_threaded_fds[i].fd = api_sockets.fds[i];
- socket_listen_main_multi_threaded_fds[i].events = POLLIN;
- socket_listen_main_multi_threaded_fds[i].revents = 0;
-
- info("Listening on '%s'", (api_sockets.fds_names[i])?api_sockets.fds_names[i]:"UNKNOWN");
- }
-
- int timeout_ms = 1 * 1000;
-
- while(!netdata_exit) {
-
- // debug(D_WEB_CLIENT, "LISTENER: Waiting...");
- retval = poll(socket_listen_main_multi_threaded_fds, api_sockets.opened, timeout_ms);
-
- if(unlikely(retval == -1)) {
- error("LISTENER: poll() failed.");
- continue;
- }
- else if(unlikely(!retval)) {
- debug(D_WEB_CLIENT, "LISTENER: poll() timeout.");
- counter++;
- continue;
- }
-
- for(i = 0 ; i < api_sockets.opened ; i++) {
- short int revents = socket_listen_main_multi_threaded_fds[i].revents;
-
- // check for new incoming connections
- if(revents & POLLIN || revents & POLLPRI) {
- socket_listen_main_multi_threaded_fds[i].revents = 0;
-
- w = web_client_create_on_listenfd(socket_listen_main_multi_threaded_fds[i].fd);
- if(unlikely(!w)) {
- // no need for error log - web_client_create_on_listenfd already logged the error
- continue;
- }
-
- if(api_sockets.fds_families[i] == AF_UNIX)
- web_client_set_unix(w);
- else
- web_client_set_tcp(w);
-
- char tag[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "WEB_CLIENT[%llu,[%s]:%s]", w->id, w->client_ip, w->client_port);
-
- w->running = 1;
- if(netdata_thread_create(&w->thread, tag, NETDATA_THREAD_OPTION_DONT_LOG, multi_threaded_web_client_worker_main, w) != 0) {
- w->running = 0;
- web_client_release(w);
- }
- }
- }
-
- counter++;
- if(counter > CLEANUP_EVERY_EVENTS) {
- counter = 0;
- web_client_multi_threaded_web_server_release_clients();
- }
- }
-
- netdata_thread_cleanup_pop(1);
- return NULL;
-}
-
-
-// --------------------------------------------------------------------------------------
-// the main socket listener - SINGLE-THREADED
-
-struct web_client *single_threaded_clients[FD_SETSIZE];
-
-static inline int single_threaded_link_client(struct web_client *w, fd_set *ifds, fd_set *ofds, fd_set *efds, int *max) {
- if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) {
- return 1;
- }
-
- if(unlikely(w->ifd < 0 || w->ifd >= (int)FD_SETSIZE || w->ofd < 0 || w->ofd >= (int)FD_SETSIZE)) {
- error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd < FD_SETSIZE (%d)", w->id, w->ifd, w->ofd, (int)FD_SETSIZE);
- return 1;
- }
-
- FD_SET(w->ifd, efds);
- if(unlikely(*max < w->ifd)) *max = w->ifd;
-
- if(unlikely(w->ifd != w->ofd)) {
- if(*max < w->ofd) *max = w->ofd;
- FD_SET(w->ofd, efds);
- }
-
- if(web_client_has_wait_receive(w)) FD_SET(w->ifd, ifds);
- if(web_client_has_wait_send(w)) FD_SET(w->ofd, ofds);
-
- single_threaded_clients[w->ifd] = w;
- single_threaded_clients[w->ofd] = w;
-
- return 0;
-}
-
-static inline int single_threaded_unlink_client(struct web_client *w, fd_set *ifds, fd_set *ofds, fd_set *efds) {
- FD_CLR(w->ifd, efds);
- if(unlikely(w->ifd != w->ofd)) FD_CLR(w->ofd, efds);
-
- if(web_client_has_wait_receive(w)) FD_CLR(w->ifd, ifds);
- if(web_client_has_wait_send(w)) FD_CLR(w->ofd, ofds);
-
- single_threaded_clients[w->ifd] = NULL;
- single_threaded_clients[w->ofd] = NULL;
-
- if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) {
- return 1;
- }
-
- return 0;
-}
-
-static void socket_listen_main_single_threaded_cleanup(void *data) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
-
- info("closing all sockets...");
- listen_sockets_close(&api_sockets);
-
- info("freeing web clients cache...");
- web_client_cache_destroy();
-
- info("cleanup completed.");
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
-}
-
-void *socket_listen_main_single_threaded(void *ptr) {
- netdata_thread_cleanup_push(socket_listen_main_single_threaded_cleanup, ptr);
- web_server_mode = WEB_SERVER_MODE_SINGLE_THREADED;
- web_server_is_multithreaded = 0;
-
- struct web_client *w;
-
- if(!api_sockets.opened)
- fatal("LISTENER: no listen sockets available.");
-
- size_t i;
- for(i = 0; i < (size_t)FD_SETSIZE ; i++)
- single_threaded_clients[i] = NULL;
-
- fd_set ifds, ofds, efds, rifds, rofds, refds;
- FD_ZERO (&ifds);
- FD_ZERO (&ofds);
- FD_ZERO (&efds);
- int fdmax = 0;
-
- for(i = 0; i < api_sockets.opened ; i++) {
- if (api_sockets.fds[i] < 0 || api_sockets.fds[i] >= (int)FD_SETSIZE)
- fatal("LISTENER: Listen socket %d is not ready, or invalid.", api_sockets.fds[i]);
-
- info("Listening on '%s'", (api_sockets.fds_names[i])?api_sockets.fds_names[i]:"UNKNOWN");
-
- FD_SET(api_sockets.fds[i], &ifds);
- FD_SET(api_sockets.fds[i], &efds);
- if(fdmax < api_sockets.fds[i])
- fdmax = api_sockets.fds[i];
- }
-
- while(!netdata_exit) {
- debug(D_WEB_CLIENT_ACCESS, "LISTENER: single threaded web server waiting (fdmax = %d)...", fdmax);
-
- struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
- rifds = ifds;
- rofds = ofds;
- refds = efds;
- int retval = select(fdmax+1, &rifds, &rofds, &refds, &tv);
-
- if(unlikely(retval == -1)) {
- error("LISTENER: select() failed.");
- continue;
- }
- else if(likely(retval)) {
- debug(D_WEB_CLIENT_ACCESS, "LISTENER: got something.");
-
- for(i = 0; i < api_sockets.opened ; i++) {
- if (FD_ISSET(api_sockets.fds[i], &rifds)) {
- debug(D_WEB_CLIENT_ACCESS, "LISTENER: new connection.");
- w = web_client_create_on_listenfd(api_sockets.fds[i]);
- if(unlikely(!w))
- continue;
-
- if(api_sockets.fds_families[i] == AF_UNIX)
- web_client_set_unix(w);
- else
- web_client_set_tcp(w);
-
- if (single_threaded_link_client(w, &ifds, &ofds, &ifds, &fdmax) != 0) {
- web_client_release(w);
- }
- }
- }
-
- for(i = 0 ; i <= (size_t)fdmax ; i++) {
- if(likely(!FD_ISSET(i, &rifds) && !FD_ISSET(i, &rofds) && !FD_ISSET(i, &refds)))
- continue;
-
- w = single_threaded_clients[i];
- if(unlikely(!w)) {
- // error("no client on slot %zu", i);
- continue;
- }
-
- if(unlikely(single_threaded_unlink_client(w, &ifds, &ofds, &efds) != 0)) {
- // error("failed to unlink client %zu", i);
- web_client_release(w);
- continue;
- }
-
- if (unlikely(FD_ISSET(w->ifd, &refds) || FD_ISSET(w->ofd, &refds))) {
- // error("no input on client %zu", i);
- web_client_release(w);
- continue;
- }
-
- if (unlikely(web_client_has_wait_receive(w) && FD_ISSET(w->ifd, &rifds))) {
- if (unlikely(web_client_receive(w) < 0)) {
- // error("cannot read from client %zu", i);
- web_client_release(w);
- continue;
- }
-
- if (w->mode != WEB_CLIENT_MODE_FILECOPY) {
- debug(D_WEB_CLIENT, "%llu: Processing received data.", w->id);
- web_client_process_request(w);
- }
- }
-
- if (unlikely(web_client_has_wait_send(w) && FD_ISSET(w->ofd, &rofds))) {
- if (unlikely(web_client_send(w) < 0)) {
- // error("cannot send data to client %zu", i);
- debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id);
- web_client_release(w);
- continue;
- }
- }
-
- if(unlikely(single_threaded_link_client(w, &ifds, &ofds, &efds, &fdmax) != 0)) {
- // error("failed to link client %zu", i);
- web_client_release(w);
- }
- }
- }
- else {
- debug(D_WEB_CLIENT_ACCESS, "LISTENER: single threaded web server timeout.");
- }
- }
-
- netdata_thread_cleanup_pop(1);
- return NULL;
-}
-
-
-// --------------------------------------------------------------------------------------
-// the main socket listener - STATIC-THREADED
-
-struct web_server_static_threaded_worker {
- netdata_thread_t thread;
-
- int id;
- int running;
-
- size_t max_sockets;
-
- volatile size_t connected;
- volatile size_t disconnected;
- volatile size_t receptions;
- volatile size_t sends;
- volatile size_t max_concurrent;
-
- volatile size_t files_read;
- volatile size_t file_reads;
-};
-
-static long long static_threaded_workers_count = 1;
-static struct web_server_static_threaded_worker *static_workers_private_data = NULL;
-static __thread struct web_server_static_threaded_worker *worker_private = NULL;
-
-// ----------------------------------------------------------------------------
-
-static inline int web_server_check_client_status(struct web_client *w) {
- if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w))))
- return -1;
-
- return 0;
-}
-
-// ----------------------------------------------------------------------------
-// web server files
-
-static void *web_server_file_add_callback(POLLINFO *pi, short int *events, void *data) {
- struct web_client *w = (struct web_client *)data;
-
- worker_private->files_read++;
-
- debug(D_WEB_CLIENT, "%llu: ADDED FILE READ ON FD %d", w->id, pi->fd);
- *events = POLLIN;
- pi->data = w;
- return w;
-}
-
-static void web_werver_file_del_callback(POLLINFO *pi) {
- struct web_client *w = (struct web_client *)pi->data;
- debug(D_WEB_CLIENT, "%llu: RELEASE FILE READ ON FD %d", w->id, pi->fd);
-
- w->pollinfo_filecopy_slot = 0;
-
- if(unlikely(!w->pollinfo_slot)) {
- debug(D_WEB_CLIENT, "%llu: CROSS WEB CLIENT CLEANUP (iFD %d, oFD %d)", w->id, pi->fd, w->ofd);
- web_client_release(w);
- }
-}
-
-static int web_server_file_read_callback(POLLINFO *pi, short int *events) {
- struct web_client *w = (struct web_client *)pi->data;
-
- // if there is no POLLINFO linked to this, it means the client disconnected
- // stop the file reading too
- if(unlikely(!w->pollinfo_slot)) {
- debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON CLOSED WEB CLIENT", w->id, pi->fd);
- return -1;
- }
-
- if(unlikely(w->mode != WEB_CLIENT_MODE_FILECOPY || w->ifd == w->ofd)) {
- debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON NON-FILECOPY WEB CLIENT", w->id, pi->fd);
- return -1;
- }
-
- debug(D_WEB_CLIENT, "%llu: READING FILE ON FD %d", w->id, pi->fd);
-
- worker_private->file_reads++;
- ssize_t ret = unlikely(web_client_read_file(w));
-
- if(likely(web_client_has_wait_send(w))) {
- POLLJOB *p = pi->p; // our POLLJOB
- POLLINFO *wpi = pollinfo_from_slot(p, w->pollinfo_slot); // POLLINFO of the client socket
-
- debug(D_WEB_CLIENT, "%llu: SIGNALING W TO SEND (iFD %d, oFD %d)", w->id, pi->fd, wpi->fd);
- p->fds[wpi->slot].events |= POLLOUT;
- }
-
- if(unlikely(ret <= 0 || w->ifd == w->ofd)) {
- debug(D_WEB_CLIENT, "%llu: DONE READING FILE ON FD %d", w->id, pi->fd);
- return -1;
- }
-
- *events = POLLIN;
- return 0;
-}
-
-static int web_server_file_write_callback(POLLINFO *pi, short int *events) {
- (void)pi;
- (void)events;
-
- error("Writing to web files is not supported!");
-
- return -1;
-}
-
-// ----------------------------------------------------------------------------
-// web server clients
-
-static void *web_server_add_callback(POLLINFO *pi, short int *events, void *data) {
- (void)data;
-
- worker_private->connected++;
-
- size_t concurrent = worker_private->connected - worker_private->disconnected;
- if(unlikely(concurrent > worker_private->max_concurrent))
- worker_private->max_concurrent = concurrent;
-
- *events = POLLIN;
-
- debug(D_WEB_CLIENT_ACCESS, "LISTENER on %d: new connection.", pi->fd);
- struct web_client *w = web_client_create_on_fd(pi->fd, pi->client_ip, pi->client_port);
- w->pollinfo_slot = pi->slot;
-
- if(unlikely(pi->socktype == AF_UNIX))
- web_client_set_unix(w);
- else
- web_client_set_tcp(w);
-
- debug(D_WEB_CLIENT, "%llu: ADDED CLIENT FD %d", w->id, pi->fd);
- return w;
-}
-
-// TCP client disconnected
-static void web_server_del_callback(POLLINFO *pi) {
- worker_private->disconnected++;
-
- struct web_client *w = (struct web_client *)pi->data;
-
- w->pollinfo_slot = 0;
- if(unlikely(w->pollinfo_filecopy_slot)) {
- POLLINFO *fpi = pollinfo_from_slot(pi->p, w->pollinfo_filecopy_slot); // POLLINFO of the client socket
- debug(D_WEB_CLIENT, "%llu: THE CLIENT WILL BE FRED BY READING FILE JOB ON FD %d", w->id, fpi->fd);
- }
- else {
- if(web_client_flag_check(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET))
- pi->flags |= POLLINFO_FLAG_DONT_CLOSE;
-
- debug(D_WEB_CLIENT, "%llu: CLOSING CLIENT FD %d", w->id, pi->fd);
- web_client_release(w);
- }
-}
-
-static int web_server_rcv_callback(POLLINFO *pi, short int *events) {
- worker_private->receptions++;
-
- struct web_client *w = (struct web_client *)pi->data;
- int fd = pi->fd;
-
- if(unlikely(web_client_receive(w) < 0))
- return -1;
-
- debug(D_WEB_CLIENT, "%llu: processing received data on fd %d.", w->id, fd);
- web_client_process_request(w);
-
- if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY)) {
- if(w->pollinfo_filecopy_slot == 0) {
- debug(D_WEB_CLIENT, "%llu: FILECOPY DETECTED ON FD %d", w->id, pi->fd);
-
- if (unlikely(w->ifd != -1 && w->ifd != w->ofd && w->ifd != fd)) {
- // add a new socket to poll_events, with the same
- debug(D_WEB_CLIENT, "%llu: CREATING FILECOPY SLOT ON FD %d", w->id, pi->fd);
-
- POLLINFO *fpi = poll_add_fd(
- pi->p
- , w->ifd
- , 0
- , POLLINFO_FLAG_CLIENT_SOCKET
- , "FILENAME"
- , ""
- , web_server_file_add_callback
- , web_werver_file_del_callback
- , web_server_file_read_callback
- , web_server_file_write_callback
- , (void *) w
- );
-
- if(fpi)
- w->pollinfo_filecopy_slot = fpi->slot;
- else {
- error("Failed to add filecopy fd. Closing client.");
- return -1;
- }
- }
- }
- }
- else {
- if(unlikely(w->ifd == fd && web_client_has_wait_receive(w)))
- *events |= POLLIN;
- }
-
- if(unlikely(w->ofd == fd && web_client_has_wait_send(w)))
- *events |= POLLOUT;
-
- return web_server_check_client_status(w);
-}
-
-static int web_server_snd_callback(POLLINFO *pi, short int *events) {
- worker_private->sends++;
-
- struct web_client *w = (struct web_client *)pi->data;
- int fd = pi->fd;
-
- debug(D_WEB_CLIENT, "%llu: sending data on fd %d.", w->id, fd);
-
- if(unlikely(web_client_send(w) < 0))
- return -1;
-
- if(unlikely(w->ifd == fd && web_client_has_wait_receive(w)))
- *events |= POLLIN;
-
- if(unlikely(w->ofd == fd && web_client_has_wait_send(w)))
- *events |= POLLOUT;
-
- return web_server_check_client_status(w);
-}
-
-static void web_server_tmr_callback(void *timer_data) {
- worker_private = (struct web_server_static_threaded_worker *)timer_data;
-
- static __thread RRDSET *st = NULL;
- static __thread RRDDIM *rd_user = NULL, *rd_system = NULL;
-
- if(unlikely(!st)) {
- char id[100 + 1];
- char title[100 + 1];
-
- snprintfz(id, 100, "web_thread%d_cpu", worker_private->id + 1);
- snprintfz(title, 100, "NetData web server thread No %d CPU usage", worker_private->id + 1);
-
- st = rrdset_create_localhost(
- "netdata"
- , id
- , NULL
- , "web"
- , "netdata.web_cpu"
- , title
- , "milliseconds/s"
- , "web"
- , "stats"
- , 132000 + worker_private->id
- , default_rrd_update_every
- , RRDSET_TYPE_STACKED
- );
-
- rd_user = rrddim_add(st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- rd_system = rrddim_add(st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- }
- else
- rrdset_next(st);
-
- struct rusage rusage;
- getrusage(RUSAGE_THREAD, &rusage);
- rrddim_set_by_pointer(st, rd_user, rusage.ru_utime.tv_sec * 1000000ULL + rusage.ru_utime.tv_usec);
- rrddim_set_by_pointer(st, rd_system, rusage.ru_stime.tv_sec * 1000000ULL + rusage.ru_stime.tv_usec);
- rrdset_done(st);
-}
-
-// ----------------------------------------------------------------------------
-// web server worker thread
-
-static void socket_listen_main_static_threaded_worker_cleanup(void *ptr) {
- worker_private = (struct web_server_static_threaded_worker *)ptr;
-
- info("freeing local web clients cache...");
- web_client_cache_destroy();
-
- info("stopped after %zu connects, %zu disconnects (max concurrent %zu), %zu receptions and %zu sends",
- worker_private->connected,
- worker_private->disconnected,
- worker_private->max_concurrent,
- worker_private->receptions,
- worker_private->sends
- );
-
- worker_private->running = 0;
-}
-
-void *socket_listen_main_static_threaded_worker(void *ptr) {
- worker_private = (struct web_server_static_threaded_worker *)ptr;
- worker_private->running = 1;
-
- netdata_thread_cleanup_push(socket_listen_main_static_threaded_worker_cleanup, ptr);
-
- poll_events(&api_sockets
- , web_server_add_callback
- , web_server_del_callback
- , web_server_rcv_callback
- , web_server_snd_callback
- , web_server_tmr_callback
- , web_allow_connections_from
- , NULL
- , web_client_first_request_timeout
- , web_client_timeout
- , default_rrd_update_every * 1000 // timer_milliseconds
- , ptr // timer_data
- , worker_private->max_sockets
- );
-
- netdata_thread_cleanup_pop(1);
- return NULL;
-}
-
-
-// ----------------------------------------------------------------------------
-// web server main thread - also becomes a worker
-
-static void socket_listen_main_static_threaded_cleanup(void *ptr) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
-
- int i, found = 0, max = 2 * USEC_PER_SEC, step = 50000;
-
- // we start from 1, - 0 is self
- for(i = 1; i < static_threaded_workers_count; i++) {
- if(static_workers_private_data[i].running) {
- found++;
- info("stopping worker %d", i + 1);
- netdata_thread_cancel(static_workers_private_data[i].thread);
- }
- else
- info("found stopped worker %d", i + 1);
- }
-
- while(found && max > 0) {
- max -= step;
- info("Waiting %d static web threads to finish...", found);
- sleep_usec(step);
- found = 0;
-
- // we start from 1, - 0 is self
- for(i = 1; i < static_threaded_workers_count; i++) {
- if (static_workers_private_data[i].running)
- found++;
- }
- }
-
- if(found)
- error("%d static web threads are taking too long to finish. Giving up.", found);
-
- info("closing all web server sockets...");
- listen_sockets_close(&api_sockets);
-
- info("all static web threads stopped.");
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
-}
-
-void *socket_listen_main_static_threaded(void *ptr) {
- netdata_thread_cleanup_push(socket_listen_main_static_threaded_cleanup, ptr);
- web_server_mode = WEB_SERVER_MODE_STATIC_THREADED;
-
- if(!api_sockets.opened)
- fatal("LISTENER: no listen sockets available.");
-
- // 6 threads is the optimal value
- // since 6 are the parallel connections browsers will do
- // so, if the machine has more CPUs, avoid using resources unnecessarily
- int def_thread_count = (processors > 6)?6:processors;
-
- static_threaded_workers_count = config_get_number(CONFIG_SECTION_WEB, "web server threads", def_thread_count);
- if(static_threaded_workers_count < 1) static_threaded_workers_count = 1;
-
- size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets", (long long int)(rlimit_nofile.rlim_cur / 2));
-
- static_workers_private_data = callocz((size_t)static_threaded_workers_count, sizeof(struct web_server_static_threaded_worker));
-
- web_server_is_multithreaded = (static_threaded_workers_count > 1);
-
- int i;
- for(i = 1; i < static_threaded_workers_count; i++) {
- static_workers_private_data[i].id = i;
- static_workers_private_data[i].max_sockets = max_sockets / static_threaded_workers_count;
-
- char tag[50 + 1];
- snprintfz(tag, 50, "WEB_SERVER[static%d]", i+1);
-
- info("starting worker %d", i+1);
- netdata_thread_create(&static_workers_private_data[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT, socket_listen_main_static_threaded_worker, (void *)&static_workers_private_data[i]);
- }
-
- // and the main one
- static_workers_private_data[0].max_sockets = max_sockets / static_threaded_workers_count;
- socket_listen_main_static_threaded_worker((void *)&static_workers_private_data[0]);
-
- netdata_thread_cleanup_pop(1);
- return NULL;
-}