From c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 9 Mar 2024 14:19:22 +0100 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- streaming/receiver.c | 420 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 253 insertions(+), 167 deletions(-) (limited to 'streaming/receiver.c') diff --git a/streaming/receiver.c b/streaming/receiver.c index 10ef8b7d3..a12b94fb4 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -1,6 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" +#include "web/server/h2o/http_server.h" extern struct config stream_config; @@ -28,9 +29,7 @@ void receiver_state_free(struct receiver_state *rpt) { close(rpt->fd); } -#ifdef ENABLE_RRDPUSH_COMPRESSION rrdpush_decompressor_destroy(&rpt->decompressor); -#endif if(rpt->system_info) rrdhost_system_info_free(rpt->system_info); @@ -59,6 +58,11 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz return 0; } +#ifdef ENABLE_H2O + if (is_h2o_rrdpush(r)) + return (int)h2o_stream_read(r->h2o_ctx, buffer, size); +#endif + int tries = 100; ssize_t bytes_read; @@ -92,15 +96,44 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz return (int)bytes_read; } -static inline bool receiver_read_uncompressed(struct receiver_state *r) { +static inline STREAM_HANDSHAKE read_stream_error_to_reason(int code) { + if(code > 0) + return 0; + + switch(code) { + case 0: + // asked to read zero bytes + return STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER; + + case -1: + // EOF + return STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF; + + case -2: + // failed to read + return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED; + + case -3: + // timeout + return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT; + + default: + // anything else + return STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; + } +} + +static inline bool receiver_read_uncompressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) { #ifdef NETDATA_INTERNAL_CHECKS if(r->reader.read_buffer[r->reader.read_len] != '\0') fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); #endif int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1); - if(unlikely(bytes_read <= 0)) + if(unlikely(bytes_read <= 0)) { + *reason = read_stream_error_to_reason(bytes_read); return false; + } worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read); @@ -111,8 +144,7 @@ static inline bool receiver_read_uncompressed(struct receiver_state *r) { return true; } -#ifdef ENABLE_RRDPUSH_COMPRESSION -static inline bool receiver_read_compressed(struct receiver_state *r) { +static inline bool receiver_read_compressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) { internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0', "%s: read_buffer does not start with zero #2", __FUNCTION__ ); @@ -150,8 +182,10 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { int bytes_read = 0; do { int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read); - if (unlikely(ret <= 0)) + if (unlikely(ret <= 0)) { + *reason = read_stream_error_to_reason(ret); return false; + } bytes_read += ret; } while(unlikely(bytes_read < (int)r->decompressor.signature_size)); @@ -187,7 +221,7 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { int last_read_bytes = read_stream(r, &compressed[start], remaining); if (unlikely(last_read_bytes <= 0)) { - internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes); + *reason = read_stream_error_to_reason(last_read_bytes); return false; } @@ -217,57 +251,6 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { return true; } -#else // !ENABLE_RRDPUSH_COMPRESSION -static inline bool receiver_read_compressed(struct receiver_state *r) { - return receiver_read_uncompressed(r); -} -#endif // ENABLE_RRDPUSH_COMPRESSION - -/* Produce a full line if one exists, statefully return where we start next time. - * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. - */ -inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) { - buffer_need_bytes(dst, reader->read_len - reader->pos + 2); - - size_t start = reader->pos; - - char *ss = &reader->read_buffer[start]; - char *se = &reader->read_buffer[reader->read_len]; - char *ds = &dst->buffer[dst->len]; - char *de = &ds[dst->size - dst->len - 2]; - - if(ss >= se) { - *ds = '\0'; - reader->pos = 0; - reader->read_len = 0; - reader->read_buffer[reader->read_len] = '\0'; - return false; - } - - // copy all bytes to buffer - while(ss < se && ds < de && *ss != '\n') { - *ds++ = *ss++; - dst->len++; - } - - // if we have a newline, return the buffer - if(ss < se && ds < de && *ss == '\n') { - // newline found in the r->read_buffer - - *ds++ = *ss++; // copy the newline too - dst->len++; - - *ds = '\0'; - - reader->pos = ss - reader->read_buffer; - return true; - } - - reader->pos = 0; - reader->read_len = 0; - reader->read_buffer[reader->read_len] = '\0'; - return false; -} bool plugin_is_enabled(struct plugind *cd); @@ -315,6 +298,10 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl); } +#ifdef ENABLE_H2O + parser->h2o_ctx = rpt->h2o_ctx; +#endif + pluginsd_keywords_init(parser, PARSER_INIT_STREAMING); rrd_collector_started(); @@ -323,43 +310,59 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - bool compressed_connection = false; + { + bool compressed_connection = rrdpush_decompression_initialize(rpt); -#ifdef ENABLE_RRDPUSH_COMPRESSION - if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { - compressed_connection = true; - rrdpush_decompressor_reset(&rpt->decompressor); - } - else - rrdpush_decompressor_destroy(&rpt->decompressor); + buffered_reader_init(&rpt->reader); + +#ifdef NETDATA_LOG_STREAM_RECEIVE + { + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname( + rpt->host) : "unknown" + ); + parser->user.stream_log_fp = fopen(filename, "w"); + parser->user.stream_log_repertoire = PARSER_REP_METADATA; + } #endif - buffered_reader_init(&rpt->reader); + CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); - BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); - while(!receiver_should_stop(rpt)) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line), + ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser), + ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser), + ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); - if(!buffered_reader_next_line(&rpt->reader, buffer)) { - bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt); + while(!receiver_should_stop(rpt)) { - if(unlikely(!have_new_data)) { - receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, false); - break; - } + if(!buffered_reader_next_line(&rpt->reader, buffer)) { + STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; - continue; - } + bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) + : receiver_read_uncompressed(rpt, &reason); - if (unlikely(parser_action(parser, buffer->buffer))) { - receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); - break; - } + if(unlikely(!have_new_data)) { + receiver_set_exit_reason(rpt, reason, false); + break; + } - buffer->len = 0; - buffer->buffer[0] = '\0'; - } - buffer_free(buffer); - result = parser->user.data_collections_count; + continue; + } + + if(unlikely(parser_action(parser, buffer->buffer))) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); + break; + } + + buffer->len = 0; + buffer->buffer[0] = '\0'; + } + result = parser->user.data_collections_count; + } // free parser with the pop function netdata_thread_cleanup_pop(1); @@ -400,10 +403,10 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) { if (rpt->config.alarms_delay > 0) { host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay; - netdata_log_health( - "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", - rrdhost_hostname(host), - (int64_t) rpt->config.alarms_delay); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", + rrdhost_hostname(host), + (int64_t) rpt->config.alarms_delay); } } @@ -521,26 +524,31 @@ static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *r 5); } -void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) { - - log_stream_connection(rpt->client_ip, rpt->client_port, - (rpt->key && *rpt->key)? rpt->key : "-", - (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-", - (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-", - status); - - netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " - "%s. " - "STATUS: %s%s%s%s" - , rpt->hostname - , rpt->client_ip, rpt->client_port - , msg - , status - , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":"" - , stream_handshake_error_to_string(rpt->exit.reason) - , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":"" - ); - +void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) { + // this function may be called BEFORE we spawn the receiver thread + // so, we need to add the fields again (it does not harm) + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip), + ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port), + ND_LOG_FIELD_TXT(NDF_NIDL_NODE, (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""), + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status), + ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_from_child_msgid), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + nd_log(NDLS_ACCESS, priority, "api_key:'%s' machine_guid:'%s' msg:'%s'" + , (rpt->key && *rpt->key)? rpt->key : "" + , (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "" + , msg); + + nd_log(NDLS_DAEMON, priority, "STREAM_RECEIVER for '%s': %s %s%s%s" + , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "" + , msg + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":"" + , stream_handshake_error_to_string(rpt->exit.reason) + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":"" + ); } static void rrdpush_receive(struct receiver_state *rpt) @@ -611,11 +619,19 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step); rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step); -#ifdef ENABLE_RRDPUSH_COMPRESSION rpt->config.rrdpush_compression = default_rrdpush_compression_enabled; rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression); rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression); -#endif // ENABLE_RRDPUSH_COMPRESSION + + bool is_ephemeral = false; + is_ephemeral = appconfig_get_boolean(&stream_config, rpt->key, "is ephemeral node", CONFIG_BOOLEAN_NO); + is_ephemeral = appconfig_get_boolean(&stream_config, rpt->machine_guid, "is ephemeral node", is_ephemeral); + + if(rpt->config.rrdpush_compression) { + char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", RRDPUSH_COMPRESSION_ALGORITHMS_ORDER); + order = appconfig_get(&stream_config, rpt->machine_guid, "compression algorithms order", order); + rrdpush_parse_compression_order(rpt, order); + } (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); @@ -623,39 +639,46 @@ static void rrdpush_receive(struct receiver_state *rpt) { // this will also update the host with our system_info RRDHOST *host = rrdhost_find_or_create( - rpt->hostname - , rpt->registry_hostname - , rpt->machine_guid - , rpt->os - , rpt->timezone - , rpt->abbrev_timezone - , rpt->utc_offset - , rpt->tags - , rpt->program_name - , rpt->program_version - , rpt->config.update_every - , rpt->config.history - , rpt->config.mode - , (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO) - , (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key) - , rpt->config.rrdpush_destination - , rpt->config.rrdpush_api_key - , rpt->config.rrdpush_send_charts_matching - , rpt->config.rrdpush_enable_replication - , rpt->config.rrdpush_seconds_to_replicate - , rpt->config.rrdpush_replication_step - , rpt->system_info - , 0 - ); + rpt->hostname, + rpt->registry_hostname, + rpt->machine_guid, + rpt->os, + rpt->timezone, + rpt->abbrev_timezone, + rpt->utc_offset, + rpt->tags, + rpt->program_name, + rpt->program_version, + rpt->config.update_every, + rpt->config.history, + rpt->config.mode, + (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO), + (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && + *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && + *rpt->config.rrdpush_api_key), + rpt->config.rrdpush_destination, + rpt->config.rrdpush_api_key, + rpt->config.rrdpush_send_charts_matching, + rpt->config.rrdpush_enable_replication, + rpt->config.rrdpush_seconds_to_replicate, + rpt->config.rrdpush_replication_step, + rpt->system_info, + 0); if(!host) { - rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION"); + rrdpush_receive_log_status( + rpt,"failed to find/create host structure, rejecting connection", + RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR); + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR); goto cleanup; } if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { - rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER"); + rrdpush_receive_log_status( + rpt, "host is initializing, retry later", + RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS, NDLP_NOTICE); + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION); goto cleanup; } @@ -664,7 +687,10 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->system_info = NULL; if(!rrdhost_set_receiver(host, rpt)) { - rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION"); + rrdpush_receive_log_status( + rpt, "host is already served by another receiver", + RRDPUSH_STATUS_DUPLICATE_RECEIVER, NDLP_INFO); + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING); goto cleanup; } @@ -709,12 +735,7 @@ static void rrdpush_receive(struct receiver_state *rpt) snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); -#ifdef ENABLE_RRDPUSH_COMPRESSION - if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { - if (!rpt->config.rrdpush_compression) - rpt->capabilities &= ~STREAM_CAP_COMPRESSION; - } -#endif // ENABLE_RRDPUSH_COMPRESSION + rrdpush_select_receiver_compression_algorithm(rpt); { // netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); @@ -737,19 +758,32 @@ static void rrdpush_receive(struct receiver_state *rpt) } netdata_log_debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); - ssize_t bytes_sent = send_timeout( +#ifdef ENABLE_H2O + if (is_h2o_rrdpush(rpt)) { + h2o_stream_write(rpt->h2o_ctx, initial_response, strlen(initial_response)); + } else { +#endif + ssize_t bytes_sent = send_timeout( #ifdef ENABLE_HTTPS - &rpt->ssl, + &rpt->ssl, #endif - rpt->fd, initial_response, strlen(initial_response), 0, 60); - - if(bytes_sent != (ssize_t)strlen(initial_response)) { - internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response)); - rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION"); - goto cleanup; + rpt->fd, initial_response, strlen(initial_response), 0, 60); + + if(bytes_sent != (ssize_t)strlen(initial_response)) { + internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response)); + rrdpush_receive_log_status( + rpt, "cannot reply back, dropping connection", + RRDPUSH_STATUS_CANT_REPLY, NDLP_ERR); + goto cleanup; + } +#ifdef ENABLE_H2O } +#endif } +#ifdef ENABLE_H2O + unless_h2o_rrdpush(rpt) +#endif { // remove the non-blocking flag from the socket if(sock_delnonblock(rpt->fd) < 0) @@ -770,17 +804,22 @@ static void rrdpush_receive(struct receiver_state *rpt) , rpt->fd); } - rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED"); + rrdpush_receive_log_status( + rpt, "connected and ready to receive data", + RRDPUSH_STATUS_CONNECTED, NDLP_INFO); #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // new child connected if (netdata_cloud_enabled) - aclk_host_state_update(rpt->host, 1); + aclk_host_state_update(rpt->host, 1, 1); #endif rrdhost_set_is_parent_label(); + if (is_ephemeral) + rrdhost_option_set(rpt->host, RRDHOST_OPTION_EPHEMERAL_HOST); + // let it reconnect to parent immediately rrdpush_reset_destinations_postpone_time(rpt->host); @@ -796,15 +835,17 @@ static void rrdpush_receive(struct receiver_state *rpt) { char msg[100 + 1]; - snprintfz(msg, 100, "disconnected (completed %zu updates)", count); - rrdpush_receive_log_status(rpt, msg, "DISCONNECTED"); + snprintfz(msg, sizeof(msg) - 1, "disconnected (completed %zu updates)", count); + rrdpush_receive_log_status( + rpt, msg, + RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING); } #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // a child disconnected if (netdata_cloud_enabled) - aclk_host_state_update(rpt->host, 0); + aclk_host_state_update(rpt->host, 0, 1); #endif cleanup: @@ -828,19 +869,64 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { rrdhost_set_is_parent_label(); } +static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) { + struct receiver_state *rpt = ptr; + if(!rpt) + return false; + + stream_capabilities_to_string(wb, rpt->capabilities); + return true; +} + +static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) { + struct receiver_state *rpt = ptr; + if(!rpt) + return false; + +#ifdef ENABLE_HTTPS + buffer_strcat(wb, SSL_connection(&rpt->ssl) ? "https" : "http"); +#else + buffer_strcat(wb, "http"); +#endif + return true; +} + void *rrdpush_receiver_thread(void *ptr) { netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr); - worker_register("STREAMRCV"); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); + { + worker_register("STREAMRCV"); + + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, + "received bytes", "bytes/s", + WORKER_METRIC_INCREMENT); + + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, + "uncompressed bytes", "bytes/s", + WORKER_METRIC_INCREMENT); - struct receiver_state *rpt = (struct receiver_state *)ptr; - rpt->tid = gettid(); - netdata_log_info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + "replication completion", "%", + WORKER_METRIC_ABSOLUTE); - rrdpush_receive(rpt); + struct receiver_state *rpt = (struct receiver_state *) ptr; + rpt->tid = gettid(); + + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip), + ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port), + ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname), + ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt), + ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip + , rpt->client_port); + + rrdpush_receive(rpt); + } netdata_thread_cleanup_pop(1); return NULL; -- cgit v1.2.3