summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:34 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:34 +0000
commitd079b656b4719739b2247dcd9d46e9bec793095a (patch)
treed2c950c70a776bcf697c963151c5bd959f8a9f03 /streaming/receiver.c
parentReleasing debian version 1.37.1-2. (diff)
downloadnetdata-d079b656b4719739b2247dcd9d46e9bec793095a.tar.xz
netdata-d079b656b4719739b2247dcd9d46e9bec793095a.zip
Merging upstream version 1.38.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c627
1 files changed, 351 insertions, 276 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 61ee33bc4..95652942e 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -16,7 +16,8 @@
extern struct config stream_config;
-void destroy_receiver_state(struct receiver_state *rpt) {
+void receiver_state_free(struct receiver_state *rpt) {
+
freez(rpt->key);
freez(rpt->hostname);
freez(rpt->registry_hostname);
@@ -29,43 +30,23 @@ void destroy_receiver_state(struct receiver_state *rpt) {
freez(rpt->client_port);
freez(rpt->program_name);
freez(rpt->program_version);
+
#ifdef ENABLE_HTTPS
- if(rpt->ssl.conn){
+ if(rpt->ssl.conn)
SSL_free(rpt->ssl.conn);
- }
#endif
+
#ifdef ENABLE_COMPRESSION
if (rpt->decompressor)
rpt->decompressor->destroy(&rpt->decompressor);
#endif
- freez(rpt);
-}
-
-static void rrdpush_receiver_thread_cleanup(void *ptr) {
- worker_unregister();
- static __thread int executed = 0;
- if(!executed) {
- executed = 1;
- struct receiver_state *rpt = (struct receiver_state *) ptr;
- // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch
- // the host pointer as it is unpredictable when the RRDHOST is deleted. Do the cleanup from rrdhost_free().
- if (netdata_exit && rpt->host) {
- rpt->exited = 1;
- return;
- }
+ if(rpt->system_info)
+ rrdhost_system_info_free(rpt->system_info);
- // Make sure that we detach this thread and don't kill a freshly arriving receiver
- if (!netdata_exit && rpt->host) {
- netdata_mutex_lock(&rpt->host->receiver_lock);
- if (rpt->host->receiver == rpt)
- rpt->host->receiver = NULL;
- netdata_mutex_unlock(&rpt->host->receiver_lock);
- }
+ __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
- info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
- destroy_receiver_state(rpt);
- }
+ freez(rpt);
}
#include "collectors/plugins.d/pluginsd_parser.h"
@@ -105,11 +86,10 @@ PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user)
if (host->aclk_state.claimed_id)
freez(host->aclk_state.claimed_id);
host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
-
- metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL);
-
rrdhost_aclk_state_unlock(host);
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
+
rrdpush_claimed_id(host);
return PARSER_RC_OK;
@@ -350,11 +330,13 @@ static void streaming_parser_thread_cleanup(void *ptr) {
parser_destroy(parser);
}
+bool plugin_is_enabled(struct plugind *cd);
+
static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) {
size_t result;
PARSER_USER_OBJECT user = {
- .enabled = cd->enabled,
+ .enabled = plugin_is_enabled(cd),
.host = rpt->host,
.opaque = rpt,
.cd = cd,
@@ -390,39 +372,50 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
size_t read_buffer_start = 0;
char buffer[PLUGINSD_LINE_MAX + 2] = "";
- while(!netdata_exit) {
+ while(service_running(SERVICE_STREAMING)) {
+ netdata_thread_testcancel();
+
if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) {
bool have_new_data;
- if(compressed_connection)
+ if(likely(compressed_connection))
have_new_data = receiver_read_compressed(rpt);
else
have_new_data = receiver_read_uncompressed(rpt);
- if(!have_new_data)
+ if(unlikely(!have_new_data)) {
+ if(!rpt->exit.reason)
+ rpt->exit.reason = "SOCKET READ ERROR";
+
break;
+ }
rpt->last_msg_t = now_realtime_sec();
continue;
}
- if(unlikely(netdata_exit)) {
- internal_error(true, "exiting...");
+ if(unlikely(!service_running(SERVICE_STREAMING))) {
+ if(!rpt->exit.reason)
+ rpt->exit.reason = "NETDATA EXIT";
goto done;
}
- if(unlikely(rpt->shutdown)) {
- internal_error(true, "parser shutdown...");
+ if(unlikely(rpt->exit.shutdown)) {
+ if(!rpt->exit.reason)
+ rpt->exit.reason = "SHUTDOWN REQUESTED";
+
goto done;
}
if (unlikely(parser_action(parser, buffer))) {
internal_error(true, "parser_action() failed on keyword '%s'.", buffer);
+
+ if(!rpt->exit.reason)
+ rpt->exit.reason = "PARSER FAILED";
+
break;
}
}
done:
- internal_error(true, "Streaming receiver thread stopping...");
-
result = user.count;
// free parser with the pop function
@@ -431,103 +424,240 @@ done:
return result;
}
-static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) {
+static void rrdpush_receiver_replication_reset(RRDHOST *host) {
RRDSET *st;
- rrdset_foreach_read(st, rpt->host) {
+ rrdset_foreach_read(st, host) {
rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
}
rrdset_foreach_done(st);
- rrdhost_receiver_replicating_charts_zero(rpt->host);
+ rrdhost_receiver_replicating_charts_zero(host);
+}
+
+bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
+ bool signal_rrdcontext = false;
+ bool set_this = false;
+
+ netdata_mutex_lock(&host->receiver_lock);
+
+ if (!host->receiver || host->receiver == rpt) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+
+ host->receiver = rpt;
+ rpt->host = host;
+
+ host->child_connect_time = now_realtime_sec();
+ host->child_disconnected_time = 0;
+ host->child_last_chart_command = 0;
+ host->trigger_chart_obsoletion_check = 1;
+
+ if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) {
+ if (rpt->config.alarms_delay > 0) {
+ host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay;
+ log_health(
+ "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
+ rrdhost_hostname(host),
+ (int64_t) rpt->config.alarms_delay);
+ }
+ }
+
+// this is a test
+// if(rpt->hops <= host->sender->hops)
+// rrdpush_sender_thread_stop(host, "HOPS MISMATCH", false);
+
+ signal_rrdcontext = true;
+ rrdpush_receiver_replication_reset(host);
+
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+
+ set_this = true;
+ }
+
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ if(signal_rrdcontext)
+ rrdcontext_host_child_connected(host);
+
+ return set_this;
+}
+
+static void rrdhost_clear_receiver(struct receiver_state *rpt) {
+ bool signal_rrdcontext = false;
+
+ RRDHOST *host = rpt->host;
+ if(host) {
+ netdata_mutex_lock(&host->receiver_lock);
+
+ // Make sure that we detach this thread and don't kill a freshly arriving receiver
+ if(host->receiver == rpt) {
+ host->trigger_chart_obsoletion_check = 0;
+ host->child_connect_time = 0;
+ host->child_disconnected_time = now_realtime_sec();
+
+ if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO)
+ host->health.health_enabled = 0;
+
+ rrdpush_sender_thread_stop(host, "RECEIVER LEFT", false);
+
+ signal_rrdcontext = true;
+ rrdpush_receiver_replication_reset(host);
+
+ if (host->receiver == rpt)
+ host->receiver = NULL;
+
+ rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
+ }
+
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ if(signal_rrdcontext)
+ rrdcontext_host_child_disconnected(host);
+ }
+}
+
+bool stop_streaming_receiver(RRDHOST *host, const char *reason) {
+ bool ret = false;
+
+ netdata_mutex_lock(&host->receiver_lock);
+
+ if(host->receiver) {
+ if(!host->receiver->exit.shutdown) {
+ host->receiver->exit.shutdown = true;
+ host->receiver->exit.reason = reason;
+ shutdown(host->receiver->fd, SHUT_RDWR);
+ }
+
+ netdata_thread_cancel(host->receiver->thread);
+ }
+
+ int count = 2000;
+ while (host->receiver && count-- > 0) {
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ // let the lock for the receiver thread to exit
+ sleep_usec(1 * USEC_PER_MS);
+
+ netdata_mutex_lock(&host->receiver_lock);
+ }
+
+ if(host->receiver)
+ error("STREAM '%s' [receive from [%s]:%s]: "
+ "thread %d takes too long to stop, giving up..."
+ , rrdhost_hostname(host)
+ , host->receiver->client_ip, host->receiver->client_port
+ , gettid());
+ else
+ ret = true;
+
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ return ret;
+}
+
+void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) {
+
+ log_stream_connection(rpt->client_ip, rpt->client_port,
+ (rpt->key && *rpt->key)? rpt->key : "-",
+ (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-",
+ (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-",
+ status);
+
+ info("STREAM '%s' [receive from [%s]:%s]: "
+ "%s. "
+ "STATUS: %s%s%s%s"
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ , msg
+ , status
+ , rpt->exit.reason?" (":""
+ , rpt->exit.reason?rpt->exit.reason:""
+ , rpt->exit.reason?")":""
+ );
+
+}
+
+static void rrdhost_reset_destinations(RRDHOST *host) {
+ for (struct rrdpush_destinations *d = host->destinations; d; d = d->next)
+ d->postpone_reconnection_until = 0;
}
static int rrdpush_receive(struct receiver_state *rpt)
{
- int history = default_rrd_history_entries;
- RRD_MEMORY_MODE mode = default_rrd_memory_mode;
- int health_enabled = default_health_enabled;
- int rrdpush_enabled = default_rrdpush_enabled;
- char *rrdpush_destination = default_rrdpush_destination;
- char *rrdpush_api_key = default_rrdpush_api_key;
- char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
- bool rrdpush_enable_replication = default_rrdpush_enable_replication;
- time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate;
- time_t rrdpush_replication_step = default_rrdpush_replication_step;
- time_t alarms_delay = 60;
-
- rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every);
- if(rpt->update_every < 0) rpt->update_every = 1;
-
- history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", history);
- history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", history);
- if(history < 5) history = 5;
-
- mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode)));
- mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode)));
-
- if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) {
- error("STREAM %s [receive from %s:%s]: dbengine is not enabled, falling back to default.", rpt->hostname, rpt->client_ip, rpt->client_port);
- mode = default_rrd_memory_mode;
+ rpt->config.mode = default_rrd_memory_mode;
+ rpt->config.history = default_rrd_history_entries;
+
+ rpt->config.health_enabled = (int)default_health_enabled;
+ rpt->config.alarms_delay = 60;
+
+ rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled;
+ rpt->config.rrdpush_destination = default_rrdpush_destination;
+ rpt->config.rrdpush_api_key = default_rrdpush_api_key;
+ rpt->config.rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
+
+ rpt->config.rrdpush_enable_replication = default_rrdpush_enable_replication;
+ rpt->config.rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate;
+ rpt->config.rrdpush_replication_step = default_rrdpush_replication_step;
+
+ rpt->config.update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->config.update_every);
+ if(rpt->config.update_every < 0) rpt->config.update_every = 1;
+
+ rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", rpt->config.history);
+ rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", rpt->config.history);
+ if(rpt->config.history < 5) rpt->config.history = 5;
+
+ rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(rpt->config.mode)));
+ rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode)));
+
+ if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) {
+ error("STREAM '%s' [receive from %s:%s]: "
+ "dbengine is not enabled, falling back to default."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
+
+ rpt->config.mode = default_rrd_memory_mode;
}
- health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled);
- health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled);
+ rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", rpt->config.health_enabled);
+ rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", rpt->config.health_enabled);
- alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", alarms_delay);
- alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", alarms_delay);
+ rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay);
+ rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay);
- rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled);
- rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rrdpush_enabled);
+ rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled);
+ rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled);
- rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rrdpush_destination);
- rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rrdpush_destination);
+ rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rpt->config.rrdpush_destination);
+ rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rpt->config.rrdpush_destination);
- rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rrdpush_api_key);
- rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rrdpush_api_key);
+ rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rpt->config.rrdpush_api_key);
+ rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rpt->config.rrdpush_api_key);
- rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
- rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+ rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rpt->config.rrdpush_send_charts_matching);
+ rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rpt->config.rrdpush_send_charts_matching);
- rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication);
- rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication);
+ rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rpt->config.rrdpush_enable_replication);
+ rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rpt->config.rrdpush_enable_replication);
- rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate);
- rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate);
+ rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate);
+ rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate);
- rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step);
- rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step);
+ rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step);
+ rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step);
#ifdef ENABLE_COMPRESSION
- unsigned int rrdpush_compression = default_compression_enabled;
- rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression);
- rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression);
- rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled);
+ rpt->config.rrdpush_compression = default_compression_enabled;
+ rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression);
+ rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression);
+ rpt->rrdpush_compression = (rpt->config.rrdpush_compression && default_compression_enabled);
#endif //ENABLE_COMPRESSION
(void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
- if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT");
- error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid);
- char initial_response[HTTP_HEADER_SIZE + 1];
- snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
-#ifdef ENABLE_HTTPS
- if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
-#else
- if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
-#endif
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
- error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
- close(rpt->fd);
- return 0;
- }
- close(rpt->fd);
- return 0;
- }
-
- if (rpt->host==NULL) {
-
- rpt->host = rrdhost_find_or_create(
+ // find the host for this receiver
+ {
+ // this will also update the host with our system_info
+ RRDHOST *host = rrdhost_find_or_create(
rpt->hostname
, rpt->registry_hostname
, rpt->machine_guid
@@ -538,76 +668,41 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rpt->tags
, rpt->program_name
, rpt->program_version
- , rpt->update_every
- , history
- , mode
- , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
- , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
- , rrdpush_destination
- , rrdpush_api_key
- , rrdpush_send_charts_matching
- , rrdpush_enable_replication
- , rrdpush_seconds_to_replicate
- , rrdpush_replication_step
+ , rpt->config.update_every
+ , rpt->config.history
+ , rpt->config.mode
+ , (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO)
+ , (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key)
+ , rpt->config.rrdpush_destination
+ , rpt->config.rrdpush_api_key
+ , rpt->config.rrdpush_send_charts_matching
+ , rpt->config.rrdpush_enable_replication
+ , rpt->config.rrdpush_seconds_to_replicate
+ , rpt->config.rrdpush_replication_step
, rpt->system_info
, 0
);
- if(!rpt->host) {
+ if(!host) {
+ rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION");
close(rpt->fd);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - CANNOT ACQUIRE HOST");
- error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", rpt->hostname, rpt->client_ip, rpt->client_port);
return 1;
}
- netdata_mutex_lock(&rpt->host->receiver_lock);
- if (rpt->host->receiver == NULL)
- rpt->host->receiver = rpt;
- else {
- error("Multiple receivers connected for %s concurrently, cancelling this one...", rpt->machine_guid);
- netdata_mutex_unlock(&rpt->host->receiver_lock);
+ // system_info has been consumed by the host structure
+ rpt->system_info = NULL;
+
+ if(!rrdhost_set_receiver(host, rpt)) {
+ rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION");
close(rpt->fd);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - BEATEN TO HOST CREATION");
return 1;
}
- netdata_mutex_unlock(&rpt->host->receiver_lock);
- }
- else {
- rrd_wrlock();
- rrdhost_update(
- rpt->host,
- rpt->hostname,
- rpt->registry_hostname,
- rpt->machine_guid,
- rpt->os,
- rpt->timezone,
- rpt->abbrev_timezone,
- rpt->utc_offset,
- rpt->tags,
- rpt->program_name,
- rpt->program_version,
- rpt->update_every,
- history,
- mode,
- (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO),
- (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key),
- rrdpush_destination,
- rrdpush_api_key,
- rrdpush_send_charts_matching,
- rrdpush_enable_replication,
- rrdpush_seconds_to_replicate,
- rrdpush_replication_step,
- rpt->system_info);
- rrd_unlock();
}
#ifdef NETDATA_INTERNAL_CHECKS
- int ssl = 0;
-#ifdef ENABLE_HTTPS
- if (rpt->ssl.conn != NULL)
- ssl = 1;
-#endif
- info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'"
+ info("STREAM '%s' [receive from [%s]:%s]: "
+ "client willing to stream metrics for host '%s' with machine_guid '%s': "
+ "update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'"
, rpt->hostname
, rpt->client_ip
, rpt->client_port
@@ -616,20 +711,26 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rpt->host->rrd_update_every
, rpt->host->rrd_history_entries
, rrd_memory_mode_name(rpt->host->rrd_memory_mode)
- , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
- , ssl ? " SSL," : ""
+ , (rpt->config.health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((rpt->config.health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
+#ifdef ENABLE_HTTPS
+ , (rpt->ssl.conn != NULL) ? " SSL," : ""
+#else
+ , ""
+#endif
, rrdhost_tags(rpt->host)
);
#endif // NETDATA_INTERNAL_CHECKS
struct plugind cd = {
- .enabled = 1,
.update_every = default_rrd_update_every,
- .pid = 0,
.serial_failures = 0,
.successful_collections = 0,
- .obsolete = 0,
+ .unsafe = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .running = true,
+ .enabled = true,
+ },
.started_t = now_realtime_sec(),
.next = NULL,
.capabilities = 0,
@@ -648,76 +749,60 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
#endif
- info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
- char initial_response[HTTP_HEADER_SIZE];
- if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) {
- log_receiver_capabilities(rpt);
- sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities);
- }
- else if (stream_has_capability(rpt, STREAM_CAP_VN)) {
- log_receiver_capabilities(rpt);
- sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities));
- } else if (stream_has_capability(rpt, STREAM_CAP_V2)) {
- log_receiver_capabilities(rpt);
- sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
- } else { // stream_has_capability(rpt, STREAM_CAP_V1)
- log_receiver_capabilities(rpt);
- sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1);
- }
- debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
+ {
+ // info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ char initial_response[HTTP_HEADER_SIZE];
+ if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities);
+ }
+ else if (stream_has_capability(rpt, STREAM_CAP_VN)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities));
+ }
+ else if (stream_has_capability(rpt, STREAM_CAP_V2)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
+ }
+ else { // stream_has_capability(rpt, STREAM_CAP_V1)
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1);
+ }
+
+ debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
+ if(send_timeout(
#ifdef ENABLE_HTTPS
- if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
-#else
- if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
+ &rpt->ssl,
#endif
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
- error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
- close(rpt->fd);
- return 0;
- }
+ rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
- // remove the non-blocking flag from the socket
- if(sock_delnonblock(rpt->fd) < 0)
- error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
-
- struct timeval timeout;
- timeout.tv_sec = 600;
- timeout.tv_usec = 0;
- if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0))
- error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
-
- rrdhost_wrlock(rpt->host);
-/* if(rpt->host->connected_senders > 0) {
- rrdhost_unlock(rpt->host);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "REJECTED - ALREADY CONNECTED");
- info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
- fclose(fp);
- return 0;
- }
-*/
-
-// rpt->host->connected_senders++;
- if(health_enabled != CONFIG_BOOLEAN_NO) {
- if(alarms_delay > 0) {
- rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay;
- log_health(
- "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
- rrdhost_hostname(rpt->host),
- (int64_t)alarms_delay);
+ rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION");
+ close(rpt->fd);
+ return 0;
}
}
- rpt->host->senders_connect_time = now_realtime_sec();
- rpt->host->senders_last_chart_command = 0;
- rpt->host->trigger_chart_obsoletion_check = 1;
- rrdhost_unlock(rpt->host);
+ {
+ // remove the non-blocking flag from the socket
+ if(sock_delnonblock(rpt->fd) < 0)
+ error("STREAM '%s' [receive from [%s]:%s]: "
+ "cannot remove the non-blocking flag from socket %d"
+ , rrdhost_hostname(rpt->host)
+ , rpt->client_ip, rpt->client_port
+ , rpt->fd);
- // call the plugins.d processor to receive the metrics
- info("STREAM %s [receive from [%s]:%s]: receiving metrics...",
- rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ struct timeval timeout;
+ timeout.tv_sec = 600;
+ timeout.tv_usec = 0;
+ if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0))
+ error("STREAM '%s' [receive from [%s]:%s]: "
+ "cannot set timeout for socket %d"
+ , rrdhost_hostname(rpt->host)
+ , rpt->client_ip, rpt->client_port
+ , rpt->fd);
+ }
- log_stream_connection(rpt->client_ip, rpt->client_port,
- rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED");
+ rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED");
cd.capabilities = rpt->capabilities;
@@ -728,12 +813,10 @@ static int rrdpush_receive(struct receiver_state *rpt)
aclk_host_state_update(rpt->host, 1);
#endif
- rrdhost_set_is_parent_label(++localhost->senders_count);
+ rrdhost_set_is_parent_label(++localhost->connected_children_count);
- rrdpush_receiver_replication_reset(rpt);
- rrdcontext_host_child_connected(rpt->host);
-
- rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+ // let it reconnect to parent immediately
+ rrdhost_reset_destinations(rpt->host);
size_t count = streaming_parser(rpt, &cd, rpt->fd,
#ifdef ENABLE_HTTPS
@@ -745,15 +828,14 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
- log_stream_connection(rpt->client_ip, rpt->client_port,
- rpt->key, rpt->host->machine_guid, rpt->hostname,
- "DISCONNECTED");
+ if(!rpt->exit.reason)
+ rpt->exit.reason = "PARSER EXIT";
- error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).",
- rpt->hostname, rpt->client_ip, rpt->client_port, count);
-
- rrdcontext_host_child_disconnected(rpt->host);
- rrdpush_receiver_replication_reset(rpt);
+ {
+ char msg[100 + 1];
+ snprintfz(msg, 100, "disconnected (completed %zu updates)", count);
+ rrdpush_receive_log_status(rpt, msg, "DISCONNECTED");
+ }
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
@@ -762,48 +844,41 @@ static int rrdpush_receive(struct receiver_state *rpt)
aclk_host_state_update(rpt->host, 0);
#endif
- rrdhost_set_is_parent_label(--localhost->senders_count);
-
- // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
- if (!netdata_exit && rpt->host) {
- rrd_rdlock();
- rrdhost_wrlock(rpt->host);
- netdata_mutex_lock(&rpt->host->receiver_lock);
- if (rpt->host->receiver == rpt) {
- rpt->host->senders_connect_time = 0;
- rpt->host->trigger_chart_obsoletion_check = 0;
- rpt->host->senders_disconnected_time = now_realtime_sec();
- rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
- if(health_enabled == CONFIG_BOOLEAN_AUTO)
- rpt->host->health_enabled = 0;
- }
- rrdhost_unlock(rpt->host);
- if (rpt->host->receiver == rpt) {
- rrdpush_sender_thread_stop(rpt->host);
- }
- netdata_mutex_unlock(&rpt->host->receiver_lock);
- rrd_unlock();
- }
+ rrdhost_set_is_parent_label(--localhost->connected_children_count);
// cleanup
close(rpt->fd);
return (int)count;
}
+static void rrdpush_receiver_thread_cleanup(void *ptr) {
+ struct receiver_state *rpt = (struct receiver_state *) ptr;
+ worker_unregister();
+
+ rrdhost_clear_receiver(rpt);
+
+ info("STREAM '%s' [receive from [%s]:%s]: "
+ "receive thread ended (task id %d)"
+ , rpt->hostname ? rpt->hostname : "-"
+ , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-"
+ , gettid());
+
+ receiver_state_free(rpt);
+}
+
void *rrdpush_receiver_thread(void *ptr) {
netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
- struct receiver_state *rpt = (struct receiver_state *)ptr;
- info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
-
worker_register("STREAMRCV");
worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT);
worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT);
worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
+
+ struct receiver_state *rpt = (struct receiver_state *)ptr;
+ info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+
rrdpush_receive(rpt);
- worker_unregister();
netdata_thread_cleanup_pop(1);
return NULL;
}
-