// SPDX-License-Identifier: GPL-3.0-or-later #define WEB_SERVER_INTERNALS 1 #include "static-threaded.h" int web_client_timeout = DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS; int web_client_first_request_timeout = DEFAULT_TIMEOUT_TO_RECEIVE_FIRST_WEB_REQUEST; long web_client_streaming_rate_t = 0L; #define WORKER_JOB_ADD_CONNECTION 0 #define WORKER_JOB_DEL_COLLECTION 1 #define WORKER_JOB_ADD_FILE 2 #define WORKER_JOB_DEL_FILE 3 #define WORKER_JOB_READ_FILE 4 #define WORKER_JOB_WRITE_FILE 5 #define WORKER_JOB_RCV_DATA 6 #define WORKER_JOB_SND_DATA 7 #define WORKER_JOB_PROCESS 8 #if (WORKER_UTILIZATION_MAX_JOB_TYPES < 9) #error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least 8 #endif /* * -------------------------------------------------------------------------------------------------------------------- * Build web_client state from the pollinfo that describes an accepted connection. */ static struct web_client *web_client_create_on_fd(POLLINFO *pi) { struct web_client *w; w = web_client_get_from_cache(); w->ifd = w->ofd = pi->fd; strncpyz(w->client_ip, pi->client_ip, sizeof(w->client_ip) - 1); strncpyz(w->client_port, pi->client_port, sizeof(w->client_port) - 1); strncpyz(w->client_host, pi->client_host, sizeof(w->client_host) - 1); if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-"); if(unlikely(!*w->client_port)) strcpy(w->client_port, "-"); w->port_acl = pi->port_acl; int flag = 1; if(unlikely( web_client_check_conn_tcp(w) && setsockopt(w->ifd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0)) netdata_log_debug(D_WEB_CLIENT, "%llu: failed to enable TCP_NODELAY on socket fd %d.", w->id, w->ifd); flag = 1; if(unlikely(setsockopt(w->ifd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, sizeof(int)) != 0)) netdata_log_debug(D_WEB_CLIENT, "%llu: failed to enable SO_KEEPALIVE on socket fd %d.", w->id, w->ifd); web_client_update_acl_matches(w); web_client_enable_wait_receive(w); web_server_log_connection(w, "CONNECTED"); w->pollinfo_slot = pi->slot; return(w); } // -------------------------------------------------------------------------------------- // the main socket listener - STATIC-THREADED struct web_server_static_threaded_worker { ND_THREAD *thread; int id; int running; size_t max_sockets; volatile size_t connected; volatile size_t disconnected; volatile size_t receptions; volatile size_t sends; volatile size_t max_concurrent; volatile size_t files_read; volatile size_t file_reads; }; static long long static_threaded_workers_count = 1; static struct web_server_static_threaded_worker *static_workers_private_data = NULL; static __thread struct web_server_static_threaded_worker *worker_private = NULL; // ---------------------------------------------------------------------------- static inline int web_server_check_client_status(struct web_client *w) { if(unlikely(web_client_check_dead(w) || (!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))) return -1; return 0; } // ---------------------------------------------------------------------------- // web server files static void *web_server_file_add_callback(POLLINFO *pi, short int *events, void *data) { struct web_client *w = (struct web_client *)data; worker_is_busy(WORKER_JOB_ADD_FILE); worker_private->files_read++; netdata_log_debug(D_WEB_CLIENT, "%llu: ADDED FILE READ ON FD %d", w->id, pi->fd); *events = POLLIN; pi->data = w; worker_is_idle(); return w; } static void web_server_file_del_callback(POLLINFO *pi) { struct web_client *w = (struct web_client *)pi->data; netdata_log_debug(D_WEB_CLIENT, "%llu: RELEASE FILE READ ON FD %d", w->id, pi->fd); worker_is_busy(WORKER_JOB_DEL_FILE); w->pollinfo_filecopy_slot = 0; if(unlikely(!w->pollinfo_slot)) { netdata_log_debug(D_WEB_CLIENT, "%llu: CROSS WEB CLIENT CLEANUP (iFD %d, oFD %d)", w->id, pi->fd, w->ofd); web_server_log_connection(w, "DISCONNECTED"); web_client_request_done(w); web_client_release_to_cache(w); global_statistics_web_client_disconnected(); } worker_is_idle(); } static int web_server_file_read_callback(POLLINFO *pi, short int *events) { int retval = -1; struct web_client *w = (struct web_client *)pi->data; worker_is_busy(WORKER_JOB_READ_FILE); // if there is no POLLINFO linked to this, it means the client disconnected // stop the file reading too if(unlikely(!w->pollinfo_slot)) { netdata_log_debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON CLOSED WEB CLIENT", w->id, pi->fd); retval = -1; goto cleanup; } if(unlikely(w->mode != HTTP_REQUEST_MODE_FILECOPY || w->ifd == w->ofd)) { netdata_log_debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON NON-FILECOPY WEB CLIENT", w->id, pi->fd); retval = -1; goto cleanup; } netdata_log_debug(D_WEB_CLIENT, "%llu: READING FILE ON FD %d", w->id, pi->fd); worker_private->file_reads++; ssize_t ret = unlikely(web_client_read_file(w)); if(likely(web_client_has_wait_send(w))) { POLLJOB *p = pi->p; // our POLLJOB POLLINFO *wpi = pollinfo_from_slot(p, w->pollinfo_slot); // POLLINFO of the client socket netdata_log_debug(D_WEB_CLIENT, "%llu: SIGNALING W TO SEND (iFD %d, oFD %d)", w->id, pi->fd, wpi->fd); p->fds[wpi->slot].events |= POLLOUT; } if(unlikely(ret <= 0 || w->ifd == w->ofd)) { netdata_log_debug(D_WEB_CLIENT, "%llu: DONE READING FILE ON FD %d", w->id, pi->fd); retval = -1; goto cleanup; } *events = POLLIN; retval = 0; cleanup: worker_is_idle(); return retval; } static int web_server_file_write_callback(POLLINFO *pi, short int *events) { (void)pi; (void)events; worker_is_busy(WORKER_JOB_WRITE_FILE); netdata_log_error("Writing to web files is not supported!"); worker_is_idle(); return -1; } // ---------------------------------------------------------------------------- // web server clients static void *web_server_add_callback(POLLINFO *pi, short int *events, void *data) { (void)data; // Suppress warning on unused argument worker_is_busy(WORKER_JOB_ADD_CONNECTION); worker_private->connected++; size_t concurrent = worker_private->connected - worker_private->disconnected; if(unlikely(concurrent > worker_private->max_concurrent)) worker_private->max_concurrent = concurrent; *events = POLLIN; netdata_log_debug(D_WEB_CLIENT_ACCESS, "LISTENER on %d: new connection.", pi->fd); struct web_client *w = web_client_create_on_fd(pi); if (!strncmp(pi->client_port, "UNIX", 4)) { web_client_set_conn_unix(w); } else { web_client_set_conn_tcp(w); } if ((web_client_check_conn_tcp(w)) && (netdata_ssl_web_server_ctx)) { sock_delnonblock(w->ifd); //Read the first 7 bytes from the message, but the message //is not removed from the queue, because we are using MSG_PEEK char test[8]; if ( recv(w->ifd,test, 7, MSG_PEEK) == 7 ) { test[7] = '\0'; } else { // we couldn't read 7 bytes sock_setnonblock(w->ifd); goto cleanup; } if(test[0] > 0x17) { // no SSL netdata_ssl_close(&w->ssl); // free any previous SSL data } else { // SSL if(!netdata_ssl_open(&w->ssl, netdata_ssl_web_server_ctx, w->ifd) || !netdata_ssl_accept(&w->ssl)) WEB_CLIENT_IS_DEAD(w); } sock_setnonblock(w->ifd); } netdata_log_debug(D_WEB_CLIENT, "%llu: ADDED CLIENT FD %d", w->id, pi->fd); cleanup: worker_is_idle(); return w; } // TCP client disconnected static void web_server_del_callback(POLLINFO *pi) { worker_is_busy(WORKER_JOB_DEL_COLLECTION); worker_private->disconnected++; struct web_client *w = (struct web_client *)pi->data; w->pollinfo_slot = 0; if(unlikely(w->pollinfo_filecopy_slot)) { POLLINFO *fpi = pollinfo_from_slot(pi->p, w->pollinfo_filecopy_slot); // POLLINFO of the client socket (void)fpi; netdata_log_debug(D_WEB_CLIENT, "%llu: THE CLIENT WILL BE FRED BY READING FILE JOB ON FD %d", w->id, fpi->fd); } else { if(web_client_flag_check(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET)) pi->flags |= POLLINFO_FLAG_DONT_CLOSE; netdata_log_debug(D_WEB_CLIENT, "%llu: CLOSING CLIENT FD %d", w->id, pi->fd); web_server_log_connection(w, "DISCONNECTED"); web_client_request_done(w); web_client_release_to_cache(w); global_statistics_web_client_disconnected(); } worker_is_idle(); } static int web_server_rcv_callback(POLLINFO *pi, short int *events) { int ret = -1; worker_is_busy(WORKER_JOB_RCV_DATA); worker_private->receptions++; struct web_client *w = (struct web_client *)pi->data; int fd = pi->fd; ssize_t bytes; bytes = web_client_receive(w); if (likely(bytes > 0)) { netdata_log_debug(D_WEB_CLIENT, "%llu: processing received data on fd %d.", w->id, fd); worker_is_idle(); worker_is_busy(WORKER_JOB_PROCESS); web_client_process_request_from_web_server(w); if (unlikely(w->mode == HTTP_REQUEST_MODE_STREAM)) { web_client_send(w); } else if(unlikely(w->mode == HTTP_REQUEST_MODE_FILECOPY)) { if(w->pollinfo_filecopy_slot == 0) { netdata_log_debug(D_WEB_CLIENT, "%llu: FILECOPY DETECTED ON FD %d", w->id, pi->fd); if (unlikely(w->ifd != -1 && w->ifd != w->ofd && w->ifd != fd)) { // add a new socket to poll_events, with the same netdata_log_debug(D_WEB_CLIENT, "%llu: CREATING FILECOPY SLOT ON FD %d", w->id, pi->fd); POLLINFO *fpi = poll_add_fd( pi->p , w->ifd , pi->port_acl , 0 , POLLINFO_FLAG_CLIENT_SOCKET , "FILENAME" , "" , "" , web_server_file_add_callback , web_server_file_del_callback , web_server_file_read_callback , web_server_file_write_callback , (void *) w ); if(fpi) w->pollinfo_filecopy_slot = fpi->slot; else { netdata_log_error("Failed to add filecopy fd. Closing client."); ret = -1; goto cleanup; } } } } else { if(unlikely(w->ifd == fd && web_client_has_wait_receive(w))) *events |= POLLIN; } if(unlikely(w->ofd == fd && web_client_has_wait_send(w))) *events |= POLLOUT; } else if(unlikely(bytes < 0)) { ret = -1; goto cleanup; } else if (unlikely(bytes == 0)) { if(unlikely(w->ifd == fd && web_client_has_ssl_wait_receive(w))) *events |= POLLIN; if(unlikely(w->ofd == fd && web_client_has_ssl_wait_send(w))) *events |= POLLOUT; } ret = web_server_check_client_status(w); cleanup: worker_is_idle(); return ret; } static int web_server_snd_callback(POLLINFO *pi, short int *events) { int retval = -1; worker_is_busy(WORKER_JOB_SND_DATA); worker_private->sends++; struct web_client *w = (struct web_client *)pi->data; int fd = pi->fd; netdata_log_debug(D_WEB_CLIENT, "%llu: sending data on fd %d.", w->id, fd); ssize_t ret = web_client_send(w); if(unlikely(ret < 0)) { retval = -1; goto cleanup; } if(unlikely(w->ifd == fd && web_client_has_wait_receive(w))) *events |= POLLIN; if(unlikely(w->ofd == fd && web_client_has_wait_send(w))) *events |= POLLOUT; retval = web_server_check_client_status(w); cleanup: worker_is_idle(); return retval; } // ---------------------------------------------------------------------------- // web server worker thread static void socket_listen_main_static_threaded_worker_cleanup(void *pptr) { worker_private = CLEANUP_FUNCTION_GET_PTR(pptr); if(!worker_private) return; netdata_log_info("stopped after %zu connects, %zu disconnects (max concurrent %zu), %zu receptions and %zu sends", worker_private->connected, worker_private->disconnected, worker_private->max_concurrent, worker_private->receptions, worker_private->sends ); worker_private->running = 0; worker_unregister(); } static bool web_server_should_stop(void) { return !service_running(SERVICE_WEB_SERVER); } void *socket_listen_main_static_threaded_worker(void *ptr) { worker_private = ptr; worker_private->running = 1; worker_register("WEB"); worker_register_job_name(WORKER_JOB_ADD_CONNECTION, "connect"); worker_register_job_name(WORKER_JOB_DEL_COLLECTION, "disconnect"); worker_register_job_name(WORKER_JOB_ADD_FILE, "file start"); worker_register_job_name(WORKER_JOB_DEL_FILE, "file end"); worker_register_job_name(WORKER_JOB_READ_FILE, "file read"); worker_register_job_name(WORKER_JOB_WRITE_FILE, "file write"); worker_register_job_name(WORKER_JOB_RCV_DATA, "receive"); worker_register_job_name(WORKER_JOB_SND_DATA, "send"); worker_register_job_name(WORKER_JOB_PROCESS, "process"); CLEANUP_FUNCTION_REGISTER(socket_listen_main_static_threaded_worker_cleanup) cleanup_ptr = worker_private; poll_events(&api_sockets , web_server_add_callback , web_server_del_callback , web_server_rcv_callback , web_server_snd_callback , NULL , web_server_should_stop , web_allow_connections_from , web_allow_connections_dns , NULL , web_client_first_request_timeout , web_client_timeout , default_rrd_update_every * 1000 // timer_milliseconds , ptr // timer_data , worker_private->max_sockets ); return NULL; } // ---------------------------------------------------------------------------- // web server main thread - also becomes a worker static void socket_listen_main_static_threaded_cleanup(void *pptr) { struct netdata_static_thread *static_thread = CLEANUP_FUNCTION_GET_PTR(pptr); if(!static_thread) return; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; // int i, found = 0; // usec_t max = 2 * USEC_PER_SEC, step = 50000; // // // we start from 1, - 0 is self // for(i = 1; i < static_threaded_workers_count; i++) { // if(static_workers_private_data[i].running) { // found++; // netdata_log_info("stopping worker %d", i + 1); // nd_thread_signal_cancel(static_workers_private_data[i].thread); // } // else // netdata_log_info("found stopped worker %d", i + 1); // } // // while(found && max > 0) { // max -= step; // netdata_log_info("Waiting %d static web threads to finish...", found); // sleep_usec(step); // found = 0; // // // we start from 1, - 0 is self // for(i = 1; i < static_threaded_workers_count; i++) { // if (static_workers_private_data[i].running) // found++; // } // } // // if(found) // netdata_log_error("%d static web threads are taking too long to finish. Giving up.", found); netdata_log_info("closing all web server sockets..."); listen_sockets_close(&api_sockets); netdata_log_info("all static web threads stopped."); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } void *socket_listen_main_static_threaded(void *ptr) { CLEANUP_FUNCTION_REGISTER(socket_listen_main_static_threaded_cleanup) cleanup_ptr = ptr; web_server_mode = WEB_SERVER_MODE_STATIC_THREADED; if(!api_sockets.opened) fatal("LISTENER: no listen sockets available."); netdata_ssl_validate_certificate = !config_get_boolean(CONFIG_SECTION_WEB, "ssl skip certificate verification", !netdata_ssl_validate_certificate); if(!netdata_ssl_validate_certificate_sender) netdata_log_info("SSL: web server will skip SSL certificates verification."); netdata_ssl_initialize_ctx(NETDATA_SSL_WEB_SERVER_CTX); // 6 threads is the optimal value // since 6 are the parallel connections browsers will do // so, if the machine has more CPUs, avoid using resources unnecessarily int def_thread_count = MIN(get_netdata_cpus(), 6); if (!strcmp(config_get(CONFIG_SECTION_WEB, "mode", ""),"single-threaded")) { netdata_log_info("Running web server with one thread, because mode is single-threaded"); config_set(CONFIG_SECTION_WEB, "mode", "static-threaded"); def_thread_count = 1; } static_threaded_workers_count = config_get_number(CONFIG_SECTION_WEB, "web server threads", def_thread_count); if (static_threaded_workers_count < 1) static_threaded_workers_count = 1; // See https://github.com/netdata/netdata/issues/11081#issuecomment-831998240 for more details if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) { static_threaded_workers_count = 1; netdata_log_info("You are running an OpenSSL older than 1.1.0, web server will not enable multithreading."); } size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets", (long long int)(rlimit_nofile.rlim_cur / 4)); static_workers_private_data = callocz((size_t)static_threaded_workers_count, sizeof(struct web_server_static_threaded_worker)); int i; for (i = 1; i < static_threaded_workers_count; i++) { static_workers_private_data[i].id = i; static_workers_private_data[i].max_sockets = max_sockets / static_threaded_workers_count; char tag[50 + 1]; snprintfz(tag, sizeof(tag) - 1, "WEB[%d]", i+1); static_workers_private_data[i].thread = nd_thread_create(tag, NETDATA_THREAD_OPTION_DEFAULT, socket_listen_main_static_threaded_worker, (void *)&static_workers_private_data[i]); } // and the main one static_workers_private_data[0].max_sockets = max_sockets / static_threaded_workers_count; socket_listen_main_static_threaded_worker((void *)&static_workers_private_data[0]); return NULL; }