diff options
Diffstat (limited to '')
-rw-r--r-- | src/streaming/receiver.c (renamed from streaming/receiver.c) | 219 |
1 files changed, 117 insertions, 102 deletions
diff --git a/streaming/receiver.c b/src/streaming/receiver.c index a12b94fb4..2cbf247dc 100644 --- a/streaming/receiver.c +++ b/src/streaming/receiver.c @@ -14,7 +14,6 @@ void receiver_state_free(struct receiver_state *rpt) { freez(rpt->os); freez(rpt->timezone); freez(rpt->abbrev_timezone); - freez(rpt->tags); freez(rpt->client_ip); freez(rpt->client_port); freez(rpt->program_name); @@ -59,8 +58,12 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz } #ifdef ENABLE_H2O - if (is_h2o_rrdpush(r)) + if (is_h2o_rrdpush(r)) { + if(nd_thread_signaled_to_cancel()) + return -4; + return (int)h2o_stream_read(r->h2o_ctx, buffer, size); + } #endif int tries = 100; @@ -69,6 +72,29 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz do { errno = 0; + switch(wait_on_socket_or_cancel_with_timeout( +#ifdef ENABLE_HTTPS + &r->ssl, +#endif + r->fd, 0, POLLIN, NULL)) + { + case 0: // data are waiting + break; + + case 1: // timeout reached + netdata_log_error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__); + return -3; + + case -1: // thread cancelled + netdata_log_error("STREAM: %s(): thread has been cancelled timeout while waiting for data on socket!", __FUNCTION__); + return -4; + + default: + case 2: // error on socket + netdata_log_error("STREAM: %s() socket error!", __FUNCTION__); + return -2; + } + #ifdef ENABLE_HTTPS if (SSL_connection(&r->ssl)) bytes_read = netdata_ssl_read(&r->ssl, buffer, size); @@ -117,6 +143,10 @@ static inline STREAM_HANDSHAKE read_stream_error_to_reason(int code) { // timeout return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT; + case -4: + // the thread is cancelled + return STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN; + default: // anything else return STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; @@ -262,6 +292,11 @@ static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAK static inline bool receiver_should_stop(struct receiver_state *rpt) { static __thread size_t counter = 0; + if(nd_thread_signaled_to_cancel()) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false); + return true; + } + if(unlikely(rpt->exit.shutdown)) { receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false); return true; @@ -272,11 +307,8 @@ static inline bool receiver_should_stop(struct receiver_state *rpt) { return true; } - if(unlikely((counter++ % 1000) == 0)) { - // check every 1000 lines read - netdata_thread_testcancel(); + if(unlikely((counter++ % 1000) == 0)) rpt->last_msg_t = now_monotonic_sec(); - } return false; } @@ -308,65 +340,58 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // this keeps the parser with its current value // so, parser needs to be allocated before pushing it - netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); + CLEANUP_FUNCTION_REGISTER(pluginsd_process_thread_cleanup) parser_ptr = parser; - { - bool compressed_connection = rrdpush_decompression_initialize(rpt); - - buffered_reader_init(&rpt->reader); + bool compressed_connection = rrdpush_decompression_initialize(rpt); + buffered_reader_init(&rpt->reader); #ifdef NETDATA_LOG_STREAM_RECEIVE - { - char filename[FILENAME_MAX + 1]; - snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname( - rpt->host) : "unknown" - ); - parser->user.stream_log_fp = fopen(filename, "w"); - parser->user.stream_log_repertoire = PARSER_REP_METADATA; - } + { + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname( + rpt->host) : "unknown" + ); + parser->user.stream_log_fp = fopen(filename, "w"); + parser->user.stream_log_repertoire = PARSER_REP_METADATA; + } #endif - CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); - - ND_LOG_STACK lgs[] = { - ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line), - ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser), - ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser), - ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser), - ND_LOG_FIELD_END(), - }; - ND_LOG_STACK_PUSH(lgs); - - while(!receiver_should_stop(rpt)) { - - if(!buffered_reader_next_line(&rpt->reader, buffer)) { - STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; + CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); - bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) - : receiver_read_uncompressed(rpt, &reason); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line), + ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser), + ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser), + ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); - if(unlikely(!have_new_data)) { - receiver_set_exit_reason(rpt, reason, false); - break; - } + while(!receiver_should_stop(rpt)) { - continue; - } + if(!buffered_reader_next_line(&rpt->reader, buffer)) { + STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; - if(unlikely(parser_action(parser, buffer->buffer))) { - receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); - break; - } + bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) + : receiver_read_uncompressed(rpt, &reason); - buffer->len = 0; - buffer->buffer[0] = '\0'; - } - result = parser->user.data_collections_count; + if(unlikely(!have_new_data)) { + receiver_set_exit_reason(rpt, reason, false); + break; } - // free parser with the pop function - netdata_thread_cleanup_pop(1); + continue; + } + if(unlikely(parser_action(parser, buffer->buffer))) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); + break; + } + + buffer->len = 0; + buffer->buffer[0] = '\0'; + } + result = parser->user.data_collections_count; return result; } @@ -436,10 +461,9 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { } static void rrdhost_clear_receiver(struct receiver_state *rpt) { - bool signal_rrdcontext = false; - RRDHOST *host = rpt->host; if(host) { + bool signal_rrdcontext = false; netdata_mutex_lock(&host->receiver_lock); // Make sure that we detach this thread and don't kill a freshly arriving receiver @@ -451,8 +475,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { host->child_connect_time = 0; host->child_disconnected_time = now_realtime_sec(); - if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO) - host->health.health_enabled = 0; + host->health.health_enabled = 0; rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false); @@ -462,6 +485,9 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN); host->receiver = NULL; host->rrdpush_last_receiver_exit_reason = rpt->exit.reason; + + if(rpt->config.health_enabled) + rrdcalc_child_disconnected(host); } netdata_mutex_unlock(&host->receiver_lock); @@ -485,7 +511,7 @@ bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) { shutdown(host->receiver->fd, SHUT_RDWR); } - netdata_thread_cancel(host->receiver->thread); + nd_thread_signal_cancel(host->receiver->thread); } int count = 2000; @@ -556,7 +582,7 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.mode = default_rrd_memory_mode; rpt->config.history = default_rrd_history_entries; - rpt->config.health_enabled = (int)default_health_enabled; + rpt->config.health_enabled = health_plugin_enabled(); rpt->config.alarms_delay = 60; rpt->config.alarms_history = HEALTH_LOG_DEFAULT_HISTORY; @@ -633,8 +659,6 @@ static void rrdpush_receive(struct receiver_state *rpt) rrdpush_parse_compression_order(rpt, order); } - (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); - // find the host for this receiver { // this will also update the host with our system_info @@ -646,7 +670,6 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->timezone, rpt->abbrev_timezone, rpt->utc_offset, - rpt->tags, rpt->program_name, rpt->program_version, rpt->config.update_every, @@ -699,7 +722,7 @@ static void rrdpush_receive(struct receiver_state *rpt) #ifdef NETDATA_INTERNAL_CHECKS netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "client willing to stream metrics for host '%s' with machine_guid '%s': " - "update every = %d, history = %d, memory mode = %s, health %s,%s tags '%s'" + "update every = %d, history = %d, memory mode = %s, health %s,%s" , rpt->hostname , rpt->client_ip , rpt->client_port @@ -714,7 +737,6 @@ static void rrdpush_receive(struct receiver_state *rpt) #else , "" #endif - , rrdhost_tags(rpt->host) ); #endif // NETDATA_INTERNAL_CHECKS @@ -852,20 +874,18 @@ cleanup: ; } -static void rrdpush_receiver_thread_cleanup(void *ptr) { - struct receiver_state *rpt = (struct receiver_state *) ptr; - worker_unregister(); - - rrdhost_clear_receiver(rpt); +static void rrdpush_receiver_thread_cleanup(void *pptr) { + struct receiver_state *rpt = CLEANUP_FUNCTION_GET_PTR(pptr); + if(!rpt) return; netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "receive thread ended (task id %d)" - , rpt->hostname ? rpt->hostname : "-" - , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-" - , gettid()); + , rpt->hostname ? rpt->hostname : "-" + , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-", gettid_cached()); + worker_unregister(); + rrdhost_clear_receiver(rpt); receiver_state_free(rpt); - rrdhost_set_is_parent_label(); } @@ -892,42 +912,37 @@ static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) { } void *rrdpush_receiver_thread(void *ptr) { - netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr); - - { - worker_register("STREAMRCV"); - - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, - "received bytes", "bytes/s", - WORKER_METRIC_INCREMENT); + CLEANUP_FUNCTION_REGISTER(rrdpush_receiver_thread_cleanup) cleanup_ptr = ptr; + worker_register("STREAMRCV"); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, - "uncompressed bytes", "bytes/s", - WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, + "received bytes", "bytes/s", + WORKER_METRIC_INCREMENT); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, - "replication completion", "%", - WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, + "uncompressed bytes", "bytes/s", + WORKER_METRIC_INCREMENT); - struct receiver_state *rpt = (struct receiver_state *) ptr; - rpt->tid = gettid(); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + "replication completion", "%", + WORKER_METRIC_ABSOLUTE); - ND_LOG_STACK lgs[] = { - ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip), - ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port), - ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname), - ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt), - ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt), - ND_LOG_FIELD_END(), - }; - ND_LOG_STACK_PUSH(lgs); + struct receiver_state *rpt = (struct receiver_state *) ptr; + rpt->tid = gettid_cached(); - netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip - , rpt->client_port); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip), + ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port), + ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname), + ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt), + ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); - rrdpush_receive(rpt); - } + netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip + , rpt->client_port); - netdata_thread_cleanup_pop(1); + rrdpush_receive(rpt); return NULL; } |