diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:03 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:18 +0000 |
commit | 5da14042f70711ea5cf66e034699730335462f66 (patch) | |
tree | 0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/streaming/receiver.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz netdata-5da14042f70711ea5cf66e034699730335462f66.zip |
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/streaming/receiver.c (renamed from streaming/receiver.c) | 94 |
1 files changed, 42 insertions, 52 deletions
diff --git a/streaming/receiver.c b/src/streaming/receiver.c index a12b94fb4..20f9342df 100644 --- a/streaming/receiver.c +++ b/src/streaming/receiver.c @@ -14,7 +14,6 @@ void receiver_state_free(struct receiver_state *rpt) { freez(rpt->os); freez(rpt->timezone); freez(rpt->abbrev_timezone); - freez(rpt->tags); freez(rpt->client_ip); freez(rpt->client_port); freez(rpt->program_name); @@ -308,64 +307,59 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // this keeps the parser with its current value // so, parser needs to be allocated before pushing it - netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - - { - bool compressed_connection = rrdpush_decompression_initialize(rpt); - - buffered_reader_init(&rpt->reader); + netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser) { + bool compressed_connection = rrdpush_decompression_initialize(rpt); + buffered_reader_init(&rpt->reader); #ifdef NETDATA_LOG_STREAM_RECEIVE - { - char filename[FILENAME_MAX + 1]; - snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname( - rpt->host) : "unknown" - ); - parser->user.stream_log_fp = fopen(filename, "w"); - parser->user.stream_log_repertoire = PARSER_REP_METADATA; - } + { + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname( + rpt->host) : "unknown" + ); + parser->user.stream_log_fp = fopen(filename, "w"); + parser->user.stream_log_repertoire = PARSER_REP_METADATA; + } #endif - CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); + CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); - ND_LOG_STACK lgs[] = { - ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line), - ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser), - ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser), - ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser), - ND_LOG_FIELD_END(), - }; - ND_LOG_STACK_PUSH(lgs); - - while(!receiver_should_stop(rpt)) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line), + ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser), + ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser), + ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); - if(!buffered_reader_next_line(&rpt->reader, buffer)) { - STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; + while(!receiver_should_stop(rpt)) { - bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) - : receiver_read_uncompressed(rpt, &reason); + if(!buffered_reader_next_line(&rpt->reader, buffer)) { + STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; - if(unlikely(!have_new_data)) { - receiver_set_exit_reason(rpt, reason, false); - break; - } + bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) + : receiver_read_uncompressed(rpt, &reason); - continue; - } + if(unlikely(!have_new_data)) { + receiver_set_exit_reason(rpt, reason, false); + break; + } - if(unlikely(parser_action(parser, buffer->buffer))) { - receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); - break; - } + continue; + } - buffer->len = 0; - buffer->buffer[0] = '\0'; - } - result = parser->user.data_collections_count; + if(unlikely(parser_action(parser, buffer->buffer))) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); + break; } - // free parser with the pop function - netdata_thread_cleanup_pop(1); + buffer->len = 0; + buffer->buffer[0] = '\0'; + } + result = parser->user.data_collections_count; + } + netdata_thread_cleanup_pop(1); // free parser with the pop function return result; } @@ -556,7 +550,7 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.mode = default_rrd_memory_mode; rpt->config.history = default_rrd_history_entries; - rpt->config.health_enabled = (int)default_health_enabled; + rpt->config.health_enabled = health_plugin_enabled(); rpt->config.alarms_delay = 60; rpt->config.alarms_history = HEALTH_LOG_DEFAULT_HISTORY; @@ -633,8 +627,6 @@ static void rrdpush_receive(struct receiver_state *rpt) rrdpush_parse_compression_order(rpt, order); } - (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); - // find the host for this receiver { // this will also update the host with our system_info @@ -646,7 +638,6 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->timezone, rpt->abbrev_timezone, rpt->utc_offset, - rpt->tags, rpt->program_name, rpt->program_version, rpt->config.update_every, @@ -699,7 +690,7 @@ static void rrdpush_receive(struct receiver_state *rpt) #ifdef NETDATA_INTERNAL_CHECKS netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "client willing to stream metrics for host '%s' with machine_guid '%s': " - "update every = %d, history = %d, memory mode = %s, health %s,%s tags '%s'" + "update every = %d, history = %d, memory mode = %s, health %s,%s" , rpt->hostname , rpt->client_ip , rpt->client_port @@ -714,7 +705,6 @@ static void rrdpush_receive(struct receiver_state *rpt) #else , "" #endif - , rrdhost_tags(rpt->host) ); #endif // NETDATA_INTERNAL_CHECKS |