summaryrefslogtreecommitdiffstats
path: root/src/web_server.c
diff options
context:
space:
mode:
authorFederico Ceratto <federico.ceratto@gmail.com>2018-03-27 21:28:21 +0000
committerFederico Ceratto <federico.ceratto@gmail.com>2018-03-27 21:28:21 +0000
commitd4dd00f58a502c9ca4b63e36ce6bc7a9945dc63c (patch)
treefaac99f51f182bb8c0a03e95e393d421ac9ddf42 /src/web_server.c
parentNew upstream version 1.9.0+dfsg (diff)
downloadnetdata-d4dd00f58a502c9ca4b63e36ce6bc7a9945dc63c.tar.xz
netdata-d4dd00f58a502c9ca4b63e36ce6bc7a9945dc63c.zip
New upstream version 1.10.0+dfsgupstream/1.10.0+dfsg
Diffstat (limited to 'src/web_server.c')
-rw-r--r--src/web_server.c1184
1 files changed, 979 insertions, 205 deletions
diff --git a/src/web_server.c b/src/web_server.c
index d231cbb5..31c54641 100644
--- a/src/web_server.c
+++ b/src/web_server.c
@@ -1,50 +1,12 @@
#include "common.h"
-static LISTEN_SOCKETS api_sockets = {
- .config_section = CONFIG_SECTION_WEB,
- .default_bind_to = "*",
- .default_port = API_LISTEN_PORT,
- .backlog = API_LISTEN_BACKLOG
-};
+// this file includes 3 web servers:
+//
+// 1. single-threaded, based on select()
+// 2. multi-threaded, based on poll() that spawns threads to handle the requests, based on select()
+// 3. static-threaded, based on poll() using a fixed number of threads (configured at netdata.conf)
-WEB_SERVER_MODE web_server_mode = WEB_SERVER_MODE_MULTI_THREADED;
-
-#ifdef NETDATA_INTERNAL_CHECKS
-static void log_allocations(void)
-{
-#ifdef HAVE_C_MALLINFO
- static int heap = 0, used = 0, mmap = 0;
-
- struct mallinfo mi;
-
- mi = mallinfo();
- if(mi.uordblks > used) {
- int clients = 0;
- struct web_client *w;
- for(w = web_clients; w ; w = w->next) clients++;
-
- info("Allocated memory: used %d KB (+%d B), mmap %d KB (+%d B), heap %d KB (+%d B). %d web clients connected.",
- mi.uordblks / 1024,
- mi.uordblks - used,
- mi.hblkhd / 1024,
- mi.hblkhd - mmap,
- mi.arena / 1024,
- mi.arena - heap,
- clients);
-
- used = mi.uordblks;
- heap = mi.arena;
- mmap = mi.hblkhd;
- }
-#else /* ! HAVE_C_MALLINFO */
- ;
-#endif /* ! HAVE_C_MALLINFO */
-
-#ifdef has_jemalloc
- malloc_stats_print(NULL, NULL, NULL);
-#endif
-}
-#endif /* NETDATA_INTERNAL_CHECKS */
+WEB_SERVER_MODE web_server_mode = WEB_SERVER_MODE_STATIC_THREADED;
// --------------------------------------------------------------------------------------
@@ -53,6 +15,8 @@ WEB_SERVER_MODE web_server_mode_id(const char *mode) {
return WEB_SERVER_MODE_NONE;
else if(!strcmp(mode, "single") || !strcmp(mode, "single-threaded"))
return WEB_SERVER_MODE_SINGLE_THREADED;
+ else if(!strcmp(mode, "static") || !strcmp(mode, "static-threaded"))
+ return WEB_SERVER_MODE_STATIC_THREADED;
else // if(!strcmp(mode, "multi") || !strcmp(mode, "multi-threaded"))
return WEB_SERVER_MODE_MULTI_THREADED;
}
@@ -65,6 +29,9 @@ const char *web_server_mode_name(WEB_SERVER_MODE id) {
case WEB_SERVER_MODE_SINGLE_THREADED:
return "single-threaded";
+ case WEB_SERVER_MODE_STATIC_THREADED:
+ return "static-threaded";
+
default:
case WEB_SERVER_MODE_MULTI_THREADED:
return "multi-threaded";
@@ -72,6 +39,14 @@ const char *web_server_mode_name(WEB_SERVER_MODE id) {
}
// --------------------------------------------------------------------------------------
+// API sockets
+
+static LISTEN_SOCKETS api_sockets = {
+ .config_section = CONFIG_SECTION_WEB,
+ .default_bind_to = "*",
+ .default_port = API_LISTEN_PORT,
+ .backlog = API_LISTEN_BACKLOG
+};
int api_listen_sockets_setup(void) {
int socks = listen_sockets_setup(&api_sockets);
@@ -82,89 +57,622 @@ int api_listen_sockets_setup(void) {
return socks;
}
+
// --------------------------------------------------------------------------------------
-// the main socket listener
+// access lists
-static inline void cleanup_web_clients(void) {
- struct web_client *w;
+SIMPLE_PATTERN *web_allow_connections_from = NULL;
+SIMPLE_PATTERN *web_allow_streaming_from = NULL;
+SIMPLE_PATTERN *web_allow_netdataconf_from = NULL;
+
+// WEB_CLIENT_ACL
+SIMPLE_PATTERN *web_allow_dashboard_from = NULL;
+SIMPLE_PATTERN *web_allow_registry_from = NULL;
+SIMPLE_PATTERN *web_allow_badges_from = NULL;
+
+static void web_client_update_acl_matches(struct web_client *w) {
+ w->acl = WEB_CLIENT_ACL_NONE;
+
+ if(!web_allow_dashboard_from || simple_pattern_matches(web_allow_dashboard_from, w->client_ip))
+ w->acl |= WEB_CLIENT_ACL_DASHBOARD;
+
+ if(!web_allow_registry_from || simple_pattern_matches(web_allow_registry_from, w->client_ip))
+ w->acl |= WEB_CLIENT_ACL_REGISTRY;
+
+ if(!web_allow_badges_from || simple_pattern_matches(web_allow_badges_from, w->client_ip))
+ w->acl |= WEB_CLIENT_ACL_BADGE;
+}
+
+
+// --------------------------------------------------------------------------------------
+
+static void log_connection(struct web_client *w, const char *msg) {
+ log_access("%llu: %d '[%s]:%s' '%s'", w->id, gettid(), w->client_ip, w->client_port, msg);
+}
+
+// ----------------------------------------------------------------------------
+// allocate and free web_clients
+
+static void web_client_zero(struct web_client *w) {
+ // zero everything about it - but keep the buffers
+
+ // remember the pointers to the buffers
+ BUFFER *b1 = w->response.data;
+ BUFFER *b2 = w->response.header;
+ BUFFER *b3 = w->response.header_output;
+
+ // empty the buffers
+ buffer_flush(b1);
+ buffer_flush(b2);
+ buffer_flush(b3);
+
+ freez(w->user_agent);
+
+ // zero everything
+ memset(w, 0, sizeof(struct web_client));
+
+ // restore the pointers of the buffers
+ w->response.data = b1;
+ w->response.header = b2;
+ w->response.header_output = b3;
+}
+
+static void web_client_free(struct web_client *w) {
+ buffer_free(w->response.header_output);
+ buffer_free(w->response.header);
+ buffer_free(w->response.data);
+ freez(w->user_agent);
+ freez(w);
+}
+
+static struct web_client *web_client_alloc(void) {
+ struct web_client *w = callocz(1, sizeof(struct web_client));
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ return w;
+}
+
+// ----------------------------------------------------------------------------
+// web clients caching
+
+// When clients connect and disconnect, avoid allocating and releasing memory.
+// Instead, when new clients get connected, reuse any memory previously allocated
+// for serving web clients that are now disconnected.
+
+// The size of the cache is adaptive. It caches the structures of 2x
+// the number of currently connected clients.
+
+// Comments per server:
+// SINGLE-THREADED : 1 cache is maintained
+// MULTI-THREADED : 1 cache is maintained
+// STATIC-THREADED : 1 cache for each thred of the web server
+
+struct clients_cache {
+ pid_t pid;
+
+ struct web_client *used; // the structures of the currently connected clients
+ size_t used_count; // the count the currently connected clients
+
+ struct web_client *avail; // the cached structures, available for future clients
+ size_t avail_count; // the number of cached structures
+
+ size_t reused; // the number of re-uses
+ size_t allocated; // the number of allocations
+};
+
+static __thread struct clients_cache web_clients_cache = {
+ .pid = 0,
+ .used = NULL,
+ .used_count = 0,
+ .avail = NULL,
+ .avail_count = 0,
+ .allocated = 0,
+ .reused = 0
+};
+
+static inline void web_client_cache_verify(int force) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ static __thread size_t count = 0;
+ count++;
+
+ if(unlikely(force || count > 1000)) {
+ count = 0;
+
+ struct web_client *w;
+ size_t used = 0, avail = 0;
+ for(w = web_clients_cache.used; w ; w = w->next) used++;
+ for(w = web_clients_cache.avail; w ; w = w->next) avail++;
+
+ info("web_client_cache has %zu (%zu) used and %zu (%zu) available clients, allocated %zu, reused %zu (hit %zu%%)."
+ , used, web_clients_cache.used_count
+ , avail, web_clients_cache.avail_count
+ , web_clients_cache.allocated
+ , web_clients_cache.reused
+ , (web_clients_cache.allocated + web_clients_cache.reused)?(web_clients_cache.reused * 100 / (web_clients_cache.allocated + web_clients_cache.reused)):0
+ );
+ }
+#else
+ if(unlikely(force)) {
+ info("web_client_cache has %zu used and %zu available clients, allocated %zu, reused %zu (hit %zu%%)."
+ , web_clients_cache.used_count
+ , web_clients_cache.avail_count
+ , web_clients_cache.allocated
+ , web_clients_cache.reused
+ , (web_clients_cache.allocated + web_clients_cache.reused)?(web_clients_cache.reused * 100 / (web_clients_cache.allocated + web_clients_cache.reused)):0
+ );
+ }
+#endif
+}
+
+// destroy the cache and free all the memory it uses
+static void web_client_cache_destroy(void) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid()))
+ error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid());
+
+ web_client_cache_verify(1);
+#endif
+
+ netdata_thread_disable_cancelability();
+
+ struct web_client *w, *t;
+
+ w = web_clients_cache.used;
+ while(w) {
+ t = w;
+ w = w->next;
+ web_client_free(t);
+ }
+ web_clients_cache.used = NULL;
+ web_clients_cache.used_count = 0;
+
+ w = web_clients_cache.avail;
+ while(w) {
+ t = w;
+ w = w->next;
+ web_client_free(t);
+ }
+ web_clients_cache.avail = NULL;
+ web_clients_cache.avail_count = 0;
+
+ netdata_thread_enable_cancelability();
+}
+
+static struct web_client *web_client_get_from_cache_or_allocate() {
- for (w = web_clients; w;) {
- if (web_client_check_obsolete(w)) {
- debug(D_WEB_CLIENT, "%llu: Removing client.", w->id);
- // pthread_cancel(w->thread);
- // pthread_join(w->thread, NULL);
- w = web_client_free(w);
#ifdef NETDATA_INTERNAL_CHECKS
- log_allocations();
+ if(unlikely(web_clients_cache.pid == 0))
+ web_clients_cache.pid = gettid();
+
+ if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid()))
+ error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid());
#endif
+
+ netdata_thread_disable_cancelability();
+
+ struct web_client *w = web_clients_cache.avail;
+
+ if(w) {
+ // get it from avail
+ if (w == web_clients_cache.avail) web_clients_cache.avail = w->next;
+ if(w->prev) w->prev->next = w->next;
+ if(w->next) w->next->prev = w->prev;
+ web_clients_cache.avail_count--;
+ web_client_zero(w);
+ web_clients_cache.reused++;
+ }
+ else {
+ // allocate it
+ w = web_client_alloc();
+ web_clients_cache.allocated++;
+ }
+
+ // link it to used web clients
+ if (web_clients_cache.used) web_clients_cache.used->prev = w;
+ w->next = web_clients_cache.used;
+ w->prev = NULL;
+ web_clients_cache.used = w;
+ web_clients_cache.used_count++;
+
+ // initialize it
+ w->id = web_client_connected();
+ w->mode = WEB_CLIENT_MODE_NORMAL;
+
+ netdata_thread_enable_cancelability();
+
+ return w;
+}
+
+static void web_client_release(struct web_client *w) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid()))
+ error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid());
+
+ if(unlikely(w->running))
+ error("%llu: releasing web client from %s port %s, but it still running.", w->id, w->client_ip, w->client_port);
+#endif
+
+ debug(D_WEB_CLIENT_ACCESS, "%llu: Closing web client from %s port %s.", w->id, w->client_ip, w->client_port);
+
+ log_connection(w, "DISCONNECTED");
+ web_client_request_done(w);
+ web_client_disconnected();
+
+ netdata_thread_disable_cancelability();
+
+ if(web_server_mode != WEB_SERVER_MODE_STATIC_THREADED) {
+ if (w->ifd != -1) close(w->ifd);
+ if (w->ofd != -1 && w->ofd != w->ifd) close(w->ofd);
+ w->ifd = w->ofd = -1;
+ }
+
+ // unlink it from the used
+ if (w == web_clients_cache.used) web_clients_cache.used = w->next;
+ if(w->prev) w->prev->next = w->next;
+ if(w->next) w->next->prev = w->prev;
+ web_clients_cache.used_count--;
+
+ if(web_clients_cache.avail_count >= 2 * web_clients_cache.used_count) {
+ // we have too many of them - free it
+ web_client_free(w);
+ }
+ else {
+ // link it to the avail
+ if (web_clients_cache.avail) web_clients_cache.avail->prev = w;
+ w->next = web_clients_cache.avail;
+ w->prev = NULL;
+ web_clients_cache.avail = w;
+ web_clients_cache.avail_count++;
+ }
+
+ netdata_thread_enable_cancelability();
+}
+
+
+// ----------------------------------------------------------------------------
+// high level web clients connection management
+
+static void web_client_initialize_connection(struct web_client *w) {
+ int flag = 1;
+ if(setsockopt(w->ifd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0)
+ error("%llu: failed to enable TCP_NODELAY on socket fd %d.", w->id, w->ifd);
+
+ flag = 1;
+ if(setsockopt(w->ifd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, sizeof(int)) != 0)
+ error("%llu: failed to enable SO_KEEPALIVE on socket fd %d.", w->id, w->ifd);
+
+ web_client_update_acl_matches(w);
+
+ w->origin[0] = '*'; w->origin[1] = '\0';
+ w->cookie1[0] = '\0'; w->cookie2[0] = '\0';
+ freez(w->user_agent); w->user_agent = NULL;
+
+ web_client_enable_wait_receive(w);
+
+ log_connection(w, "CONNECTED");
+
+ web_client_cache_verify(0);
+}
+
+static struct web_client *web_client_create_on_fd(int fd, const char *client_ip, const char *client_port) {
+ struct web_client *w;
+
+ w = web_client_get_from_cache_or_allocate();
+ w->ifd = w->ofd = fd;
+
+ strncpyz(w->client_ip, client_ip, sizeof(w->client_ip) - 1);
+ strncpyz(w->client_port, client_port, sizeof(w->client_port) - 1);
+
+ if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-");
+ if(unlikely(!*w->client_port)) strcpy(w->client_port, "-");
+
+ web_client_initialize_connection(w);
+ return(w);
+}
+
+static struct web_client *web_client_create_on_listenfd(int listener) {
+ struct web_client *w;
+
+ w = web_client_get_from_cache_or_allocate();
+ w->ifd = w->ofd = accept_socket(listener, SOCK_NONBLOCK, w->client_ip, sizeof(w->client_ip), w->client_port, sizeof(w->client_port), web_allow_connections_from);
+
+ if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-");
+ if(unlikely(!*w->client_port)) strcpy(w->client_port, "-");
+
+ if (w->ifd == -1) {
+ if(errno == EPERM)
+ log_connection(w, "ACCESS DENIED");
+ else {
+ log_connection(w, "CONNECTION FAILED");
+ error("%llu: Failed to accept new incoming connection.", w->id);
}
- else w = w->next;
+
+ web_client_release(w);
+ return NULL;
}
+
+ web_client_initialize_connection(w);
+ return(w);
+}
+
+
+// --------------------------------------------------------------------------------------
+// the thread of a single client - for the MULTI-THREADED web server
+
+// 1. waits for input and output, using async I/O
+// 2. it processes HTTP requests
+// 3. it generates HTTP responses
+// 4. it copies data from input to output if mode is FILECOPY
+
+int web_client_timeout = DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS;
+int web_client_first_request_timeout = DEFAULT_TIMEOUT_TO_RECEIVE_FIRST_WEB_REQUEST;
+
+static void multi_threaded_web_client_worker_main_cleanup(void *ptr) {
+ struct web_client *w = ptr;
+ WEB_CLIENT_IS_DEAD(w);
+ w->running = 0;
+}
+
+static void *multi_threaded_web_client_worker_main(void *ptr) {
+ netdata_thread_cleanup_push(multi_threaded_web_client_worker_main_cleanup, ptr);
+
+ struct web_client *w = ptr;
+ w->running = 1;
+
+ struct pollfd fds[2], *ifd, *ofd;
+ int retval, timeout_ms;
+ nfds_t fdmax = 0;
+
+ while(!netdata_exit) {
+ if(unlikely(web_client_check_dead(w))) {
+ debug(D_WEB_CLIENT, "%llu: client is dead.", w->id);
+ break;
+ }
+ else if(unlikely(!web_client_has_wait_receive(w) && !web_client_has_wait_send(w))) {
+ debug(D_WEB_CLIENT, "%llu: client is not set for neither receiving nor sending data.", w->id);
+ break;
+ }
+
+ if(unlikely(w->ifd < 0 || w->ofd < 0)) {
+ error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd", w->id, w->ifd, w->ofd);
+ break;
+ }
+
+ if(w->ifd == w->ofd) {
+ fds[0].fd = w->ifd;
+ fds[0].events = 0;
+ fds[0].revents = 0;
+
+ if(web_client_has_wait_receive(w)) fds[0].events |= POLLIN;
+ if(web_client_has_wait_send(w)) fds[0].events |= POLLOUT;
+
+ fds[1].fd = -1;
+ fds[1].events = 0;
+ fds[1].revents = 0;
+
+ ifd = ofd = &fds[0];
+
+ fdmax = 1;
+ }
+ else {
+ fds[0].fd = w->ifd;
+ fds[0].events = 0;
+ fds[0].revents = 0;
+ if(web_client_has_wait_receive(w)) fds[0].events |= POLLIN;
+ ifd = &fds[0];
+
+ fds[1].fd = w->ofd;
+ fds[1].events = 0;
+ fds[1].revents = 0;
+ if(web_client_has_wait_send(w)) fds[1].events |= POLLOUT;
+ ofd = &fds[1];
+
+ fdmax = 2;
+ }
+
+ debug(D_WEB_CLIENT, "%llu: Waiting socket async I/O for %s %s", w->id, web_client_has_wait_receive(w)?"INPUT":"", web_client_has_wait_send(w)?"OUTPUT":"");
+ errno = 0;
+ timeout_ms = web_client_timeout * 1000;
+ retval = poll(fds, fdmax, timeout_ms);
+
+ if(unlikely(netdata_exit)) break;
+
+ if(unlikely(retval == -1)) {
+ if(errno == EAGAIN || errno == EINTR) {
+ debug(D_WEB_CLIENT, "%llu: EAGAIN received.", w->id);
+ continue;
+ }
+
+ debug(D_WEB_CLIENT, "%llu: LISTENER: poll() failed (input fd = %d, output fd = %d). Closing client.", w->id, w->ifd, w->ofd);
+ break;
+ }
+ else if(unlikely(!retval)) {
+ debug(D_WEB_CLIENT, "%llu: Timeout while waiting socket async I/O for %s %s", w->id, web_client_has_wait_receive(w)?"INPUT":"", web_client_has_wait_send(w)?"OUTPUT":"");
+ break;
+ }
+
+ if(unlikely(netdata_exit)) break;
+
+ int used = 0;
+ if(web_client_has_wait_send(w) && ofd->revents & POLLOUT) {
+ used++;
+ if(web_client_send(w) < 0) {
+ debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id);
+ break;
+ }
+ }
+
+ if(unlikely(netdata_exit)) break;
+
+ if(web_client_has_wait_receive(w) && (ifd->revents & POLLIN || ifd->revents & POLLPRI)) {
+ used++;
+ if(web_client_receive(w) < 0) {
+ debug(D_WEB_CLIENT, "%llu: Cannot receive data from client. Closing client.", w->id);
+ break;
+ }
+
+ if(w->mode == WEB_CLIENT_MODE_NORMAL) {
+ debug(D_WEB_CLIENT, "%llu: Attempting to process received data.", w->id);
+ web_client_process_request(w);
+
+ // if the sockets are closed, may have transferred this client
+ // to plugins.d
+ if(unlikely(w->mode == WEB_CLIENT_MODE_STREAM))
+ break;
+ }
+ }
+
+ if(unlikely(!used)) {
+ debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on socket.", w->id);
+ break;
+ }
+ }
+
+ if(w->mode != WEB_CLIENT_MODE_STREAM)
+ log_connection(w, "DISCONNECTED");
+
+ web_client_request_done(w);
+
+ debug(D_WEB_CLIENT, "%llu: done...", w->id);
+
+ // close the sockets/files now
+ // to free file descriptors
+ if(w->ifd == w->ofd) {
+ if(w->ifd != -1) close(w->ifd);
+ }
+ else {
+ if(w->ifd != -1) close(w->ifd);
+ if(w->ofd != -1) close(w->ofd);
+ }
+ w->ifd = -1;
+ w->ofd = -1;
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
}
+// --------------------------------------------------------------------------------------
+// the main socket listener - MULTI-THREADED
+
// 1. it accepts new incoming requests on our port
// 2. creates a new web_client for each connection received
-// 3. spawns a new pthread to serve the client (this is optimal for keep-alive clients)
-// 4. cleans up old web_clients that their pthreads have been exited
+// 3. spawns a new netdata_thread to serve the client (this is optimal for keep-alive clients)
+// 4. cleans up old web_clients that their netdata_threads have been exited
+
+static void web_client_multi_threaded_web_server_release_clients(void) {
+ struct web_client *w;
+ for(w = web_clients_cache.used; w ; ) {
+ if(unlikely(!w->running && web_client_check_dead(w))) {
+ struct web_client *t = w->next;
+ web_client_release(w);
+ w = t;
+ }
+ else
+ w = w->next;
+ }
+}
+
+static void web_client_multi_threaded_web_server_stop_all_threads(void) {
+ struct web_client *w;
+
+ int found = 1, max = 2 * USEC_PER_SEC, step = 50000;
+ for(w = web_clients_cache.used; w ; w = w->next) {
+ if(w->running) {
+ found++;
+ info("stopping web client %s, id %llu", w->client_ip, w->id);
+ netdata_thread_cancel(w->thread);
+ }
+ }
+
+ while(found && max > 0) {
+ max -= step;
+ info("Waiting %d web threads to finish...", found);
+ sleep_usec(step);
+ found = 0;
+ for(w = web_clients_cache.used; w ; w = w->next)
+ if(w->running) found++;
+ }
+
+ if(found)
+ error("%d web threads are taking too long to finish. Giving up.", found);
+}
+
+static struct pollfd *socket_listen_main_multi_threaded_fds = NULL;
+
+static void socket_listen_main_multi_threaded_cleanup(void *data) {
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data;
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+ info("cleaning up...");
-#define CLEANUP_EVERY_EVENTS 100
+ info("releasing allocated memory...");
+ freez(socket_listen_main_multi_threaded_fds);
+ info("closing all sockets...");
+ listen_sockets_close(&api_sockets);
+
+ info("stopping all running web server threads...");
+ web_client_multi_threaded_web_server_stop_all_threads();
+
+ info("freeing web clients cache...");
+ web_client_cache_destroy();
+
+ info("cleanup completed.");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+#define CLEANUP_EVERY_EVENTS 60
void *socket_listen_main_multi_threaded(void *ptr) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+ netdata_thread_cleanup_push(socket_listen_main_multi_threaded_cleanup, ptr);
web_server_mode = WEB_SERVER_MODE_MULTI_THREADED;
- info("Multi-threaded WEB SERVER thread created with task id %d", gettid());
+ web_server_is_multithreaded = 1;
struct web_client *w;
int retval, counter = 0;
- if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
- error("Cannot set pthread cancel type to DEFERRED.");
-
- if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("Cannot set pthread cancel state to ENABLE.");
-
if(!api_sockets.opened)
fatal("LISTENER: No sockets to listen to.");
- struct pollfd *fds = callocz(sizeof(struct pollfd), api_sockets.opened);
+ socket_listen_main_multi_threaded_fds = callocz(sizeof(struct pollfd), api_sockets.opened);
size_t i;
for(i = 0; i < api_sockets.opened ;i++) {
- fds[i].fd = api_sockets.fds[i];
- fds[i].events = POLLIN;
- fds[i].revents = 0;
+ socket_listen_main_multi_threaded_fds[i].fd = api_sockets.fds[i];
+ socket_listen_main_multi_threaded_fds[i].events = POLLIN;
+ socket_listen_main_multi_threaded_fds[i].revents = 0;
info("Listening on '%s'", (api_sockets.fds_names[i])?api_sockets.fds_names[i]:"UNKNOWN");
}
- int timeout = 10 * 1000;
+ int timeout_ms = 1 * 1000;
+
+ while(!netdata_exit) {
- for(;;) {
// debug(D_WEB_CLIENT, "LISTENER: Waiting...");
- retval = poll(fds, api_sockets.opened, timeout);
+ retval = poll(socket_listen_main_multi_threaded_fds, api_sockets.opened, timeout_ms);
if(unlikely(retval == -1)) {
error("LISTENER: poll() failed.");
continue;
}
else if(unlikely(!retval)) {
- debug(D_WEB_CLIENT, "LISTENER: select() timeout.");
- counter = 0;
- cleanup_web_clients();
+ debug(D_WEB_CLIENT, "LISTENER: poll() timeout.");
+ counter++;
continue;
}
for(i = 0 ; i < api_sockets.opened ; i++) {
- short int revents = fds[i].revents;
+ short int revents = socket_listen_main_multi_threaded_fds[i].revents;
// check for new incoming connections
if(revents & POLLIN || revents & POLLPRI) {
- fds[i].revents = 0;
+ socket_listen_main_multi_threaded_fds[i].revents = 0;
- w = web_client_create(fds[i].fd);
+ w = web_client_create_on_listenfd(socket_listen_main_multi_threaded_fds[i].fd);
if(unlikely(!w)) {
- // no need for error log - web_client_create already logged the error
+ // no need for error log - web_client_create_on_listenfd already logged the error
continue;
}
@@ -173,40 +681,38 @@ void *socket_listen_main_multi_threaded(void *ptr) {
else
web_client_set_tcp(w);
- if(pthread_create(&w->thread, NULL, web_client_main, w) != 0) {
- error("%llu: failed to create new thread for web client.", w->id);
- WEB_CLIENT_IS_OBSOLETE(w);
- }
- else if(pthread_detach(w->thread) != 0) {
- error("%llu: Cannot request detach of newly created web client thread.", w->id);
- WEB_CLIENT_IS_OBSOLETE(w);
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "WEB_CLIENT[%llu,[%s]:%s]", w->id, w->client_ip, w->client_port);
+
+ w->running = 1;
+ if(netdata_thread_create(&w->thread, tag, NETDATA_THREAD_OPTION_DONT_LOG, multi_threaded_web_client_worker_main, w) != 0) {
+ w->running = 0;
+ web_client_release(w);
}
}
}
- // cleanup unused clients
counter++;
- if(counter >= CLEANUP_EVERY_EVENTS) {
+ if(counter > CLEANUP_EVERY_EVENTS) {
counter = 0;
- cleanup_web_clients();
+ web_client_multi_threaded_web_server_release_clients();
}
}
- debug(D_WEB_CLIENT, "LISTENER: exit!");
- listen_sockets_close(&api_sockets);
-
- freez(fds);
-
- static_thread->enabled = 0;
- pthread_exit(NULL);
+ netdata_thread_cleanup_pop(1);
return NULL;
}
+
+// --------------------------------------------------------------------------------------
+// the main socket listener - SINGLE-THREADED
+
struct web_client *single_threaded_clients[FD_SETSIZE];
static inline int single_threaded_link_client(struct web_client *w, fd_set *ifds, fd_set *ofds, fd_set *efds, int *max) {
- if(unlikely(web_client_check_obsolete(w) || web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w))))
+ if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) {
return 1;
+ }
if(unlikely(w->ifd < 0 || w->ifd >= (int)FD_SETSIZE || w->ofd < 0 || w->ofd >= (int)FD_SETSIZE)) {
error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd < FD_SETSIZE (%d)", w->id, w->ifd, w->ofd, (int)FD_SETSIZE);
@@ -240,27 +746,33 @@ static inline int single_threaded_unlink_client(struct web_client *w, fd_set *if
single_threaded_clients[w->ifd] = NULL;
single_threaded_clients[w->ofd] = NULL;
- if(unlikely(web_client_check_obsolete(w) || web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w))))
+ if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) {
return 1;
+ }
return 0;
}
-void *socket_listen_main_single_threaded(void *ptr) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+static void socket_listen_main_single_threaded_cleanup(void *data) {
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data;
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- web_server_mode = WEB_SERVER_MODE_SINGLE_THREADED;
+ info("closing all sockets...");
+ listen_sockets_close(&api_sockets);
- info("Single-threaded WEB SERVER thread created with task id %d", gettid());
+ info("freeing web clients cache...");
+ web_client_cache_destroy();
- struct web_client *w;
- int retval;
+ info("cleanup completed.");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
- if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
- error("Cannot set pthread cancel type to DEFERRED.");
+void *socket_listen_main_single_threaded(void *ptr) {
+ netdata_thread_cleanup_push(socket_listen_main_single_threaded_cleanup, ptr);
+ web_server_mode = WEB_SERVER_MODE_SINGLE_THREADED;
+ web_server_is_multithreaded = 0;
- if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("Cannot set pthread cancel state to ENABLE.");
+ struct web_client *w;
if(!api_sockets.opened)
fatal("LISTENER: no listen sockets available.");
@@ -287,14 +799,14 @@ void *socket_listen_main_single_threaded(void *ptr) {
fdmax = api_sockets.fds[i];
}
- for(;;) {
+ while(!netdata_exit) {
debug(D_WEB_CLIENT_ACCESS, "LISTENER: single threaded web server waiting (fdmax = %d)...", fdmax);
struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
rifds = ifds;
rofds = ofds;
refds = efds;
- retval = select(fdmax+1, &rifds, &rofds, &refds, &tv);
+ int retval = select(fdmax+1, &rifds, &rofds, &refds, &tv);
if(unlikely(retval == -1)) {
error("LISTENER: select() failed.");
@@ -306,7 +818,9 @@ void *socket_listen_main_single_threaded(void *ptr) {
for(i = 0; i < api_sockets.opened ; i++) {
if (FD_ISSET(api_sockets.fds[i], &rifds)) {
debug(D_WEB_CLIENT_ACCESS, "LISTENER: new connection.");
- w = web_client_create(api_sockets.fds[i]);
+ w = web_client_create_on_listenfd(api_sockets.fds[i]);
+ if(unlikely(!w))
+ continue;
if(api_sockets.fds_families[i] == AF_UNIX)
web_client_set_unix(w);
@@ -314,7 +828,7 @@ void *socket_listen_main_single_threaded(void *ptr) {
web_client_set_tcp(w);
if (single_threaded_link_client(w, &ifds, &ofds, &ifds, &fdmax) != 0) {
- web_client_free(w);
+ web_client_release(w);
}
}
}
@@ -324,22 +838,27 @@ void *socket_listen_main_single_threaded(void *ptr) {
continue;
w = single_threaded_clients[i];
- if(unlikely(!w))
+ if(unlikely(!w)) {
+ // error("no client on slot %zu", i);
continue;
+ }
if(unlikely(single_threaded_unlink_client(w, &ifds, &ofds, &efds) != 0)) {
- web_client_free(w);
+ // error("failed to unlink client %zu", i);
+ web_client_release(w);
continue;
}
if (unlikely(FD_ISSET(w->ifd, &refds) || FD_ISSET(w->ofd, &refds))) {
- web_client_free(w);
+ // error("no input on client %zu", i);
+ web_client_release(w);
continue;
}
if (unlikely(web_client_has_wait_receive(w) && FD_ISSET(w->ifd, &rifds))) {
if (unlikely(web_client_receive(w) < 0)) {
- web_client_free(w);
+ // error("cannot read from client %zu", i);
+ web_client_release(w);
continue;
}
@@ -351,122 +870,243 @@ void *socket_listen_main_single_threaded(void *ptr) {
if (unlikely(web_client_has_wait_send(w) && FD_ISSET(w->ofd, &rofds))) {
if (unlikely(web_client_send(w) < 0)) {
+ // error("cannot send data to client %zu", i);
debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id);
- web_client_free(w);
+ web_client_release(w);
continue;
}
}
if(unlikely(single_threaded_link_client(w, &ifds, &ofds, &efds, &fdmax) != 0)) {
- web_client_free(w);
+ // error("failed to link client %zu", i);
+ web_client_release(w);
}
}
}
else {
debug(D_WEB_CLIENT_ACCESS, "LISTENER: single threaded web server timeout.");
-#ifdef NETDATA_INTERNAL_CHECKS
- log_allocations();
-#endif
}
}
- debug(D_WEB_CLIENT, "LISTENER: exit!");
- listen_sockets_close(&api_sockets);
-
- static_thread->enabled = 0;
- pthread_exit(NULL);
+ netdata_thread_cleanup_pop(1);
return NULL;
}
-#if 0
-// new TCP client connected
-static void *web_server_add_callback(int fd, int socktype, short int *events) {
- (void)fd;
- (void)socktype;
+// --------------------------------------------------------------------------------------
+// the main socket listener - STATIC-THREADED
+
+struct web_server_static_threaded_worker {
+ netdata_thread_t thread;
+
+ int id;
+ int running;
+
+ size_t max_sockets;
+
+ volatile size_t connected;
+ volatile size_t disconnected;
+ volatile size_t receptions;
+ volatile size_t sends;
+ volatile size_t max_concurrent;
+
+ volatile size_t files_read;
+ volatile size_t file_reads;
+};
+
+static long long static_threaded_workers_count = 1;
+static struct web_server_static_threaded_worker *static_workers_private_data = NULL;
+static __thread struct web_server_static_threaded_worker *worker_private = NULL;
+
+// ----------------------------------------------------------------------------
+
+static inline int web_server_check_client_status(struct web_client *w) {
+ if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w))))
+ return -1;
+
+ return 0;
+}
+
+// ----------------------------------------------------------------------------
+// web server files
+
+static void *web_server_file_add_callback(POLLINFO *pi, short int *events, void *data) {
+ struct web_client *w = (struct web_client *)data;
+
+ worker_private->files_read++;
+ debug(D_WEB_CLIENT, "%llu: ADDED FILE READ ON FD %d", w->id, pi->fd);
*events = POLLIN;
+ pi->data = w;
+ return w;
+}
+
+static void web_werver_file_del_callback(POLLINFO *pi) {
+ struct web_client *w = (struct web_client *)pi->data;
+ debug(D_WEB_CLIENT, "%llu: RELEASE FILE READ ON FD %d", w->id, pi->fd);
+
+ w->pollinfo_filecopy_slot = 0;
+
+ if(unlikely(!w->pollinfo_slot)) {
+ debug(D_WEB_CLIENT, "%llu: CROSS WEB CLIENT CLEANUP (iFD %d, oFD %d)", w->id, pi->fd, w->ofd);
+ web_client_release(w);
+ }
+}
+
+static int web_server_file_read_callback(POLLINFO *pi, short int *events) {
+ struct web_client *w = (struct web_client *)pi->data;
+
+ // if there is no POLLINFO linked to this, it means the client disconnected
+ // stop the file reading too
+ if(unlikely(!w->pollinfo_slot)) {
+ debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON CLOSED WEB CLIENT", w->id, pi->fd);
+ return -1;
+ }
+
+ if(unlikely(w->mode != WEB_CLIENT_MODE_FILECOPY || w->ifd == w->ofd)) {
+ debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON NON-FILECOPY WEB CLIENT", w->id, pi->fd);
+ return -1;
+ }
+
+ debug(D_WEB_CLIENT, "%llu: READING FILE ON FD %d", w->id, pi->fd);
- debug(D_WEB_CLIENT_ACCESS, "LISTENER on %d: new connection.", fd);
- struct web_client *w = web_client_create(fd);
+ worker_private->file_reads++;
+ ssize_t ret = unlikely(web_client_read_file(w));
- if(unlikely(socktype == AF_UNIX))
+ if(likely(web_client_has_wait_send(w))) {
+ POLLJOB *p = pi->p; // our POLLJOB
+ POLLINFO *wpi = pollinfo_from_slot(p, w->pollinfo_slot); // POLLINFO of the client socket
+
+ debug(D_WEB_CLIENT, "%llu: SIGNALING W TO SEND (iFD %d, oFD %d)", w->id, pi->fd, wpi->fd);
+ p->fds[wpi->slot].events |= POLLOUT;
+ }
+
+ if(unlikely(ret <= 0 || w->ifd == w->ofd)) {
+ debug(D_WEB_CLIENT, "%llu: DONE READING FILE ON FD %d", w->id, pi->fd);
+ return -1;
+ }
+
+ *events = POLLIN;
+ return 0;
+}
+
+static int web_server_file_write_callback(POLLINFO *pi, short int *events) {
+ (void)pi;
+ (void)events;
+
+ error("Writing to web files is not supported!");
+
+ return -1;
+}
+
+// ----------------------------------------------------------------------------
+// web server clients
+
+static void *web_server_add_callback(POLLINFO *pi, short int *events, void *data) {
+ (void)data;
+
+ worker_private->connected++;
+
+ size_t concurrent = worker_private->connected - worker_private->disconnected;
+ if(unlikely(concurrent > worker_private->max_concurrent))
+ worker_private->max_concurrent = concurrent;
+
+ *events = POLLIN;
+
+ debug(D_WEB_CLIENT_ACCESS, "LISTENER on %d: new connection.", pi->fd);
+ struct web_client *w = web_client_create_on_fd(pi->fd, pi->client_ip, pi->client_port);
+ w->pollinfo_slot = pi->slot;
+
+ if(unlikely(pi->socktype == AF_UNIX))
web_client_set_unix(w);
else
web_client_set_tcp(w);
- return (void *)w;
+ debug(D_WEB_CLIENT, "%llu: ADDED CLIENT FD %d", w->id, pi->fd);
+ return w;
}
// TCP client disconnected
-static void web_server_del_callback(int fd, int socktype, void *data) {
- (void)fd;
- (void)socktype;
+static void web_server_del_callback(POLLINFO *pi) {
+ worker_private->disconnected++;
- struct web_client *w = (struct web_client *)data;
+ struct web_client *w = (struct web_client *)pi->data;
- if(w) {
- if(w->ofd == -1 || fd == w->ofd) {
- // we free the client, only if the closing fd
- // is the client socket
- web_client_free(w);
- }
+ w->pollinfo_slot = 0;
+ if(unlikely(w->pollinfo_filecopy_slot)) {
+ POLLINFO *fpi = pollinfo_from_slot(pi->p, w->pollinfo_filecopy_slot); // POLLINFO of the client socket
+ debug(D_WEB_CLIENT, "%llu: THE CLIENT WILL BE FRED BY READING FILE JOB ON FD %d", w->id, fpi->fd);
}
+ else {
+ if(web_client_flag_check(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET))
+ pi->flags |= POLLINFO_FLAG_DONT_CLOSE;
- return;
+ debug(D_WEB_CLIENT, "%llu: CLOSING CLIENT FD %d", w->id, pi->fd);
+ web_client_release(w);
+ }
}
-// Receive data
-static int web_server_rcv_callback(int fd, int socktype, void *data, short int *events) {
- (void)fd;
- (void)socktype;
-
- *events = 0;
-
- struct web_client *w = (struct web_client *)data;
+static int web_server_rcv_callback(POLLINFO *pi, short int *events) {
+ worker_private->receptions++;
- if(unlikely(!web_client_has_wait_receive(w)))
- return -1;
+ struct web_client *w = (struct web_client *)pi->data;
+ int fd = pi->fd;
if(unlikely(web_client_receive(w) < 0))
return -1;
+ debug(D_WEB_CLIENT, "%llu: processing received data on fd %d.", w->id, fd);
+ web_client_process_request(w);
+
if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY)) {
- if(unlikely(w->ifd != -1 && w->ifd != fd)) {
- // FIXME: we switched input fd
- // add a new socket to poll_events, with the same
- }
- else if(unlikely(w->ifd == -1)) {
- // FIXME: we closed input fd
- // instruct poll_events() to close fd
- return -1;
+ if(w->pollinfo_filecopy_slot == 0) {
+ debug(D_WEB_CLIENT, "%llu: FILECOPY DETECTED ON FD %d", w->id, pi->fd);
+
+ if (unlikely(w->ifd != -1 && w->ifd != w->ofd && w->ifd != fd)) {
+ // add a new socket to poll_events, with the same
+ debug(D_WEB_CLIENT, "%llu: CREATING FILECOPY SLOT ON FD %d", w->id, pi->fd);
+
+ POLLINFO *fpi = poll_add_fd(
+ pi->p
+ , w->ifd
+ , 0
+ , POLLINFO_FLAG_CLIENT_SOCKET
+ , "FILENAME"
+ , ""
+ , web_server_file_add_callback
+ , web_werver_file_del_callback
+ , web_server_file_read_callback
+ , web_server_file_write_callback
+ , (void *) w
+ );
+
+ if(fpi)
+ w->pollinfo_filecopy_slot = fpi->slot;
+ else {
+ error("Failed to add filecopy fd. Closing client.");
+ return -1;
+ }
+ }
}
}
else {
- debug(D_WEB_CLIENT, "%llu: Processing received data.", w->id);
- web_client_process_request(w);
+ if(unlikely(w->ifd == fd && web_client_has_wait_receive(w)))
+ *events |= POLLIN;
}
- if(unlikely(w->ifd == fd && web_client_has_wait_receive(w)))
- *events |= POLLIN;
-
if(unlikely(w->ofd == fd && web_client_has_wait_send(w)))
*events |= POLLOUT;
- if(unlikely(*events == 0))
- return -1;
-
- return 0;
+ return web_server_check_client_status(w);
}
-static int web_server_snd_callback(int fd, int socktype, void *data, short int *events) {
- (void)fd;
- (void)socktype;
+static int web_server_snd_callback(POLLINFO *pi, short int *events) {
+ worker_private->sends++;
- struct web_client *w = (struct web_client *)data;
+ struct web_client *w = (struct web_client *)pi->data;
+ int fd = pi->fd;
- if(unlikely(!web_client_has_wait_send(w)))
- return -1;
+ debug(D_WEB_CLIENT, "%llu: sending data on fd %d.", w->id, fd);
if(unlikely(web_client_send(w) < 0))
return -1;
@@ -477,42 +1117,176 @@ static int web_server_snd_callback(int fd, int socktype, void *data, short int *
if(unlikely(w->ofd == fd && web_client_has_wait_send(w)))
*events |= POLLOUT;
- if(unlikely(*events == 0))
- return -1;
-
- return 0;
+ return web_server_check_client_status(w);
}
-void *socket_listen_main_single_threaded(void *ptr) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+static void web_server_tmr_callback(void *timer_data) {
+ worker_private = (struct web_server_static_threaded_worker *)timer_data;
- web_server_mode = WEB_SERVER_MODE_SINGLE_THREADED;
+ static __thread RRDSET *st = NULL;
+ static __thread RRDDIM *rd_user = NULL, *rd_system = NULL;
- info("Single-threaded WEB SERVER thread created with task id %d", gettid());
+ if(unlikely(!st)) {
+ char id[100 + 1];
+ char title[100 + 1];
- if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
- error("Cannot set pthread cancel type to DEFERRED.");
+ snprintfz(id, 100, "web_thread%d_cpu", worker_private->id + 1);
+ snprintfz(title, 100, "NetData web server thread No %d CPU usage", worker_private->id + 1);
- if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("Cannot set pthread cancel state to ENABLE.");
+ st = rrdset_create_localhost(
+ "netdata"
+ , id
+ , NULL
+ , "web"
+ , "netdata.web_cpu"
+ , title
+ , "milliseconds/s"
+ , "web"
+ , "stats"
+ , 132000 + worker_private->id
+ , default_rrd_update_every
+ , RRDSET_TYPE_STACKED
+ );
+
+ rd_user = rrddim_add(st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ rd_system = rrddim_add(st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ }
+ else
+ rrdset_next(st);
- if(!api_sockets.opened)
- fatal("LISTENER: no listen sockets available.");
+ struct rusage rusage;
+ getrusage(RUSAGE_THREAD, &rusage);
+ rrddim_set_by_pointer(st, rd_user, rusage.ru_utime.tv_sec * 1000000ULL + rusage.ru_utime.tv_usec);
+ rrddim_set_by_pointer(st, rd_system, rusage.ru_stime.tv_sec * 1000000ULL + rusage.ru_stime.tv_usec);
+ rrdset_done(st);
+}
- poll_events(&api_sockets
- , web_server_add_callback
- , web_server_del_callback
- , web_server_rcv_callback
- , web_server_snd_callback
- , web_allow_connections_from
- , NULL
+// ----------------------------------------------------------------------------
+// web server worker thread
+
+static void socket_listen_main_static_threaded_worker_cleanup(void *ptr) {
+ worker_private = (struct web_server_static_threaded_worker *)ptr;
+
+ info("freeing local web clients cache...");
+ web_client_cache_destroy();
+
+ info("stopped after %zu connects, %zu disconnects (max concurrent %zu), %zu receptions and %zu sends",
+ worker_private->connected,
+ worker_private->disconnected,
+ worker_private->max_concurrent,
+ worker_private->receptions,
+ worker_private->sends
);
- debug(D_WEB_CLIENT, "LISTENER: exit!");
+ worker_private->running = 0;
+}
+
+void *socket_listen_main_static_threaded_worker(void *ptr) {
+ worker_private = (struct web_server_static_threaded_worker *)ptr;
+ worker_private->running = 1;
+
+ netdata_thread_cleanup_push(socket_listen_main_static_threaded_worker_cleanup, ptr);
+
+ poll_events(&api_sockets
+ , web_server_add_callback
+ , web_server_del_callback
+ , web_server_rcv_callback
+ , web_server_snd_callback
+ , web_server_tmr_callback
+ , web_allow_connections_from
+ , NULL
+ , web_client_first_request_timeout
+ , web_client_timeout
+ , default_rrd_update_every * 1000 // timer_milliseconds
+ , ptr // timer_data
+ , worker_private->max_sockets
+ );
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
+
+// ----------------------------------------------------------------------------
+// web server main thread - also becomes a worker
+
+static void socket_listen_main_static_threaded_cleanup(void *ptr) {
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+ int i, found = 0, max = 2 * USEC_PER_SEC, step = 50000;
+
+ // we start from 1, - 0 is self
+ for(i = 1; i < static_threaded_workers_count; i++) {
+ if(static_workers_private_data[i].running) {
+ found++;
+ info("stopping worker %d", i + 1);
+ netdata_thread_cancel(static_workers_private_data[i].thread);
+ }
+ else
+ info("found stopped worker %d", i + 1);
+ }
+
+ while(found && max > 0) {
+ max -= step;
+ info("Waiting %d static web threads to finish...", found);
+ sleep_usec(step);
+ found = 0;
+
+ // we start from 1, - 0 is self
+ for(i = 1; i < static_threaded_workers_count; i++) {
+ if (static_workers_private_data[i].running)
+ found++;
+ }
+ }
+
+ if(found)
+ error("%d static web threads are taking too long to finish. Giving up.", found);
+
+ info("closing all web server sockets...");
listen_sockets_close(&api_sockets);
- static_thread->enabled = 0;
- pthread_exit(NULL);
+ info("all static web threads stopped.");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+void *socket_listen_main_static_threaded(void *ptr) {
+ netdata_thread_cleanup_push(socket_listen_main_static_threaded_cleanup, ptr);
+ web_server_mode = WEB_SERVER_MODE_STATIC_THREADED;
+
+ if(!api_sockets.opened)
+ fatal("LISTENER: no listen sockets available.");
+
+ // 6 threads is the optimal value
+ // since 6 are the parallel connections browsers will do
+ // so, if the machine has more CPUs, avoid using resources unnecessarily
+ int def_thread_count = (processors > 6)?6:processors;
+
+ static_threaded_workers_count = config_get_number(CONFIG_SECTION_WEB, "web server threads", def_thread_count);
+ if(static_threaded_workers_count < 1) static_threaded_workers_count = 1;
+
+ size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets", (long long int)(rlimit_nofile.rlim_cur / 2));
+
+ static_workers_private_data = callocz((size_t)static_threaded_workers_count, sizeof(struct web_server_static_threaded_worker));
+
+ web_server_is_multithreaded = (static_threaded_workers_count > 1);
+
+ int i;
+ for(i = 1; i < static_threaded_workers_count; i++) {
+ static_workers_private_data[i].id = i;
+ static_workers_private_data[i].max_sockets = max_sockets / static_threaded_workers_count;
+
+ char tag[50 + 1];
+ snprintfz(tag, 50, "WEB_SERVER[static%d]", i+1);
+
+ info("starting worker %d", i+1);
+ netdata_thread_create(&static_workers_private_data[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT, socket_listen_main_static_threaded_worker, (void *)&static_workers_private_data[i]);
+ }
+
+ // and the main one
+ static_workers_private_data[0].max_sockets = max_sockets / static_threaded_workers_count;
+ socket_listen_main_static_threaded_worker((void *)&static_workers_private_data[0]);
+
+ netdata_thread_cleanup_pop(1);
return NULL;
}
-#endif