diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-02-06 16:11:34 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-02-06 16:11:34 +0000 |
commit | d079b656b4719739b2247dcd9d46e9bec793095a (patch) | |
tree | d2c950c70a776bcf697c963151c5bd959f8a9f03 /streaming/receiver.c | |
parent | Releasing debian version 1.37.1-2. (diff) | |
download | netdata-d079b656b4719739b2247dcd9d46e9bec793095a.tar.xz netdata-d079b656b4719739b2247dcd9d46e9bec793095a.zip |
Merging upstream version 1.38.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 627 |
1 files changed, 351 insertions, 276 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 61ee33bc4..95652942e 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -16,7 +16,8 @@ extern struct config stream_config; -void destroy_receiver_state(struct receiver_state *rpt) { +void receiver_state_free(struct receiver_state *rpt) { + freez(rpt->key); freez(rpt->hostname); freez(rpt->registry_hostname); @@ -29,43 +30,23 @@ void destroy_receiver_state(struct receiver_state *rpt) { freez(rpt->client_port); freez(rpt->program_name); freez(rpt->program_version); + #ifdef ENABLE_HTTPS - if(rpt->ssl.conn){ + if(rpt->ssl.conn) SSL_free(rpt->ssl.conn); - } #endif + #ifdef ENABLE_COMPRESSION if (rpt->decompressor) rpt->decompressor->destroy(&rpt->decompressor); #endif - freez(rpt); -} - -static void rrdpush_receiver_thread_cleanup(void *ptr) { - worker_unregister(); - static __thread int executed = 0; - if(!executed) { - executed = 1; - struct receiver_state *rpt = (struct receiver_state *) ptr; - // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch - // the host pointer as it is unpredictable when the RRDHOST is deleted. Do the cleanup from rrdhost_free(). - if (netdata_exit && rpt->host) { - rpt->exited = 1; - return; - } + if(rpt->system_info) + rrdhost_system_info_free(rpt->system_info); - // Make sure that we detach this thread and don't kill a freshly arriving receiver - if (!netdata_exit && rpt->host) { - netdata_mutex_lock(&rpt->host->receiver_lock); - if (rpt->host->receiver == rpt) - rpt->host->receiver = NULL; - netdata_mutex_unlock(&rpt->host->receiver_lock); - } + __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); - info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); - destroy_receiver_state(rpt); - } + freez(rpt); } #include "collectors/plugins.d/pluginsd_parser.h" @@ -105,11 +86,10 @@ PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user) if (host->aclk_state.claimed_id) freez(host->aclk_state.claimed_id); host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; - - metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL); - rrdhost_aclk_state_unlock(host); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE); + rrdpush_claimed_id(host); return PARSER_RC_OK; @@ -350,11 +330,13 @@ static void streaming_parser_thread_cleanup(void *ptr) { parser_destroy(parser); } +bool plugin_is_enabled(struct plugind *cd); + static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { size_t result; PARSER_USER_OBJECT user = { - .enabled = cd->enabled, + .enabled = plugin_is_enabled(cd), .host = rpt->host, .opaque = rpt, .cd = cd, @@ -390,39 +372,50 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i size_t read_buffer_start = 0; char buffer[PLUGINSD_LINE_MAX + 2] = ""; - while(!netdata_exit) { + while(service_running(SERVICE_STREAMING)) { + netdata_thread_testcancel(); + if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) { bool have_new_data; - if(compressed_connection) + if(likely(compressed_connection)) have_new_data = receiver_read_compressed(rpt); else have_new_data = receiver_read_uncompressed(rpt); - if(!have_new_data) + if(unlikely(!have_new_data)) { + if(!rpt->exit.reason) + rpt->exit.reason = "SOCKET READ ERROR"; + break; + } rpt->last_msg_t = now_realtime_sec(); continue; } - if(unlikely(netdata_exit)) { - internal_error(true, "exiting..."); + if(unlikely(!service_running(SERVICE_STREAMING))) { + if(!rpt->exit.reason) + rpt->exit.reason = "NETDATA EXIT"; goto done; } - if(unlikely(rpt->shutdown)) { - internal_error(true, "parser shutdown..."); + if(unlikely(rpt->exit.shutdown)) { + if(!rpt->exit.reason) + rpt->exit.reason = "SHUTDOWN REQUESTED"; + goto done; } if (unlikely(parser_action(parser, buffer))) { internal_error(true, "parser_action() failed on keyword '%s'.", buffer); + + if(!rpt->exit.reason) + rpt->exit.reason = "PARSER FAILED"; + break; } } done: - internal_error(true, "Streaming receiver thread stopping..."); - result = user.count; // free parser with the pop function @@ -431,103 +424,240 @@ done: return result; } -static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) { +static void rrdpush_receiver_replication_reset(RRDHOST *host) { RRDSET *st; - rrdset_foreach_read(st, rpt->host) { + rrdset_foreach_read(st, host) { rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); } rrdset_foreach_done(st); - rrdhost_receiver_replicating_charts_zero(rpt->host); + rrdhost_receiver_replicating_charts_zero(host); +} + +bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { + bool signal_rrdcontext = false; + bool set_this = false; + + netdata_mutex_lock(&host->receiver_lock); + + if (!host->receiver || host->receiver == rpt) { + rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); + + host->receiver = rpt; + rpt->host = host; + + host->child_connect_time = now_realtime_sec(); + host->child_disconnected_time = 0; + host->child_last_chart_command = 0; + host->trigger_chart_obsoletion_check = 1; + + if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) { + if (rpt->config.alarms_delay > 0) { + host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay; + log_health( + "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", + rrdhost_hostname(host), + (int64_t) rpt->config.alarms_delay); + } + } + +// this is a test +// if(rpt->hops <= host->sender->hops) +// rrdpush_sender_thread_stop(host, "HOPS MISMATCH", false); + + signal_rrdcontext = true; + rrdpush_receiver_replication_reset(host); + + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + + set_this = true; + } + + netdata_mutex_unlock(&host->receiver_lock); + + if(signal_rrdcontext) + rrdcontext_host_child_connected(host); + + return set_this; +} + +static void rrdhost_clear_receiver(struct receiver_state *rpt) { + bool signal_rrdcontext = false; + + RRDHOST *host = rpt->host; + if(host) { + netdata_mutex_lock(&host->receiver_lock); + + // Make sure that we detach this thread and don't kill a freshly arriving receiver + if(host->receiver == rpt) { + host->trigger_chart_obsoletion_check = 0; + host->child_connect_time = 0; + host->child_disconnected_time = now_realtime_sec(); + + if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO) + host->health.health_enabled = 0; + + rrdpush_sender_thread_stop(host, "RECEIVER LEFT", false); + + signal_rrdcontext = true; + rrdpush_receiver_replication_reset(host); + + if (host->receiver == rpt) + host->receiver = NULL; + + rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN); + } + + netdata_mutex_unlock(&host->receiver_lock); + + if(signal_rrdcontext) + rrdcontext_host_child_disconnected(host); + } +} + +bool stop_streaming_receiver(RRDHOST *host, const char *reason) { + bool ret = false; + + netdata_mutex_lock(&host->receiver_lock); + + if(host->receiver) { + if(!host->receiver->exit.shutdown) { + host->receiver->exit.shutdown = true; + host->receiver->exit.reason = reason; + shutdown(host->receiver->fd, SHUT_RDWR); + } + + netdata_thread_cancel(host->receiver->thread); + } + + int count = 2000; + while (host->receiver && count-- > 0) { + netdata_mutex_unlock(&host->receiver_lock); + + // let the lock for the receiver thread to exit + sleep_usec(1 * USEC_PER_MS); + + netdata_mutex_lock(&host->receiver_lock); + } + + if(host->receiver) + error("STREAM '%s' [receive from [%s]:%s]: " + "thread %d takes too long to stop, giving up..." + , rrdhost_hostname(host) + , host->receiver->client_ip, host->receiver->client_port + , gettid()); + else + ret = true; + + netdata_mutex_unlock(&host->receiver_lock); + + return ret; +} + +void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) { + + log_stream_connection(rpt->client_ip, rpt->client_port, + (rpt->key && *rpt->key)? rpt->key : "-", + (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-", + (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-", + status); + + info("STREAM '%s' [receive from [%s]:%s]: " + "%s. " + "STATUS: %s%s%s%s" + , rpt->hostname + , rpt->client_ip, rpt->client_port + , msg + , status + , rpt->exit.reason?" (":"" + , rpt->exit.reason?rpt->exit.reason:"" + , rpt->exit.reason?")":"" + ); + +} + +static void rrdhost_reset_destinations(RRDHOST *host) { + for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) + d->postpone_reconnection_until = 0; } static int rrdpush_receive(struct receiver_state *rpt) { - int history = default_rrd_history_entries; - RRD_MEMORY_MODE mode = default_rrd_memory_mode; - int health_enabled = default_health_enabled; - int rrdpush_enabled = default_rrdpush_enabled; - char *rrdpush_destination = default_rrdpush_destination; - char *rrdpush_api_key = default_rrdpush_api_key; - char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; - bool rrdpush_enable_replication = default_rrdpush_enable_replication; - time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; - time_t rrdpush_replication_step = default_rrdpush_replication_step; - time_t alarms_delay = 60; - - rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every); - if(rpt->update_every < 0) rpt->update_every = 1; - - history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", history); - history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", history); - if(history < 5) history = 5; - - mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode))); - mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode))); - - if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { - error("STREAM %s [receive from %s:%s]: dbengine is not enabled, falling back to default.", rpt->hostname, rpt->client_ip, rpt->client_port); - mode = default_rrd_memory_mode; + rpt->config.mode = default_rrd_memory_mode; + rpt->config.history = default_rrd_history_entries; + + rpt->config.health_enabled = (int)default_health_enabled; + rpt->config.alarms_delay = 60; + + rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled; + rpt->config.rrdpush_destination = default_rrdpush_destination; + rpt->config.rrdpush_api_key = default_rrdpush_api_key; + rpt->config.rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; + + rpt->config.rrdpush_enable_replication = default_rrdpush_enable_replication; + rpt->config.rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; + rpt->config.rrdpush_replication_step = default_rrdpush_replication_step; + + rpt->config.update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->config.update_every); + if(rpt->config.update_every < 0) rpt->config.update_every = 1; + + rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", rpt->config.history); + rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", rpt->config.history); + if(rpt->config.history < 5) rpt->config.history = 5; + + rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(rpt->config.mode))); + rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode))); + + if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { + error("STREAM '%s' [receive from %s:%s]: " + "dbengine is not enabled, falling back to default." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); + + rpt->config.mode = default_rrd_memory_mode; } - health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled); - health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled); + rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", rpt->config.health_enabled); + rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", rpt->config.health_enabled); - alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", alarms_delay); - alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", alarms_delay); + rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay); + rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay); - rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled); - rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rrdpush_enabled); + rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled); + rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled); - rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rrdpush_destination); - rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rrdpush_destination); + rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rpt->config.rrdpush_destination); + rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rpt->config.rrdpush_destination); - rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rrdpush_api_key); - rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rrdpush_api_key); + rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rpt->config.rrdpush_api_key); + rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rpt->config.rrdpush_api_key); - rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching); - rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching); + rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rpt->config.rrdpush_send_charts_matching); + rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rpt->config.rrdpush_send_charts_matching); - rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication); - rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication); + rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rpt->config.rrdpush_enable_replication); + rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rpt->config.rrdpush_enable_replication); - rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate); - rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate); + rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate); + rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate); - rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step); - rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step); + rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step); + rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step); #ifdef ENABLE_COMPRESSION - unsigned int rrdpush_compression = default_compression_enabled; - rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression); - rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression); - rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled); + rpt->config.rrdpush_compression = default_compression_enabled; + rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression); + rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression); + rpt->rrdpush_compression = (rpt->config.rrdpush_compression && default_compression_enabled); #endif //ENABLE_COMPRESSION (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); - if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT"); - error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid); - char initial_response[HTTP_HEADER_SIZE + 1]; - snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); -#ifdef ENABLE_HTTPS - if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { -#else - if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { -#endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - close(rpt->fd); - return 0; - } - close(rpt->fd); - return 0; - } - - if (rpt->host==NULL) { - - rpt->host = rrdhost_find_or_create( + // find the host for this receiver + { + // this will also update the host with our system_info + RRDHOST *host = rrdhost_find_or_create( rpt->hostname , rpt->registry_hostname , rpt->machine_guid @@ -538,76 +668,41 @@ static int rrdpush_receive(struct receiver_state *rpt) , rpt->tags , rpt->program_name , rpt->program_version - , rpt->update_every - , history - , mode - , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO) - , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) - , rrdpush_destination - , rrdpush_api_key - , rrdpush_send_charts_matching - , rrdpush_enable_replication - , rrdpush_seconds_to_replicate - , rrdpush_replication_step + , rpt->config.update_every + , rpt->config.history + , rpt->config.mode + , (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO) + , (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key) + , rpt->config.rrdpush_destination + , rpt->config.rrdpush_api_key + , rpt->config.rrdpush_send_charts_matching + , rpt->config.rrdpush_enable_replication + , rpt->config.rrdpush_seconds_to_replicate + , rpt->config.rrdpush_replication_step , rpt->system_info , 0 ); - if(!rpt->host) { + if(!host) { + rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION"); close(rpt->fd); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - CANNOT ACQUIRE HOST"); - error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", rpt->hostname, rpt->client_ip, rpt->client_port); return 1; } - netdata_mutex_lock(&rpt->host->receiver_lock); - if (rpt->host->receiver == NULL) - rpt->host->receiver = rpt; - else { - error("Multiple receivers connected for %s concurrently, cancelling this one...", rpt->machine_guid); - netdata_mutex_unlock(&rpt->host->receiver_lock); + // system_info has been consumed by the host structure + rpt->system_info = NULL; + + if(!rrdhost_set_receiver(host, rpt)) { + rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION"); close(rpt->fd); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - BEATEN TO HOST CREATION"); return 1; } - netdata_mutex_unlock(&rpt->host->receiver_lock); - } - else { - rrd_wrlock(); - rrdhost_update( - rpt->host, - rpt->hostname, - rpt->registry_hostname, - rpt->machine_guid, - rpt->os, - rpt->timezone, - rpt->abbrev_timezone, - rpt->utc_offset, - rpt->tags, - rpt->program_name, - rpt->program_version, - rpt->update_every, - history, - mode, - (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO), - (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key), - rrdpush_destination, - rrdpush_api_key, - rrdpush_send_charts_matching, - rrdpush_enable_replication, - rrdpush_seconds_to_replicate, - rrdpush_replication_step, - rpt->system_info); - rrd_unlock(); } #ifdef NETDATA_INTERNAL_CHECKS - int ssl = 0; -#ifdef ENABLE_HTTPS - if (rpt->ssl.conn != NULL) - ssl = 1; -#endif - info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'" + info("STREAM '%s' [receive from [%s]:%s]: " + "client willing to stream metrics for host '%s' with machine_guid '%s': " + "update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'" , rpt->hostname , rpt->client_ip , rpt->client_port @@ -616,20 +711,26 @@ static int rrdpush_receive(struct receiver_state *rpt) , rpt->host->rrd_update_every , rpt->host->rrd_history_entries , rrd_memory_mode_name(rpt->host->rrd_memory_mode) - , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") - , ssl ? " SSL," : "" + , (rpt->config.health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((rpt->config.health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") +#ifdef ENABLE_HTTPS + , (rpt->ssl.conn != NULL) ? " SSL," : "" +#else + , "" +#endif , rrdhost_tags(rpt->host) ); #endif // NETDATA_INTERNAL_CHECKS struct plugind cd = { - .enabled = 1, .update_every = default_rrd_update_every, - .pid = 0, .serial_failures = 0, .successful_collections = 0, - .obsolete = 0, + .unsafe = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .running = true, + .enabled = true, + }, .started_t = now_realtime_sec(), .next = NULL, .capabilities = 0, @@ -648,76 +749,60 @@ static int rrdpush_receive(struct receiver_state *rpt) } #endif - info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - char initial_response[HTTP_HEADER_SIZE]; - if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) { - log_receiver_capabilities(rpt); - sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities); - } - else if (stream_has_capability(rpt, STREAM_CAP_VN)) { - log_receiver_capabilities(rpt); - sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities)); - } else if (stream_has_capability(rpt, STREAM_CAP_V2)) { - log_receiver_capabilities(rpt); - sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); - } else { // stream_has_capability(rpt, STREAM_CAP_V1) - log_receiver_capabilities(rpt); - sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1); - } - debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); + { + // info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + char initial_response[HTTP_HEADER_SIZE]; + if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities); + } + else if (stream_has_capability(rpt, STREAM_CAP_VN)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities)); + } + else if (stream_has_capability(rpt, STREAM_CAP_V2)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); + } + else { // stream_has_capability(rpt, STREAM_CAP_V1) + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1); + } + + debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); + if(send_timeout( #ifdef ENABLE_HTTPS - if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { -#else - if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { + &rpt->ssl, #endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - close(rpt->fd); - return 0; - } + rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { - // remove the non-blocking flag from the socket - if(sock_delnonblock(rpt->fd) < 0) - error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); - - struct timeval timeout; - timeout.tv_sec = 600; - timeout.tv_usec = 0; - if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) - error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); - - rrdhost_wrlock(rpt->host); -/* if(rpt->host->connected_senders > 0) { - rrdhost_unlock(rpt->host); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "REJECTED - ALREADY CONNECTED"); - info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", rpt->host->hostname, rpt->client_ip, rpt->client_port); - fclose(fp); - return 0; - } -*/ - -// rpt->host->connected_senders++; - if(health_enabled != CONFIG_BOOLEAN_NO) { - if(alarms_delay > 0) { - rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay; - log_health( - "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", - rrdhost_hostname(rpt->host), - (int64_t)alarms_delay); + rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION"); + close(rpt->fd); + return 0; } } - rpt->host->senders_connect_time = now_realtime_sec(); - rpt->host->senders_last_chart_command = 0; - rpt->host->trigger_chart_obsoletion_check = 1; - rrdhost_unlock(rpt->host); + { + // remove the non-blocking flag from the socket + if(sock_delnonblock(rpt->fd) < 0) + error("STREAM '%s' [receive from [%s]:%s]: " + "cannot remove the non-blocking flag from socket %d" + , rrdhost_hostname(rpt->host) + , rpt->client_ip, rpt->client_port + , rpt->fd); - // call the plugins.d processor to receive the metrics - info("STREAM %s [receive from [%s]:%s]: receiving metrics...", - rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + struct timeval timeout; + timeout.tv_sec = 600; + timeout.tv_usec = 0; + if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) + error("STREAM '%s' [receive from [%s]:%s]: " + "cannot set timeout for socket %d" + , rrdhost_hostname(rpt->host) + , rpt->client_ip, rpt->client_port + , rpt->fd); + } - log_stream_connection(rpt->client_ip, rpt->client_port, - rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED"); + rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED"); cd.capabilities = rpt->capabilities; @@ -728,12 +813,10 @@ static int rrdpush_receive(struct receiver_state *rpt) aclk_host_state_update(rpt->host, 1); #endif - rrdhost_set_is_parent_label(++localhost->senders_count); + rrdhost_set_is_parent_label(++localhost->connected_children_count); - rrdpush_receiver_replication_reset(rpt); - rrdcontext_host_child_connected(rpt->host); - - rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + // let it reconnect to parent immediately + rrdhost_reset_destinations(rpt->host); size_t count = streaming_parser(rpt, &cd, rpt->fd, #ifdef ENABLE_HTTPS @@ -745,15 +828,14 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); - log_stream_connection(rpt->client_ip, rpt->client_port, - rpt->key, rpt->host->machine_guid, rpt->hostname, - "DISCONNECTED"); + if(!rpt->exit.reason) + rpt->exit.reason = "PARSER EXIT"; - error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", - rpt->hostname, rpt->client_ip, rpt->client_port, count); - - rrdcontext_host_child_disconnected(rpt->host); - rrdpush_receiver_replication_reset(rpt); + { + char msg[100 + 1]; + snprintfz(msg, 100, "disconnected (completed %zu updates)", count); + rrdpush_receive_log_status(rpt, msg, "DISCONNECTED"); + } #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud @@ -762,48 +844,41 @@ static int rrdpush_receive(struct receiver_state *rpt) aclk_host_state_update(rpt->host, 0); #endif - rrdhost_set_is_parent_label(--localhost->senders_count); - - // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread - if (!netdata_exit && rpt->host) { - rrd_rdlock(); - rrdhost_wrlock(rpt->host); - netdata_mutex_lock(&rpt->host->receiver_lock); - if (rpt->host->receiver == rpt) { - rpt->host->senders_connect_time = 0; - rpt->host->trigger_chart_obsoletion_check = 0; - rpt->host->senders_disconnected_time = now_realtime_sec(); - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN); - if(health_enabled == CONFIG_BOOLEAN_AUTO) - rpt->host->health_enabled = 0; - } - rrdhost_unlock(rpt->host); - if (rpt->host->receiver == rpt) { - rrdpush_sender_thread_stop(rpt->host); - } - netdata_mutex_unlock(&rpt->host->receiver_lock); - rrd_unlock(); - } + rrdhost_set_is_parent_label(--localhost->connected_children_count); // cleanup close(rpt->fd); return (int)count; } +static void rrdpush_receiver_thread_cleanup(void *ptr) { + struct receiver_state *rpt = (struct receiver_state *) ptr; + worker_unregister(); + + rrdhost_clear_receiver(rpt); + + info("STREAM '%s' [receive from [%s]:%s]: " + "receive thread ended (task id %d)" + , rpt->hostname ? rpt->hostname : "-" + , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-" + , gettid()); + + receiver_state_free(rpt); +} + void *rrdpush_receiver_thread(void *ptr) { netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr); - struct receiver_state *rpt = (struct receiver_state *)ptr; - info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); - worker_register("STREAMRCV"); worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT); worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT); worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); + + struct receiver_state *rpt = (struct receiver_state *)ptr; + info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); + rrdpush_receive(rpt); - worker_unregister(); netdata_thread_cleanup_pop(1); return NULL; } - |