diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 88 |
1 files changed, 63 insertions, 25 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index b083766dd..d20658e65 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -30,6 +30,8 @@ void destroy_receiver_state(struct receiver_state *rpt) { } static void rrdpush_receiver_thread_cleanup(void *ptr) { + worker_unregister(); + static __thread int executed = 0; if(!executed) { executed = 1; @@ -338,26 +340,31 @@ static char *receiver_next_line(struct receiver_state *r, int *pos) { return NULL; } +static void streaming_parser_thread_cleanup(void *ptr) { + PARSER *parser = (PARSER *)ptr; + parser_destroy(parser); +} + size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) { size_t result; - PARSER_USER_OBJECT *user = callocz(1, sizeof(*user)); - user->enabled = cd->enabled; - user->host = rpt->host; - user->opaque = rpt; - user->cd = cd; - user->trust_durations = 0; - - PARSER *parser = parser_init(rpt->host, user, fp, PARSER_INPUT_SPLIT); + + PARSER_USER_OBJECT user = { + .enabled = cd->enabled, + .host = rpt->host, + .opaque = rpt, + .cd = cd, + .trust_durations = 1 + }; + + PARSER *parser = parser_init(rpt->host, &user, fp, PARSER_INPUT_SPLIT); + + // this keeps the parser with its current value + // so, parser needs to be allocated before pushing it + netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser); + parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp); parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); - if (unlikely(!parser)) { - error("Failed to initialize parser"); - cd->serial_failures++; - freez(user); - return 0; - } - parser->plugins_action->begin_action = &pluginsd_begin_action; parser->plugins_action->flush_action = &pluginsd_flush_action; parser->plugins_action->end_action = &pluginsd_end_action; @@ -371,12 +378,13 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action; parser->plugins_action->clabel_action = &pluginsd_clabel_action; - user->parser = parser; + user.parser = parser; #ifdef ENABLE_COMPRESSION if (rpt->decompressor) rpt->decompressor->reset(rpt->decompressor); #endif + do{ if (receiver_read(rpt, fp)) break; @@ -389,10 +397,13 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp rpt->last_msg_t = now_realtime_sec(); } while(!netdata_exit); + done: - result= user->count; - freez(user); - parser_destroy(parser); + result = user.count; + + // free parser with the pop function + netdata_thread_cleanup_pop(1); + return result; } @@ -455,9 +466,23 @@ static int rrdpush_receive(struct receiver_state *rpt) if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT"); - error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid); + error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid); + char initial_response[HTTP_HEADER_SIZE + 1]; + snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); +#ifdef ENABLE_HTTPS + rpt->host->stream_ssl.conn = rpt->ssl.conn; + rpt->host->stream_ssl.flags = rpt->ssl.flags; + if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { +#else + if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { +#endif + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + close(rpt->fd); + return 0; + } close(rpt->fd); - return 1; + return 0; } if (rpt->host==NULL) { @@ -609,6 +634,12 @@ static int rrdpush_receive(struct receiver_state *rpt) if(sock_delnonblock(rpt->fd) < 0) error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); + struct timeval timeout; + timeout.tv_sec = 120; + timeout.tv_usec = 0; + if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) + error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); + // convert the socket to a FILE * FILE *fp = fdopen(rpt->fd, "r"); if(!fp) { @@ -640,6 +671,9 @@ static int rrdpush_receive(struct receiver_state *rpt) rpt->host->hostname); } } + rpt->host->senders_connect_time = now_realtime_sec(); + rpt->host->senders_last_chart_command = 0; + rpt->host->trigger_chart_obsoletion_check = 1; rrdhost_unlock(rpt->host); // call the plugins.d processor to receive the metrics @@ -648,9 +682,9 @@ static int rrdpush_receive(struct receiver_state *rpt) cd.version = rpt->stream_version; -#if defined(ENABLE_ACLK) +#if defined(ENABLE_NEW_CLOUD_PROTOCOL) // in case we have cloud connection we inform cloud - // new slave connected + // new child connected if (netdata_cloud_setting) aclk_host_state_update(rpt->host, 1); #endif @@ -662,9 +696,9 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); -#if defined(ENABLE_ACLK) +#if defined(ENABLE_NEW_CLOUD_PROTOCOL) // in case we have cloud connection we inform cloud - // new slave connected + // new child connected if (netdata_cloud_setting) aclk_host_state_update(rpt->host, 0); #endif @@ -675,6 +709,8 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_wrlock(rpt->host); netdata_mutex_lock(&rpt->host->receiver_lock); if (rpt->host->receiver == rpt) { + rpt->host->senders_connect_time = 0; + rpt->host->trigger_chart_obsoletion_check = 0; rpt->host->senders_disconnected_time = now_realtime_sec(); rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN); if(health_enabled == CONFIG_BOOLEAN_AUTO) @@ -699,7 +735,9 @@ void *rrdpush_receiver_thread(void *ptr) { struct receiver_state *rpt = (struct receiver_state *)ptr; info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); + worker_register("STREAMRCV"); rrdpush_receive(rpt); + worker_unregister(); netdata_thread_cleanup_pop(1); return NULL; |