summaryrefslogtreecommitdiffstats
path: root/src/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/streaming/receiver.c (renamed from streaming/receiver.c)94
1 files changed, 42 insertions, 52 deletions
diff --git a/streaming/receiver.c b/src/streaming/receiver.c
index a12b94fb4..20f9342df 100644
--- a/streaming/receiver.c
+++ b/src/streaming/receiver.c
@@ -14,7 +14,6 @@ void receiver_state_free(struct receiver_state *rpt) {
freez(rpt->os);
freez(rpt->timezone);
freez(rpt->abbrev_timezone);
- freez(rpt->tags);
freez(rpt->client_ip);
freez(rpt->client_port);
freez(rpt->program_name);
@@ -308,64 +307,59 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
// this keeps the parser with its current value
// so, parser needs to be allocated before pushing it
- netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
-
- {
- bool compressed_connection = rrdpush_decompression_initialize(rpt);
-
- buffered_reader_init(&rpt->reader);
+ netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser) {
+ bool compressed_connection = rrdpush_decompression_initialize(rpt);
+ buffered_reader_init(&rpt->reader);
#ifdef NETDATA_LOG_STREAM_RECEIVE
- {
- char filename[FILENAME_MAX + 1];
- snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(
- rpt->host) : "unknown"
- );
- parser->user.stream_log_fp = fopen(filename, "w");
- parser->user.stream_log_repertoire = PARSER_REP_METADATA;
- }
+ {
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(
+ rpt->host) : "unknown"
+ );
+ parser->user.stream_log_fp = fopen(filename, "w");
+ parser->user.stream_log_repertoire = PARSER_REP_METADATA;
+ }
#endif
- CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
+ CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
- ND_LOG_STACK lgs[] = {
- ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
- ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
- ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
- ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
- ND_LOG_FIELD_END(),
- };
- ND_LOG_STACK_PUSH(lgs);
-
- while(!receiver_should_stop(rpt)) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
+ ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- if(!buffered_reader_next_line(&rpt->reader, buffer)) {
- STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
+ while(!receiver_should_stop(rpt)) {
- bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason)
- : receiver_read_uncompressed(rpt, &reason);
+ if(!buffered_reader_next_line(&rpt->reader, buffer)) {
+ STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
- if(unlikely(!have_new_data)) {
- receiver_set_exit_reason(rpt, reason, false);
- break;
- }
+ bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason)
+ : receiver_read_uncompressed(rpt, &reason);
- continue;
- }
+ if(unlikely(!have_new_data)) {
+ receiver_set_exit_reason(rpt, reason, false);
+ break;
+ }
- if(unlikely(parser_action(parser, buffer->buffer))) {
- receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
- break;
- }
+ continue;
+ }
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- result = parser->user.data_collections_count;
+ if(unlikely(parser_action(parser, buffer->buffer))) {
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
+ break;
}
- // free parser with the pop function
- netdata_thread_cleanup_pop(1);
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ result = parser->user.data_collections_count;
+ }
+ netdata_thread_cleanup_pop(1); // free parser with the pop function
return result;
}
@@ -556,7 +550,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.mode = default_rrd_memory_mode;
rpt->config.history = default_rrd_history_entries;
- rpt->config.health_enabled = (int)default_health_enabled;
+ rpt->config.health_enabled = health_plugin_enabled();
rpt->config.alarms_delay = 60;
rpt->config.alarms_history = HEALTH_LOG_DEFAULT_HISTORY;
@@ -633,8 +627,6 @@ static void rrdpush_receive(struct receiver_state *rpt)
rrdpush_parse_compression_order(rpt, order);
}
- (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
-
// find the host for this receiver
{
// this will also update the host with our system_info
@@ -646,7 +638,6 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->timezone,
rpt->abbrev_timezone,
rpt->utc_offset,
- rpt->tags,
rpt->program_name,
rpt->program_version,
rpt->config.update_every,
@@ -699,7 +690,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
#ifdef NETDATA_INTERNAL_CHECKS
netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
"client willing to stream metrics for host '%s' with machine_guid '%s': "
- "update every = %d, history = %d, memory mode = %s, health %s,%s tags '%s'"
+ "update every = %d, history = %d, memory mode = %s, health %s,%s"
, rpt->hostname
, rpt->client_ip
, rpt->client_port
@@ -714,7 +705,6 @@ static void rrdpush_receive(struct receiver_state *rpt)
#else
, ""
#endif
- , rrdhost_tags(rpt->host)
);
#endif // NETDATA_INTERNAL_CHECKS