diff options
Diffstat (limited to 'src/streaming/receiver.c')
-rw-r--r-- | src/streaming/receiver.c | 673 |
1 files changed, 556 insertions, 117 deletions
diff --git a/src/streaming/receiver.c b/src/streaming/receiver.c index 0c0da2121..6c15004a3 100644 --- a/src/streaming/receiver.c +++ b/src/streaming/receiver.c @@ -3,12 +3,13 @@ #include "rrdpush.h" #include "web/server/h2o/http_server.h" -extern struct config stream_config; +// When a child disconnects this is the maximum we will wait +// before we update the cloud that the child is offline +#define MAX_CHILD_DISC_DELAY (30000) +#define MAX_CHILD_DISC_TOLERANCE (125 / 100) void receiver_state_free(struct receiver_state *rpt) { -#ifdef ENABLE_HTTPS netdata_ssl_close(&rpt->ssl); -#endif if(rpt->fd != -1) { internal_error(true, "closing socket..."); @@ -36,7 +37,7 @@ void receiver_state_free(struct receiver_state *rpt) { freez(rpt); } -#include "collectors/plugins.d/pluginsd_parser.h" +#include "plugins.d/pluginsd_parser.h" // IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly #define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) @@ -71,9 +72,7 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz errno_clear(); switch(wait_on_socket_or_cancel_with_timeout( -#ifdef ENABLE_HTTPS &r->ssl, -#endif r->fd, 0, POLLIN, NULL)) { case 0: // data are waiting @@ -93,14 +92,10 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz return -2; } -#ifdef ENABLE_HTTPS if (SSL_connection(&r->ssl)) bytes_read = netdata_ssl_read(&r->ssl, buffer, size); else bytes_read = read(r->fd, buffer, size); -#else - bytes_read = read(r->fd, buffer, size); -#endif } while(bytes_read < 0 && errno == EINTR && tries--); @@ -325,7 +320,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i .capabilities = rpt->capabilities, }; - parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl); + parser = parser_init(&user, fd, fd, PARSER_INPUT_SPLIT, ssl); } #ifdef ENABLE_H2O @@ -336,10 +331,6 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i rrd_collector_started(); - // this keeps the parser with its current value - // so, parser needs to be allocated before pushing it - CLEANUP_FUNCTION_REGISTER(pluginsd_process_thread_cleanup) parser_ptr = parser; - bool compressed_connection = rrdpush_decompression_initialize(rpt); buffered_reader_init(&rpt->reader); @@ -365,6 +356,9 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i }; ND_LOG_STACK_PUSH(lgs); + __atomic_store_n(&rpt->parser, parser, __ATOMIC_RELAXED); + rrdpush_receiver_send_node_and_claim_id_to_child(rpt->host); + while(!receiver_should_stop(rpt)) { if(!buffered_reader_next_line(&rpt->reader, buffer)) { @@ -389,6 +383,17 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i buffer->len = 0; buffer->buffer[0] = '\0'; } + + // cleanup the sender buffer, because we may end-up reusing an incomplete buffer + sender_thread_buffer_free(); + parser->user.v2.stream_buffer.wb = NULL; + + // make sure send_to_plugin() will not write any data to the socket + spinlock_lock(&parser->writer.spinlock); + parser->fd_output = -1; + parser->ssl_output = NULL; + spinlock_unlock(&parser->writer.spinlock); + result = parser->user.data_collections_count; return result; } @@ -407,7 +412,7 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { bool signal_rrdcontext = false; bool set_this = false; - netdata_mutex_lock(&host->receiver_lock); + spinlock_lock(&host->receiver_lock); if (!host->receiver) { rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); @@ -433,7 +438,7 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { } } - host->health_log.health_log_history = rpt->config.alarms_history; + host->health_log.health_log_retention_s = rpt->config.alarms_history; // this is a test // if(rpt->hops <= host->sender->hops) @@ -450,7 +455,7 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { set_this = true; } - netdata_mutex_unlock(&host->receiver_lock); + spinlock_unlock(&host->receiver_lock); if(signal_rrdcontext) rrdcontext_host_child_connected(host); @@ -460,47 +465,56 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { static void rrdhost_clear_receiver(struct receiver_state *rpt) { RRDHOST *host = rpt->host; - if(host) { - bool signal_rrdcontext = false; - netdata_mutex_lock(&host->receiver_lock); + if(!host) return; + spinlock_lock(&host->receiver_lock); + { // Make sure that we detach this thread and don't kill a freshly arriving receiver - if(host->receiver == rpt) { + + if (host->receiver == rpt) { + spinlock_unlock(&host->receiver_lock); + { + // run all these without having the receiver lock + + stream_path_child_disconnected(host); + rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false); + rrdpush_receiver_replication_reset(host); + rrdcontext_host_child_disconnected(host); + + if (rpt->config.health_enabled) + rrdcalc_child_disconnected(host); + + rrdpush_reset_destinations_postpone_time(host); + } + spinlock_lock(&host->receiver_lock); + + // now we have the lock again + __atomic_sub_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED); rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); host->trigger_chart_obsoletion_check = 0; host->child_connect_time = 0; host->child_disconnected_time = now_realtime_sec(); - host->health.health_enabled = 0; - rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false); - - signal_rrdcontext = true; - rrdpush_receiver_replication_reset(host); - + host->rrdpush_last_receiver_exit_reason = rpt->exit.reason; rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN); host->receiver = NULL; - host->rrdpush_last_receiver_exit_reason = rpt->exit.reason; - - if(rpt->config.health_enabled) - rrdcalc_child_disconnected(host); } + } - netdata_mutex_unlock(&host->receiver_lock); - - if(signal_rrdcontext) - rrdcontext_host_child_disconnected(host); + // this must be cleared with the receiver lock + pluginsd_process_cleanup(rpt->parser); + __atomic_store_n(&rpt->parser, NULL, __ATOMIC_RELAXED); - rrdpush_reset_destinations_postpone_time(host); - } + spinlock_unlock(&host->receiver_lock); } bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) { bool ret = false; - netdata_mutex_lock(&host->receiver_lock); + spinlock_lock(&host->receiver_lock); if(host->receiver) { if(!host->receiver->exit.shutdown) { @@ -514,12 +528,12 @@ bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) { int count = 2000; while (host->receiver && count-- > 0) { - netdata_mutex_unlock(&host->receiver_lock); + spinlock_unlock(&host->receiver_lock); // let the lock for the receiver thread to exit sleep_usec(1 * USEC_PER_MS); - netdata_mutex_lock(&host->receiver_lock); + spinlock_lock(&host->receiver_lock); } if(host->receiver) @@ -531,16 +545,14 @@ bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) { else ret = true; - netdata_mutex_unlock(&host->receiver_lock); + spinlock_unlock(&host->receiver_lock); return ret; } static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *rpt, const char *msg) { (void) send_timeout( -#ifdef ENABLE_HTTPS &rpt->ssl, -#endif rpt->fd, (char *)msg, strlen(msg), @@ -548,7 +560,7 @@ static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *r 5); } -void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) { +static void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) { // this function may be called BEFORE we spawn the receiver thread // so, we need to add the fields again (it does not harm) ND_LOG_STACK lgs[] = { @@ -582,26 +594,26 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.health_enabled = health_plugin_enabled(); rpt->config.alarms_delay = 60; - rpt->config.alarms_history = HEALTH_LOG_DEFAULT_HISTORY; + rpt->config.alarms_history = HEALTH_LOG_RETENTION_DEFAULT; - rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled; - rpt->config.rrdpush_destination = default_rrdpush_destination; - rpt->config.rrdpush_api_key = default_rrdpush_api_key; - rpt->config.rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; + rpt->config.rrdpush_enabled = (int)stream_conf_send_enabled; + rpt->config.rrdpush_destination = stream_conf_send_destination; + rpt->config.rrdpush_api_key = stream_conf_send_api_key; + rpt->config.rrdpush_send_charts_matching = stream_conf_send_charts_matching; - rpt->config.rrdpush_enable_replication = default_rrdpush_enable_replication; - rpt->config.rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; - rpt->config.rrdpush_replication_step = default_rrdpush_replication_step; + rpt->config.rrdpush_enable_replication = stream_conf_replication_enabled; + rpt->config.rrdpush_seconds_to_replicate = stream_conf_replication_period; + rpt->config.rrdpush_replication_step = stream_conf_replication_step; - rpt->config.update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->config.update_every); + rpt->config.update_every = (int)appconfig_get_duration_seconds(&stream_config, rpt->machine_guid, "update every", rpt->config.update_every); if(rpt->config.update_every < 0) rpt->config.update_every = 1; - rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", rpt->config.history); - rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", rpt->config.history); + rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->key, "retention", rpt->config.history); + rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "retention", rpt->config.history); if(rpt->config.history < 5) rpt->config.history = 5; - rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(rpt->config.mode))); - rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode))); + rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "db", rrd_memory_mode_name(rpt->config.mode))); + rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "db", rrd_memory_mode_name(rpt->config.mode))); if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { netdata_log_error("STREAM '%s' [receive from %s:%s]: " @@ -616,34 +628,34 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", rpt->config.health_enabled); rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", rpt->config.health_enabled); - rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay); - rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay); + rpt->config.alarms_delay = appconfig_get_duration_seconds(&stream_config, rpt->key, "postpone alerts on connect", rpt->config.alarms_delay); + rpt->config.alarms_delay = appconfig_get_duration_seconds(&stream_config, rpt->machine_guid, "postpone alerts on connect", rpt->config.alarms_delay); - rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->key, "default health log history", rpt->config.alarms_history); - rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->machine_guid, "health log history", rpt->config.alarms_history); + rpt->config.alarms_history = appconfig_get_duration_seconds(&stream_config, rpt->key, "health log retention", rpt->config.alarms_history); + rpt->config.alarms_history = appconfig_get_duration_seconds(&stream_config, rpt->machine_guid, "health log retention", rpt->config.alarms_history); - rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled); + rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "proxy enabled", rpt->config.rrdpush_enabled); rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled); - rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rpt->config.rrdpush_destination); + rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->key, "proxy destination", rpt->config.rrdpush_destination); rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rpt->config.rrdpush_destination); - rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rpt->config.rrdpush_api_key); + rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "proxy api key", rpt->config.rrdpush_api_key); rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rpt->config.rrdpush_api_key); - rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rpt->config.rrdpush_send_charts_matching); + rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "proxy send charts matching", rpt->config.rrdpush_send_charts_matching); rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rpt->config.rrdpush_send_charts_matching); rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rpt->config.rrdpush_enable_replication); rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rpt->config.rrdpush_enable_replication); - rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate); - rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate); + rpt->config.rrdpush_seconds_to_replicate = appconfig_get_duration_seconds(&stream_config, rpt->key, "replication period", rpt->config.rrdpush_seconds_to_replicate); + rpt->config.rrdpush_seconds_to_replicate = appconfig_get_duration_seconds(&stream_config, rpt->machine_guid, "replication period", rpt->config.rrdpush_seconds_to_replicate); - rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step); - rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step); + rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "replication step", rpt->config.rrdpush_replication_step); + rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "replication step", rpt->config.rrdpush_replication_step); - rpt->config.rrdpush_compression = default_rrdpush_compression_enabled; + rpt->config.rrdpush_compression = stream_conf_compression_enabled; rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression); rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression); @@ -652,7 +664,7 @@ static void rrdpush_receive(struct receiver_state *rpt) is_ephemeral = appconfig_get_boolean(&stream_config, rpt->machine_guid, "is ephemeral node", is_ephemeral); if(rpt->config.rrdpush_compression) { - char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", RRDPUSH_COMPRESSION_ALGORITHMS_ORDER); + const char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", RRDPUSH_COMPRESSION_ALGORITHMS_ORDER); order = appconfig_get(&stream_config, rpt->machine_guid, "compression algorithms order", order); rrdpush_parse_compression_order(rpt, order); } @@ -730,11 +742,7 @@ static void rrdpush_receive(struct receiver_state *rpt) , rpt->host->rrd_history_entries , rrd_memory_mode_name(rpt->host->rrd_memory_mode) , (rpt->config.health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((rpt->config.health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") -#ifdef ENABLE_HTTPS , (rpt->ssl.conn != NULL) ? " SSL," : "" -#else - , "" -#endif ); #endif // NETDATA_INTERNAL_CHECKS @@ -784,9 +792,7 @@ static void rrdpush_receive(struct receiver_state *rpt) } else { #endif ssize_t bytes_sent = send_timeout( -#ifdef ENABLE_HTTPS &rpt->ssl, -#endif rpt->fd, initial_response, strlen(initial_response), 0, 60); if(bytes_sent != (ssize_t)strlen(initial_response)) { @@ -828,13 +834,9 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt, "connected and ready to receive data", RRDPUSH_STATUS_CONNECTED, NDLP_INFO); -#ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // new child connected - if (netdata_cloud_enabled) - aclk_host_state_update(rpt->host, 1, 1); -#endif - + schedule_node_state_update(rpt->host, 300); rrdhost_set_is_parent_label(); if (is_ephemeral) @@ -843,50 +845,28 @@ static void rrdpush_receive(struct receiver_state *rpt) // let it reconnect to parent immediately rrdpush_reset_destinations_postpone_time(rpt->host); - size_t count = streaming_parser(rpt, &cd, rpt->fd, -#ifdef ENABLE_HTTPS - (rpt->ssl.conn) ? &rpt->ssl : NULL -#else - NULL -#endif - ); + // receive data + size_t count = streaming_parser(rpt, &cd, rpt->fd, (rpt->ssl.conn) ? &rpt->ssl : NULL); + // the parser stopped receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, false); { char msg[100 + 1]; snprintfz(msg, sizeof(msg) - 1, "disconnected (completed %zu updates)", count); - rrdpush_receive_log_status( - rpt, msg, - RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING); + rrdpush_receive_log_status(rpt, msg, RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING); } -#ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // a child disconnected - if (netdata_cloud_enabled) - aclk_host_state_update(rpt->host, 0, 1); -#endif + STREAM_PATH tmp = rrdhost_stream_path_fetch(rpt->host); + uint64_t total_reboot = (tmp.start_time + tmp.shutdown_time); + schedule_node_state_update(rpt->host, MIN((total_reboot * MAX_CHILD_DISC_TOLERANCE), MAX_CHILD_DISC_DELAY)); cleanup: ; } -static void rrdpush_receiver_thread_cleanup(void *pptr) { - struct receiver_state *rpt = CLEANUP_FUNCTION_GET_PTR(pptr); - if(!rpt) return; - - netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " - "receive thread ended (task id %d)" - , rpt->hostname ? rpt->hostname : "-" - , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-", gettid_cached()); - - worker_unregister(); - rrdhost_clear_receiver(rpt); - receiver_state_free(rpt); - rrdhost_set_is_parent_label(); -} - static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) { struct receiver_state *rpt = ptr; if(!rpt) @@ -901,16 +881,11 @@ static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) { if(!rpt) return false; -#ifdef ENABLE_HTTPS buffer_strcat(wb, SSL_connection(&rpt->ssl) ? "https" : "http"); -#else - buffer_strcat(wb, "http"); -#endif return true; } void *rrdpush_receiver_thread(void *ptr) { - CLEANUP_FUNCTION_REGISTER(rrdpush_receiver_thread_cleanup) cleanup_ptr = ptr; worker_register("STREAMRCV"); worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, @@ -942,5 +917,469 @@ void *rrdpush_receiver_thread(void *ptr) { , rpt->client_port); rrdpush_receive(rpt); + + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " + "receive thread ended (task id %d)" + , rpt->hostname ? rpt->hostname : "-" + , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-", gettid_cached()); + + worker_unregister(); + rrdhost_clear_receiver(rpt); + rrdhost_set_is_parent_label(); + receiver_state_free(rpt); return NULL; } + +int rrdpush_receiver_permission_denied(struct web_client *w) { + // we always respond with the same message and error code + // to prevent an attacker from gaining info about the error + buffer_flush(w->response.data); + buffer_strcat(w->response.data, START_STREAMING_ERROR_NOT_PERMITTED); + return HTTP_RESP_UNAUTHORIZED; +} + +int rrdpush_receiver_too_busy_now(struct web_client *w) { + // we always respond with the same message and error code + // to prevent an attacker from gaining info about the error + buffer_flush(w->response.data); + buffer_strcat(w->response.data, START_STREAMING_ERROR_BUSY_TRY_LATER); + return HTTP_RESP_SERVICE_UNAVAILABLE; +} + +static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struct receiver_state *rpt) { + rpt->fd = w->ifd; + + rpt->ssl.conn = w->ssl.conn; + rpt->ssl.state = w->ssl.state; + + w->ssl = NETDATA_SSL_UNSET_CONNECTION; + + WEB_CLIENT_IS_DEAD(w); + + if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) { + web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET); + } + else { + if(w->ifd == w->ofd) + w->ifd = w->ofd = -1; + else + w->ifd = -1; + } + + buffer_flush(w->response.data); +} + +int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx __maybe_unused) { + + if(!service_running(ABILITY_STREAMING_CONNECTIONS)) + return rrdpush_receiver_too_busy_now(w); + + struct receiver_state *rpt = callocz(1, sizeof(*rpt)); + rpt->connected_since_s = now_realtime_sec(); + rpt->last_msg_t = now_monotonic_sec(); + rpt->hops = 1; + + rpt->capabilities = STREAM_CAP_INVALID; + +#ifdef ENABLE_H2O + rpt->h2o_ctx = h2o_ctx; +#endif + + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); + + rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info)); + rpt->system_info->hops = rpt->hops; + + rpt->fd = -1; + rpt->client_ip = strdupz(w->client_ip); + rpt->client_port = strdupz(w->client_port); + + rpt->ssl = NETDATA_SSL_UNSET_CONNECTION; + + rpt->config.update_every = default_rrd_update_every; + + // parse the parameters and fill rpt and rpt->system_info + + while(decoded_query_string) { + char *value = strsep_skip_consecutive_separators(&decoded_query_string, "&"); + if(!value || !*value) continue; + + char *name = strsep_skip_consecutive_separators(&value, "="); + if(!name || !*name) continue; + if(!value || !*value) continue; + + if(!strcmp(name, "key") && !rpt->key) + rpt->key = strdupz(value); + + else if(!strcmp(name, "hostname") && !rpt->hostname) + rpt->hostname = strdupz(value); + + else if(!strcmp(name, "registry_hostname") && !rpt->registry_hostname) + rpt->registry_hostname = strdupz(value); + + else if(!strcmp(name, "machine_guid") && !rpt->machine_guid) + rpt->machine_guid = strdupz(value); + + else if(!strcmp(name, "update_every")) + rpt->config.update_every = (int)strtoul(value, NULL, 0); + + else if(!strcmp(name, "os") && !rpt->os) + rpt->os = strdupz(value); + + else if(!strcmp(name, "timezone") && !rpt->timezone) + rpt->timezone = strdupz(value); + + else if(!strcmp(name, "abbrev_timezone") && !rpt->abbrev_timezone) + rpt->abbrev_timezone = strdupz(value); + + else if(!strcmp(name, "utc_offset")) + rpt->utc_offset = (int32_t)strtol(value, NULL, 0); + + else if(!strcmp(name, "hops")) + rpt->hops = rpt->system_info->hops = (uint16_t) strtoul(value, NULL, 0); + + else if(!strcmp(name, "ml_capable")) + rpt->system_info->ml_capable = strtoul(value, NULL, 0); + + else if(!strcmp(name, "ml_enabled")) + rpt->system_info->ml_enabled = strtoul(value, NULL, 0); + + else if(!strcmp(name, "mc_version")) + rpt->system_info->mc_version = strtoul(value, NULL, 0); + + else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID)) + rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0), NULL, false); + + else { + // An old Netdata child does not have a compatible streaming protocol, map to something sane. + if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME")) + name = "NETDATA_HOST_OS_NAME"; + + else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID")) + name = "NETDATA_HOST_OS_ID"; + + else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE")) + name = "NETDATA_HOST_OS_ID_LIKE"; + + else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION")) + name = "NETDATA_HOST_OS_VERSION"; + + else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID")) + name = "NETDATA_HOST_OS_VERSION_ID"; + + else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION")) + name = "NETDATA_HOST_OS_DETECTION"; + + else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID)) + rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false); + + if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) { + nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: " + "request has parameter '%s' = '%s', which is not used." + , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-" + , rpt->client_ip, rpt->client_port + , name, value); + } + } + } + + if (rpt->capabilities & STREAM_CAP_INVALID) + // no version is supplied, assume version 0; + rpt->capabilities = convert_stream_version_to_capabilities(0, NULL, false); + + // find the program name and version + if(w->user_agent && w->user_agent[0]) { + char *t = strchr(w->user_agent, '/'); + if(t && *t) { + *t = '\0'; + t++; + } + + rpt->program_name = strdupz(w->user_agent); + if(t && *t) rpt->program_version = strdupz(t); + } + + // check if we should accept this connection + + if(!rpt->key || !*rpt->key) { + rrdpush_receive_log_status( + rpt, "request without an API key, rejecting connection", + RRDPUSH_STATUS_NO_API_KEY, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + if(!rpt->hostname || !*rpt->hostname) { + rrdpush_receive_log_status( + rpt, "request without a hostname, rejecting connection", + RRDPUSH_STATUS_NO_HOSTNAME, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + if(!rpt->registry_hostname) + rpt->registry_hostname = strdupz(rpt->hostname); + + if(!rpt->machine_guid || !*rpt->machine_guid) { + rrdpush_receive_log_status( + rpt, "request without a machine GUID, rejecting connection", + RRDPUSH_STATUS_NO_MACHINE_GUID, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + { + char buf[GUID_LEN + 1]; + + if (regenerate_guid(rpt->key, buf) == -1) { + rrdpush_receive_log_status( + rpt, "API key is not a valid UUID (use the command uuidgen to generate one)", + RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + if (regenerate_guid(rpt->machine_guid, buf) == -1) { + rrdpush_receive_log_status( + rpt, "machine GUID is not a valid UUID", + RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + } + + const char *api_key_type = appconfig_get(&stream_config, rpt->key, "type", "api"); + if(!api_key_type || !*api_key_type) api_key_type = "unknown"; + if(strcmp(api_key_type, "api") != 0) { + rrdpush_receive_log_status( + rpt, "API key is a machine GUID", + RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) { + rrdpush_receive_log_status( + rpt, "API key is not enabled", + RRDPUSH_STATUS_API_KEY_DISABLED, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + { + SIMPLE_PATTERN *key_allow_from = simple_pattern_create( + appconfig_get(&stream_config, rpt->key, "allow from", "*"), + NULL, SIMPLE_PATTERN_EXACT, true); + + if(key_allow_from) { + if(!simple_pattern_matches(key_allow_from, w->client_ip)) { + simple_pattern_free(key_allow_from); + + rrdpush_receive_log_status( + rpt, "API key is not allowed from this IP", + RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + simple_pattern_free(key_allow_from); + } + } + + { + const char *machine_guid_type = appconfig_get(&stream_config, rpt->machine_guid, "type", "machine"); + if (!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown"; + + if (strcmp(machine_guid_type, "machine") != 0) { + rrdpush_receive_log_status( + rpt, "machine GUID is an API key", + RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + } + + if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) { + rrdpush_receive_log_status( + rpt, "machine GUID is not enabled", + RRDPUSH_STATUS_MACHINE_GUID_DISABLED, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + { + SIMPLE_PATTERN *machine_allow_from = simple_pattern_create( + appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"), + NULL, SIMPLE_PATTERN_EXACT, true); + + if(machine_allow_from) { + if(!simple_pattern_matches(machine_allow_from, w->client_ip)) { + simple_pattern_free(machine_allow_from); + + rrdpush_receive_log_status( + rpt, "machine GUID is not allowed from this IP", + RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING); + + receiver_state_free(rpt); + return rrdpush_receiver_permission_denied(w); + } + + simple_pattern_free(machine_allow_from); + } + } + + if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { + + rrdpush_receiver_takeover_web_connection(w, rpt); + + rrdpush_receive_log_status( + rpt, "machine GUID is my own", + RRDPUSH_STATUS_LOCALHOST, NDLP_DEBUG); + + char initial_response[HTTP_HEADER_SIZE + 1]; + snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); + + if(send_timeout( + &rpt->ssl, + rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { + + nd_log_daemon(NDLP_ERR, "STREAM '%s' [receive from [%s]:%s]: " + "failed to reply." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); + } + + receiver_state_free(rpt); + return HTTP_RESP_OK; + } + + if(unlikely(web_client_streaming_rate_t > 0)) { + static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER; + static time_t last_stream_accepted_t = 0; + + time_t now = now_realtime_sec(); + spinlock_lock(&spinlock); + + if(unlikely(last_stream_accepted_t == 0)) + last_stream_accepted_t = now; + + if(now - last_stream_accepted_t < web_client_streaming_rate_t) { + spinlock_unlock(&spinlock); + + char msg[100 + 1]; + snprintfz(msg, sizeof(msg) - 1, + "rate limit, will accept new connection in %ld secs", + (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t))); + + rrdpush_receive_log_status( + rpt, msg, + RRDPUSH_STATUS_RATE_LIMIT, NDLP_NOTICE); + + receiver_state_free(rpt); + return rrdpush_receiver_too_busy_now(w); + } + + last_stream_accepted_t = now; + spinlock_unlock(&spinlock); + } + + /* + * Quick path for rejecting multiple connections. The lock taken is fine-grained - it only protects the receiver + * pointer within the host (if a host exists). This protects against multiple concurrent web requests hitting + * separate threads within the web-server and landing here. The lock guards the thread-shutdown sequence that + * detaches the receiver from the host. If the host is being created (first time-access) then we also use the + * lock to prevent race-hazard (two threads try to create the host concurrently, one wins and the other does a + * lookup to the now-attached structure). + */ + + { + time_t age = 0; + bool receiver_stale = false; + bool receiver_working = false; + + rrd_rdlock(); + RRDHOST *host = rrdhost_find_by_guid(rpt->machine_guid); + if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */ + host = NULL; + + if (host) { + spinlock_lock(&host->receiver_lock); + if (host->receiver) { + age = now_monotonic_sec() - host->receiver->last_msg_t; + + if (age < 30) + receiver_working = true; + else + receiver_stale = true; + } + spinlock_unlock(&host->receiver_lock); + } + rrd_rdunlock(); + + if (receiver_stale && stop_streaming_receiver(host, STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER)) { + // we stopped the receiver + // we can proceed with this connection + receiver_stale = false; + + nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: " + "stopped previous stale receiver to accept this one." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); + } + + if (receiver_working || receiver_stale) { + // another receiver is already connected + // try again later + + char msg[200 + 1]; + snprintfz(msg, sizeof(msg) - 1, + "multiple connections for same host, " + "old connection was last used %ld secs ago%s", + age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)"); + + rrdpush_receive_log_status( + rpt, msg, + RRDPUSH_STATUS_ALREADY_CONNECTED, NDLP_DEBUG); + + // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up + buffer_flush(w->response.data); + buffer_strcat(w->response.data, START_STREAMING_ERROR_ALREADY_STREAMING); + receiver_state_free(rpt); + return HTTP_RESP_CONFLICT; + } + } + + rrdpush_receiver_takeover_web_connection(w, rpt); + + char tag[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_RECEIVER "[%s]", rpt->hostname); + tag[NETDATA_THREAD_TAG_MAX] = '\0'; + + rpt->thread = nd_thread_create(tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt); + if(!rpt->thread) { + rrdpush_receive_log_status( + rpt, "can't create receiver thread", + RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR); + + buffer_flush(w->response.data); + buffer_strcat(w->response.data, "Can't handle this request"); + receiver_state_free(rpt); + return HTTP_RESP_INTERNAL_SERVER_ERROR; + } + + // prevent the caller from closing the streaming socket + return HTTP_RESP_OK; +} |