summaryrefslogtreecommitdiffstats
path: root/web/server/static/static-threaded.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2018-11-07 12:19:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2018-11-07 12:20:17 +0000
commita64a253794ac64cb40befee54db53bde17dd0d49 (patch)
treec1024acc5f6e508814b944d99f112259bb28b1be /web/server/static/static-threaded.c
parentNew upstream version 1.10.0+dfsg (diff)
downloadnetdata-a64a253794ac64cb40befee54db53bde17dd0d49.tar.xz
netdata-a64a253794ac64cb40befee54db53bde17dd0d49.zip
New upstream version 1.11.0+dfsgupstream/1.11.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'web/server/static/static-threaded.c')
-rw-r--r--web/server/static/static-threaded.c424
1 files changed, 424 insertions, 0 deletions
diff --git a/web/server/static/static-threaded.c b/web/server/static/static-threaded.c
new file mode 100644
index 000000000..56b8dbf8d
--- /dev/null
+++ b/web/server/static/static-threaded.c
@@ -0,0 +1,424 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#define WEB_SERVER_INTERNALS 1
+#include "static-threaded.h"
+
+// ----------------------------------------------------------------------------
+// high level web clients connection management
+
+static struct web_client *web_client_create_on_fd(int fd, const char *client_ip, const char *client_port) {
+ struct web_client *w;
+
+ w = web_client_get_from_cache_or_allocate();
+ w->ifd = w->ofd = fd;
+
+ strncpyz(w->client_ip, client_ip, sizeof(w->client_ip) - 1);
+ strncpyz(w->client_port, client_port, sizeof(w->client_port) - 1);
+
+ if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-");
+ if(unlikely(!*w->client_port)) strcpy(w->client_port, "-");
+
+ web_client_initialize_connection(w);
+ return(w);
+}
+
+// --------------------------------------------------------------------------------------
+// the main socket listener - STATIC-THREADED
+
+struct web_server_static_threaded_worker {
+ netdata_thread_t thread;
+
+ int id;
+ int running;
+
+ size_t max_sockets;
+
+ volatile size_t connected;
+ volatile size_t disconnected;
+ volatile size_t receptions;
+ volatile size_t sends;
+ volatile size_t max_concurrent;
+
+ volatile size_t files_read;
+ volatile size_t file_reads;
+};
+
+static long long static_threaded_workers_count = 1;
+static struct web_server_static_threaded_worker *static_workers_private_data = NULL;
+static __thread struct web_server_static_threaded_worker *worker_private = NULL;
+
+// ----------------------------------------------------------------------------
+
+static inline int web_server_check_client_status(struct web_client *w) {
+ if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w))))
+ return -1;
+
+ return 0;
+}
+
+// ----------------------------------------------------------------------------
+// web server files
+
+static void *web_server_file_add_callback(POLLINFO *pi, short int *events, void *data) {
+ struct web_client *w = (struct web_client *)data;
+
+ worker_private->files_read++;
+
+ debug(D_WEB_CLIENT, "%llu: ADDED FILE READ ON FD %d", w->id, pi->fd);
+ *events = POLLIN;
+ pi->data = w;
+ return w;
+}
+
+static void web_werver_file_del_callback(POLLINFO *pi) {
+ struct web_client *w = (struct web_client *)pi->data;
+ debug(D_WEB_CLIENT, "%llu: RELEASE FILE READ ON FD %d", w->id, pi->fd);
+
+ w->pollinfo_filecopy_slot = 0;
+
+ if(unlikely(!w->pollinfo_slot)) {
+ debug(D_WEB_CLIENT, "%llu: CROSS WEB CLIENT CLEANUP (iFD %d, oFD %d)", w->id, pi->fd, w->ofd);
+ web_client_release(w);
+ }
+}
+
+static int web_server_file_read_callback(POLLINFO *pi, short int *events) {
+ struct web_client *w = (struct web_client *)pi->data;
+
+ // if there is no POLLINFO linked to this, it means the client disconnected
+ // stop the file reading too
+ if(unlikely(!w->pollinfo_slot)) {
+ debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON CLOSED WEB CLIENT", w->id, pi->fd);
+ return -1;
+ }
+
+ if(unlikely(w->mode != WEB_CLIENT_MODE_FILECOPY || w->ifd == w->ofd)) {
+ debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON NON-FILECOPY WEB CLIENT", w->id, pi->fd);
+ return -1;
+ }
+
+ debug(D_WEB_CLIENT, "%llu: READING FILE ON FD %d", w->id, pi->fd);
+
+ worker_private->file_reads++;
+ ssize_t ret = unlikely(web_client_read_file(w));
+
+ if(likely(web_client_has_wait_send(w))) {
+ POLLJOB *p = pi->p; // our POLLJOB
+ POLLINFO *wpi = pollinfo_from_slot(p, w->pollinfo_slot); // POLLINFO of the client socket
+
+ debug(D_WEB_CLIENT, "%llu: SIGNALING W TO SEND (iFD %d, oFD %d)", w->id, pi->fd, wpi->fd);
+ p->fds[wpi->slot].events |= POLLOUT;
+ }
+
+ if(unlikely(ret <= 0 || w->ifd == w->ofd)) {
+ debug(D_WEB_CLIENT, "%llu: DONE READING FILE ON FD %d", w->id, pi->fd);
+ return -1;
+ }
+
+ *events = POLLIN;
+ return 0;
+}
+
+static int web_server_file_write_callback(POLLINFO *pi, short int *events) {
+ (void)pi;
+ (void)events;
+
+ error("Writing to web files is not supported!");
+
+ return -1;
+}
+
+// ----------------------------------------------------------------------------
+// web server clients
+
+static void *web_server_add_callback(POLLINFO *pi, short int *events, void *data) {
+ (void)data;
+
+ worker_private->connected++;
+
+ size_t concurrent = worker_private->connected - worker_private->disconnected;
+ if(unlikely(concurrent > worker_private->max_concurrent))
+ worker_private->max_concurrent = concurrent;
+
+ *events = POLLIN;
+
+ debug(D_WEB_CLIENT_ACCESS, "LISTENER on %d: new connection.", pi->fd);
+ struct web_client *w = web_client_create_on_fd(pi->fd, pi->client_ip, pi->client_port);
+ w->pollinfo_slot = pi->slot;
+
+ if(unlikely(pi->socktype == AF_UNIX))
+ web_client_set_unix(w);
+ else
+ web_client_set_tcp(w);
+
+ debug(D_WEB_CLIENT, "%llu: ADDED CLIENT FD %d", w->id, pi->fd);
+ return w;
+}
+
+// TCP client disconnected
+static void web_server_del_callback(POLLINFO *pi) {
+ worker_private->disconnected++;
+
+ struct web_client *w = (struct web_client *)pi->data;
+
+ w->pollinfo_slot = 0;
+ if(unlikely(w->pollinfo_filecopy_slot)) {
+ POLLINFO *fpi = pollinfo_from_slot(pi->p, w->pollinfo_filecopy_slot); // POLLINFO of the client socket
+ (void)fpi;
+
+ debug(D_WEB_CLIENT, "%llu: THE CLIENT WILL BE FRED BY READING FILE JOB ON FD %d", w->id, fpi->fd);
+ }
+ else {
+ if(web_client_flag_check(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET))
+ pi->flags |= POLLINFO_FLAG_DONT_CLOSE;
+
+ debug(D_WEB_CLIENT, "%llu: CLOSING CLIENT FD %d", w->id, pi->fd);
+ web_client_release(w);
+ }
+}
+
+static int web_server_rcv_callback(POLLINFO *pi, short int *events) {
+ worker_private->receptions++;
+
+ struct web_client *w = (struct web_client *)pi->data;
+ int fd = pi->fd;
+
+ if(unlikely(web_client_receive(w) < 0))
+ return -1;
+
+ debug(D_WEB_CLIENT, "%llu: processing received data on fd %d.", w->id, fd);
+ web_client_process_request(w);
+
+ if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY)) {
+ if(w->pollinfo_filecopy_slot == 0) {
+ debug(D_WEB_CLIENT, "%llu: FILECOPY DETECTED ON FD %d", w->id, pi->fd);
+
+ if (unlikely(w->ifd != -1 && w->ifd != w->ofd && w->ifd != fd)) {
+ // add a new socket to poll_events, with the same
+ debug(D_WEB_CLIENT, "%llu: CREATING FILECOPY SLOT ON FD %d", w->id, pi->fd);
+
+ POLLINFO *fpi = poll_add_fd(
+ pi->p
+ , w->ifd
+ , 0
+ , POLLINFO_FLAG_CLIENT_SOCKET
+ , "FILENAME"
+ , ""
+ , web_server_file_add_callback
+ , web_werver_file_del_callback
+ , web_server_file_read_callback
+ , web_server_file_write_callback
+ , (void *) w
+ );
+
+ if(fpi)
+ w->pollinfo_filecopy_slot = fpi->slot;
+ else {
+ error("Failed to add filecopy fd. Closing client.");
+ return -1;
+ }
+ }
+ }
+ }
+ else {
+ if(unlikely(w->ifd == fd && web_client_has_wait_receive(w)))
+ *events |= POLLIN;
+ }
+
+ if(unlikely(w->ofd == fd && web_client_has_wait_send(w)))
+ *events |= POLLOUT;
+
+ return web_server_check_client_status(w);
+}
+
+static int web_server_snd_callback(POLLINFO *pi, short int *events) {
+ worker_private->sends++;
+
+ struct web_client *w = (struct web_client *)pi->data;
+ int fd = pi->fd;
+
+ debug(D_WEB_CLIENT, "%llu: sending data on fd %d.", w->id, fd);
+
+ if(unlikely(web_client_send(w) < 0))
+ return -1;
+
+ if(unlikely(w->ifd == fd && web_client_has_wait_receive(w)))
+ *events |= POLLIN;
+
+ if(unlikely(w->ofd == fd && web_client_has_wait_send(w)))
+ *events |= POLLOUT;
+
+ return web_server_check_client_status(w);
+}
+
+static void web_server_tmr_callback(void *timer_data) {
+ worker_private = (struct web_server_static_threaded_worker *)timer_data;
+
+ static __thread RRDSET *st = NULL;
+ static __thread RRDDIM *rd_user = NULL, *rd_system = NULL;
+
+ if(unlikely(!st)) {
+ char id[100 + 1];
+ char title[100 + 1];
+
+ snprintfz(id, 100, "web_thread%d_cpu", worker_private->id + 1);
+ snprintfz(title, 100, "NetData web server thread No %d CPU usage", worker_private->id + 1);
+
+ st = rrdset_create_localhost(
+ "netdata"
+ , id
+ , NULL
+ , "web"
+ , "netdata.web_cpu"
+ , title
+ , "milliseconds/s"
+ , "web"
+ , "stats"
+ , 132000 + worker_private->id
+ , default_rrd_update_every
+ , RRDSET_TYPE_STACKED
+ );
+
+ rd_user = rrddim_add(st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ rd_system = rrddim_add(st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ }
+ else
+ rrdset_next(st);
+
+ struct rusage rusage;
+ getrusage(RUSAGE_THREAD, &rusage);
+ rrddim_set_by_pointer(st, rd_user, rusage.ru_utime.tv_sec * 1000000ULL + rusage.ru_utime.tv_usec);
+ rrddim_set_by_pointer(st, rd_system, rusage.ru_stime.tv_sec * 1000000ULL + rusage.ru_stime.tv_usec);
+ rrdset_done(st);
+}
+
+// ----------------------------------------------------------------------------
+// web server worker thread
+
+static void socket_listen_main_static_threaded_worker_cleanup(void *ptr) {
+ worker_private = (struct web_server_static_threaded_worker *)ptr;
+
+ info("freeing local web clients cache...");
+ web_client_cache_destroy();
+
+ info("stopped after %zu connects, %zu disconnects (max concurrent %zu), %zu receptions and %zu sends",
+ worker_private->connected,
+ worker_private->disconnected,
+ worker_private->max_concurrent,
+ worker_private->receptions,
+ worker_private->sends
+ );
+
+ worker_private->running = 0;
+}
+
+void *socket_listen_main_static_threaded_worker(void *ptr) {
+ worker_private = (struct web_server_static_threaded_worker *)ptr;
+ worker_private->running = 1;
+
+ netdata_thread_cleanup_push(socket_listen_main_static_threaded_worker_cleanup, ptr);
+
+ poll_events(&api_sockets
+ , web_server_add_callback
+ , web_server_del_callback
+ , web_server_rcv_callback
+ , web_server_snd_callback
+ , web_server_tmr_callback
+ , web_allow_connections_from
+ , NULL
+ , web_client_first_request_timeout
+ , web_client_timeout
+ , default_rrd_update_every * 1000 // timer_milliseconds
+ , ptr // timer_data
+ , worker_private->max_sockets
+ );
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
+
+// ----------------------------------------------------------------------------
+// web server main thread - also becomes a worker
+
+static void socket_listen_main_static_threaded_cleanup(void *ptr) {
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+ int i, found = 0;
+ usec_t max = 2 * USEC_PER_SEC, step = 50000;
+
+ // we start from 1, - 0 is self
+ for(i = 1; i < static_threaded_workers_count; i++) {
+ if(static_workers_private_data[i].running) {
+ found++;
+ info("stopping worker %d", i + 1);
+ netdata_thread_cancel(static_workers_private_data[i].thread);
+ }
+ else
+ info("found stopped worker %d", i + 1);
+ }
+
+ while(found && max > 0) {
+ max -= step;
+ info("Waiting %d static web threads to finish...", found);
+ sleep_usec(step);
+ found = 0;
+
+ // we start from 1, - 0 is self
+ for(i = 1; i < static_threaded_workers_count; i++) {
+ if (static_workers_private_data[i].running)
+ found++;
+ }
+ }
+
+ if(found)
+ error("%d static web threads are taking too long to finish. Giving up.", found);
+
+ info("closing all web server sockets...");
+ listen_sockets_close(&api_sockets);
+
+ info("all static web threads stopped.");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+void *socket_listen_main_static_threaded(void *ptr) {
+ netdata_thread_cleanup_push(socket_listen_main_static_threaded_cleanup, ptr);
+ web_server_mode = WEB_SERVER_MODE_STATIC_THREADED;
+
+ if(!api_sockets.opened)
+ fatal("LISTENER: no listen sockets available.");
+
+ // 6 threads is the optimal value
+ // since 6 are the parallel connections browsers will do
+ // so, if the machine has more CPUs, avoid using resources unnecessarily
+ int def_thread_count = (processors > 6)?6:processors;
+
+ static_threaded_workers_count = config_get_number(CONFIG_SECTION_WEB, "web server threads", def_thread_count);
+ if(static_threaded_workers_count < 1) static_threaded_workers_count = 1;
+
+ size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets", (long long int)(rlimit_nofile.rlim_cur / 2));
+
+ static_workers_private_data = callocz((size_t)static_threaded_workers_count, sizeof(struct web_server_static_threaded_worker));
+
+ web_server_is_multithreaded = (static_threaded_workers_count > 1);
+
+ int i;
+ for(i = 1; i < static_threaded_workers_count; i++) {
+ static_workers_private_data[i].id = i;
+ static_workers_private_data[i].max_sockets = max_sockets / static_threaded_workers_count;
+
+ char tag[50 + 1];
+ snprintfz(tag, 50, "WEB_SERVER[static%d]", i+1);
+
+ info("starting worker %d", i+1);
+ netdata_thread_create(&static_workers_private_data[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT, socket_listen_main_static_threaded_worker, (void *)&static_workers_private_data[i]);
+ }
+
+ // and the main one
+ static_workers_private_data[0].max_sockets = max_sockets / static_threaded_workers_count;
+ socket_listen_main_static_threaded_worker((void *)&static_workers_private_data[0]);
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}