summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c88
1 files changed, 63 insertions, 25 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index b083766d..d20658e6 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -30,6 +30,8 @@ void destroy_receiver_state(struct receiver_state *rpt) {
}
static void rrdpush_receiver_thread_cleanup(void *ptr) {
+ worker_unregister();
+
static __thread int executed = 0;
if(!executed) {
executed = 1;
@@ -338,26 +340,31 @@ static char *receiver_next_line(struct receiver_state *r, int *pos) {
return NULL;
}
+static void streaming_parser_thread_cleanup(void *ptr) {
+ PARSER *parser = (PARSER *)ptr;
+ parser_destroy(parser);
+}
+
size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
size_t result;
- PARSER_USER_OBJECT *user = callocz(1, sizeof(*user));
- user->enabled = cd->enabled;
- user->host = rpt->host;
- user->opaque = rpt;
- user->cd = cd;
- user->trust_durations = 0;
-
- PARSER *parser = parser_init(rpt->host, user, fp, PARSER_INPUT_SPLIT);
+
+ PARSER_USER_OBJECT user = {
+ .enabled = cd->enabled,
+ .host = rpt->host,
+ .opaque = rpt,
+ .cd = cd,
+ .trust_durations = 1
+ };
+
+ PARSER *parser = parser_init(rpt->host, &user, fp, PARSER_INPUT_SPLIT);
+
+ // this keeps the parser with its current value
+ // so, parser needs to be allocated before pushing it
+ netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser);
+
parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
- if (unlikely(!parser)) {
- error("Failed to initialize parser");
- cd->serial_failures++;
- freez(user);
- return 0;
- }
-
parser->plugins_action->begin_action = &pluginsd_begin_action;
parser->plugins_action->flush_action = &pluginsd_flush_action;
parser->plugins_action->end_action = &pluginsd_end_action;
@@ -371,12 +378,13 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
parser->plugins_action->clabel_action = &pluginsd_clabel_action;
- user->parser = parser;
+ user.parser = parser;
#ifdef ENABLE_COMPRESSION
if (rpt->decompressor)
rpt->decompressor->reset(rpt->decompressor);
#endif
+
do{
if (receiver_read(rpt, fp))
break;
@@ -389,10 +397,13 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
rpt->last_msg_t = now_realtime_sec();
}
while(!netdata_exit);
+
done:
- result= user->count;
- freez(user);
- parser_destroy(parser);
+ result = user.count;
+
+ // free parser with the pop function
+ netdata_thread_cleanup_pop(1);
+
return result;
}
@@ -455,9 +466,23 @@ static int rrdpush_receive(struct receiver_state *rpt)
if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT");
- error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid);
+ error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid);
+ char initial_response[HTTP_HEADER_SIZE + 1];
+ snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
+#ifdef ENABLE_HTTPS
+ rpt->host->stream_ssl.conn = rpt->ssl.conn;
+ rpt->host->stream_ssl.flags = rpt->ssl.flags;
+ if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+#else
+ if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
+#endif
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ close(rpt->fd);
+ return 0;
+ }
close(rpt->fd);
- return 1;
+ return 0;
}
if (rpt->host==NULL) {
@@ -609,6 +634,12 @@ static int rrdpush_receive(struct receiver_state *rpt)
if(sock_delnonblock(rpt->fd) < 0)
error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+ struct timeval timeout;
+ timeout.tv_sec = 120;
+ timeout.tv_usec = 0;
+ if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0))
+ error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+
// convert the socket to a FILE *
FILE *fp = fdopen(rpt->fd, "r");
if(!fp) {
@@ -640,6 +671,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
rpt->host->hostname);
}
}
+ rpt->host->senders_connect_time = now_realtime_sec();
+ rpt->host->senders_last_chart_command = 0;
+ rpt->host->trigger_chart_obsoletion_check = 1;
rrdhost_unlock(rpt->host);
// call the plugins.d processor to receive the metrics
@@ -648,9 +682,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
cd.version = rpt->stream_version;
-#if defined(ENABLE_ACLK)
+#if defined(ENABLE_NEW_CLOUD_PROTOCOL)
// in case we have cloud connection we inform cloud
- // new slave connected
+ // new child connected
if (netdata_cloud_setting)
aclk_host_state_update(rpt->host, 1);
#endif
@@ -662,9 +696,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
rpt->client_port, count);
-#if defined(ENABLE_ACLK)
+#if defined(ENABLE_NEW_CLOUD_PROTOCOL)
// in case we have cloud connection we inform cloud
- // new slave connected
+ // new child connected
if (netdata_cloud_setting)
aclk_host_state_update(rpt->host, 0);
#endif
@@ -675,6 +709,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_wrlock(rpt->host);
netdata_mutex_lock(&rpt->host->receiver_lock);
if (rpt->host->receiver == rpt) {
+ rpt->host->senders_connect_time = 0;
+ rpt->host->trigger_chart_obsoletion_check = 0;
rpt->host->senders_disconnected_time = now_realtime_sec();
rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
if(health_enabled == CONFIG_BOOLEAN_AUTO)
@@ -699,7 +735,9 @@ void *rrdpush_receiver_thread(void *ptr) {
struct receiver_state *rpt = (struct receiver_state *)ptr;
info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+ worker_register("STREAMRCV");
rrdpush_receive(rpt);
+ worker_unregister();
netdata_thread_cleanup_pop(1);
return NULL;