diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /streaming/sender.c | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- | streaming/sender.c | 581 |
1 files changed, 436 insertions, 145 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 71f875034..09b67e968 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -1,31 +1,37 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" - -#define WORKER_SENDER_JOB_CONNECT 0 -#define WORKER_SENDER_JOB_PIPE_READ 1 -#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2 -#define WORKER_SENDER_JOB_EXECUTE 3 -#define WORKER_SENDER_JOB_SOCKET_SEND 4 -#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5 -#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6 -#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7 -#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8 -#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9 -#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10 -#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11 -#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12 -#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13 -#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14 -#define WORKER_SENDER_JOB_BUFFER_RATIO 15 -#define WORKER_SENDER_JOB_BYTES_RECEIVED 16 -#define WORKER_SENDER_JOB_BYTES_SENT 17 -#define WORKER_SENDER_JOB_REPLAY_REQUEST 18 -#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19 -#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20 - -#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21 -#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21 +#include "common.h" +#include "aclk/https_client.h" + +#define WORKER_SENDER_JOB_CONNECT 0 +#define WORKER_SENDER_JOB_PIPE_READ 1 +#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2 +#define WORKER_SENDER_JOB_EXECUTE 3 +#define WORKER_SENDER_JOB_SOCKET_SEND 4 +#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5 +#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6 +#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7 +#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8 +#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9 +#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10 +#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11 +#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12 +#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13 +#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14 +#define WORKER_SENDER_JOB_BUFFER_RATIO 15 +#define WORKER_SENDER_JOB_BYTES_RECEIVED 16 +#define WORKER_SENDER_JOB_BYTES_SENT 17 +#define WORKER_SENDER_JOB_BYTES_COMPRESSED 18 +#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19 +#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20 +#define WORKER_SENDER_JOB_REPLAY_REQUEST 21 +#define WORKER_SENDER_JOB_FUNCTION_REQUEST 22 +#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23 +#define WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION 24 + +#if WORKER_UTILIZATION_MAX_JOB_TYPES < 25 +#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 25 #endif extern struct config stream_config; @@ -66,21 +72,6 @@ BUFFER *sender_start(struct sender_state *s) { static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); -#ifdef ENABLE_RRDPUSH_COMPRESSION -/* -* In case of stream compression buffer overflow -* Inform the user through the error log file and -* deactivate compression by downgrading the stream protocol. -*/ -static inline void deactivate_compression(struct sender_state *s) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION); - netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it."); - s->flags &= ~SENDER_FLAG_COMPRESSION; - netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to); - rrdpush_sender_thread_close_socket(s->host); -} -#endif - #define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3 // Collector thread finishing a transmission @@ -102,13 +93,22 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) sender_lock(s); -// FILE *fp = fopen("/tmp/stream.txt", "a"); -// fprintf(fp, -// "\n--- SEND BEGIN: %s ----\n" -// "%s" -// "--- SEND END ----------------------------------------\n" -// , rrdhost_hostname(s->host), src); -// fclose(fp); +#ifdef NETDATA_LOG_STREAM_SENDER + if(type == STREAM_TRAFFIC_TYPE_METADATA) { + if(!s->stream_log_fp) { + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "/tmp/stream-sender-%s.txt", s->host ? rrdhost_hostname(s->host) : "unknown"); + + s->stream_log_fp = fopen(filename, "w"); + } + + fprintf(s->stream_log_fp, "\n--- SEND MESSAGE START: %s ----\n" + "%s" + "--- SEND MESSAGE END ----------------------------------------\n" + , rrdhost_hostname(s->host), src + ); + } +#endif if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", @@ -117,8 +117,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; } -#ifdef ENABLE_RRDPUSH_COMPRESSION - if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) { + if (s->compressor.initialized) { while(src_len) { size_t size_to_compress = src_len; @@ -143,28 +142,45 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) } } - char *dst; + const char *dst; size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst); if (!dst_len) { netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying", rrdhost_hostname(s->host), s->connected_to); - rrdpush_compressor_reset(&s->compressor); + rrdpush_compression_initialize(s); dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst); if(!dst_len) { netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression", rrdhost_hostname(s->host), s->connected_to); - deactivate_compression(s); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION); + rrdpush_compression_deactivate(s); + rrdpush_sender_thread_close_socket(s->host); sender_unlock(s); return; } } - if(cbuffer_add_unsafe(s->buffer, dst, dst_len)) + rrdpush_signature_t signature = rrdpush_compress_encode_signature(dst_len); + +#ifdef NETDATA_INTERNAL_CHECKS + // check if reversing the signature provides the same length + size_t decoded_dst_len = rrdpush_decompress_decode_signature((const char *)&signature, sizeof(signature)); + if(decoded_dst_len != dst_len) + fatal("RRDPUSH COMPRESSION: invalid signature, original payload %zu bytes, " + "compressed payload length %zu bytes, but signature says payload is %zu bytes", + size_to_compress, dst_len, decoded_dst_len); +#endif + + if(cbuffer_add_unsafe(s->buffer, (const char *)&signature, sizeof(signature))) s->flags |= SENDER_FLAG_OVERFLOW; - else - s->sent_bytes_on_this_connection_per_type[type] += dst_len; + else { + if(cbuffer_add_unsafe(s->buffer, dst, dst_len)) + s->flags |= SENDER_FLAG_OVERFLOW; + else + s->sent_bytes_on_this_connection_per_type[type] += dst_len + sizeof(signature); + } src = src + size_to_compress; src_len -= size_to_compress; @@ -174,12 +190,6 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) s->flags |= SENDER_FLAG_OVERFLOW; else s->sent_bytes_on_this_connection_per_type[type] += src_len; -#else - if(cbuffer_add_unsafe(s->buffer, src, src_len)) - s->flags |= SENDER_FLAG_OVERFLOW; - else - s->sent_bytes_on_this_connection_per_type[type] += src_len; -#endif replication_recalculate_buffer_used_ratio_unsafe(s); @@ -191,7 +201,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) sender_unlock(s); - if(signal_sender) + if(signal_sender && (!stream_has_capability(s, STREAM_CAP_INTERPOLATED) || type != STREAM_TRAFFIC_TYPE_DATA)) rrdpush_signal_sender_to_wake_up(s); } @@ -251,15 +261,17 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { RRDSET *st; rrdset_foreach_read(st, host) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - st->upstream_resync_time_s = 0; + st->rrdpush.sender.resync_time_s = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) - rrddim_clear_exposed(rd); + rrddim_metadata_exposed_upstream_clear(rd); rrddim_foreach_done(rd); + + rrdset_metadata_updated(st); } rrdset_foreach_done(st); @@ -342,8 +354,7 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { rrdpush_sender_charts_and_replication_reset(host); } -void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) -{ +void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) { se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz(""); se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz(""); se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz(""); @@ -351,128 +362,155 @@ void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz(""); } -void rrdpush_clean_encoded(stream_encoded_t *se) -{ - if (se->os_name) +void rrdpush_clean_encoded(stream_encoded_t *se) { + if (se->os_name) { freez(se->os_name); + se->os_name = NULL; + } - if (se->os_id) + if (se->os_id) { freez(se->os_id); + se->os_id = NULL; + } - if (se->os_version) + if (se->os_version) { freez(se->os_version); + se->os_version = NULL; + } - if (se->kernel_name) + if (se->kernel_name) { freez(se->kernel_name); + se->kernel_name = NULL; + } - if (se->kernel_version) + if (se->kernel_version) { freez(se->kernel_version); + se->kernel_version = NULL; + } } struct { const char *response; + const char *status; size_t length; int32_t version; bool dynamic; const char *error; int worker_job_id; int postpone_reconnect_seconds; - bool prevent_log; + ND_LOG_FIELD_PRIORITY priority; } stream_responses[] = { { .response = START_STREAMING_PROMPT_VN, .length = sizeof(START_STREAMING_PROMPT_VN) - 1, + .status = RRDPUSH_STATUS_CONNECTED, .version = STREAM_HANDSHAKE_OK_V3, // and above .dynamic = true, // dynamic = we will parse the version / capabilities .error = NULL, .worker_job_id = 0, .postpone_reconnect_seconds = 0, + .priority = NDLP_INFO, }, { .response = START_STREAMING_PROMPT_V2, .length = sizeof(START_STREAMING_PROMPT_V2) - 1, + .status = RRDPUSH_STATUS_CONNECTED, .version = STREAM_HANDSHAKE_OK_V2, .dynamic = false, .error = NULL, .worker_job_id = 0, .postpone_reconnect_seconds = 0, + .priority = NDLP_INFO, }, { .response = START_STREAMING_PROMPT_V1, .length = sizeof(START_STREAMING_PROMPT_V1) - 1, + .status = RRDPUSH_STATUS_CONNECTED, .version = STREAM_HANDSHAKE_OK_V1, .dynamic = false, .error = NULL, .worker_job_id = 0, .postpone_reconnect_seconds = 0, + .priority = NDLP_INFO, }, { .response = START_STREAMING_ERROR_SAME_LOCALHOST, .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1, + .status = RRDPUSH_STATUS_LOCALHOST, .version = STREAM_HANDSHAKE_ERROR_LOCALHOST, .dynamic = false, .error = "remote server rejected this stream, the host we are trying to stream is its localhost", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour - .prevent_log = true, + .priority = NDLP_DEBUG, }, { .response = START_STREAMING_ERROR_ALREADY_STREAMING, .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1, + .status = RRDPUSH_STATUS_ALREADY_CONNECTED, .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, .dynamic = false, .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minutes - .prevent_log = true, + .priority = NDLP_DEBUG, }, { .response = START_STREAMING_ERROR_NOT_PERMITTED, .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1, + .status = RRDPUSH_STATUS_PERMISSION_DENIED, .version = STREAM_HANDSHAKE_ERROR_DENIED, .dynamic = false, .error = "remote server denied access, probably we don't have the right API key?", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 1 * 60, // 1 minute + .priority = NDLP_ERR, }, { .response = START_STREAMING_ERROR_BUSY_TRY_LATER, .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1, + .status = RRDPUSH_STATUS_RATE_LIMIT, .version = STREAM_HANDSHAKE_BUSY_TRY_LATER, .dynamic = false, .error = "remote server is currently busy, we should try later", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minutes + .priority = NDLP_NOTICE, }, { .response = START_STREAMING_ERROR_INTERNAL_ERROR, .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1, + .status = RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, .version = STREAM_HANDSHAKE_INTERNAL_ERROR, .dynamic = false, .error = "remote server is encountered an internal error, we should try later", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 5 * 60, // 5 minutes + .priority = NDLP_CRIT, }, { .response = START_STREAMING_ERROR_INITIALIZATION, .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1, + .status = RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS, .version = STREAM_HANDSHAKE_INITIALIZATION, .dynamic = false, .error = "remote server is initializing, we should try later", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minute + .priority = NDLP_NOTICE, }, // terminator { .response = NULL, .length = 0, + .status = RRDPUSH_STATUS_BAD_HANDSHAKE, .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, .dynamic = false, .error = "remote node response is not understood, is it Netdata?", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 1 * 60, // 1 minute - .prevent_log = false, + .priority = NDLP_ERR, } }; @@ -502,8 +540,9 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender return true; } - bool prevent_log = stream_responses[i].prevent_log; + ND_LOG_FIELD_PRIORITY priority = stream_responses[i].priority; const char *error = stream_responses[i].error; + const char *status = stream_responses[i].status; int worker_job_id = stream_responses[i].worker_job_id; int delay = stream_responses[i].postpone_reconnect_seconds; @@ -512,19 +551,29 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender host->destination->reason = version; host->destination->postpone_reconnection_until = now_realtime_sec() + delay; - char buf[LOG_DATE_LENGTH]; - log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); - if(prevent_log) - internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", - rrdhost_hostname(host), s->connected_to, error, delay, buf); - else - netdata_log_error("STREAM %s [send to %s]: %s - will retry in %d secs, at %s", - rrdhost_hostname(host), s->connected_to, error, delay, buf); + char buf[RFC3339_MAX_LENGTH]; + rfc3339_datetime_ut(buf, sizeof(buf), host->destination->postpone_reconnection_until * USEC_PER_SEC, 0, false); + + nd_log(NDLS_DAEMON, priority, + "STREAM %s [send to %s]: %s - will retry in %d secs, at %s", + rrdhost_hostname(host), s->connected_to, error, delay, buf); return false; } +unsigned char alpn_proto_list[] = { + 18, 'n', 'e', 't', 'd', 'a', 't', 'a', '_', 's', 't', 'r', 'e', 'a', 'm', '/', '2', '.', '0', + 8, 'h', 't', 't', 'p', '/', '1', '.', '1' +}; + +#define CONN_UPGRADE_VAL "upgrade" + static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { #ifdef ENABLE_HTTPS RRDHOST *host = s->host; @@ -535,10 +584,16 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { if(!ssl_required) return true; - if (netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) { + if (netdata_ssl_open_ext(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket, alpn_proto_list, sizeof(alpn_proto_list))) { if(!netdata_ssl_connect(&host->sender->ssl)) { // couldn't connect + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_SSL_ERROR), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); rrdpush_sender_thread_close_socket(host); host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR; @@ -550,6 +605,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { security_test_certificate(host->sender->ssl.conn)) { // certificate is not valid + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid."); rrdpush_sender_thread_close_socket(host); @@ -561,6 +622,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { return true; } + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + netdata_log_error("SSL: failed to establish connection."); return false; @@ -570,6 +637,104 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { #endif } +static int rrdpush_http_upgrade_prelude(RRDHOST *host, struct sender_state *s) { + + char http[HTTP_HEADER_SIZE + 1]; + snprintfz(http, HTTP_HEADER_SIZE, + "GET " NETDATA_STREAM_URL HTTP_1_1 HTTP_ENDL + "Upgrade: " NETDATA_STREAM_PROTO_NAME HTTP_ENDL + "Connection: Upgrade" + HTTP_HDR_END); + + ssize_t bytes = send_timeout( +#ifdef ENABLE_HTTPS + &host->sender->ssl, +#endif + s->rrdpush_sender_socket, + http, + strlen(http), + 0, + 1000); + + bytes = recv_timeout( +#ifdef ENABLE_HTTPS + &host->sender->ssl, +#endif + s->rrdpush_sender_socket, + http, + HTTP_HEADER_SIZE, + 0, + 1000); + + if (bytes <= 0) { + error_report("Error reading from remote"); + return 1; + } + + rbuf_t buf = rbuf_create(bytes); + rbuf_push(buf, http, bytes); + + http_parse_ctx ctx; + http_parse_ctx_create(&ctx); + ctx.flags |= HTTP_PARSE_FLAG_DONT_WAIT_FOR_CONTENT; + + int rc; +// while((rc = parse_http_response(buf, &ctx)) == HTTP_PARSE_NEED_MORE_DATA); + rc = parse_http_response(buf, &ctx); + + if (rc != HTTP_PARSE_SUCCESS) { + error_report("Failed to parse HTTP response sent. (%d)", rc); + goto err_cleanup; + } + if (ctx.http_code == HTTP_RESP_MOVED_PERM) { + const char *hdr = get_http_header_by_name(&ctx, "location"); + if (hdr) + error_report("HTTP response is %d Moved Permanently (location: \"%s\") instead of expected %d Switching Protocols.", ctx.http_code, hdr, HTTP_RESP_SWITCH_PROTO); + else + error_report("HTTP response is %d instead of expected %d Switching Protocols.", ctx.http_code, HTTP_RESP_SWITCH_PROTO); + goto err_cleanup; + } + if (ctx.http_code == HTTP_RESP_NOT_FOUND) { + error_report("HTTP response is %d instead of expected %d Switching Protocols. Parent version too old.", ctx.http_code, HTTP_RESP_SWITCH_PROTO); + // TODO set some flag here that will signify parent is older version + // and to try connection without rrdpush_http_upgrade_prelude next time + goto err_cleanup; + } + if (ctx.http_code != HTTP_RESP_SWITCH_PROTO) { + error_report("HTTP response is %d instead of expected %d Switching Protocols", ctx.http_code, HTTP_RESP_SWITCH_PROTO); + goto err_cleanup; + } + + const char *hdr = get_http_header_by_name(&ctx, "connection"); + if (!hdr) { + error_report("Missing \"connection\" header in reply"); + goto err_cleanup; + } + if (strncmp(hdr, CONN_UPGRADE_VAL, strlen(CONN_UPGRADE_VAL))) { + error_report("Expected \"connection: " CONN_UPGRADE_VAL "\""); + goto err_cleanup; + } + + hdr = get_http_header_by_name(&ctx, "upgrade"); + if (!hdr) { + error_report("Missing \"upgrade\" header in reply"); + goto err_cleanup; + } + if (strncmp(hdr, NETDATA_STREAM_PROTO_NAME, strlen(NETDATA_STREAM_PROTO_NAME))) { + error_report("Expected \"upgrade: " NETDATA_STREAM_PROTO_NAME "\""); + goto err_cleanup; + } + + netdata_log_debug(D_STREAM, "Stream sender upgrade to \"" NETDATA_STREAM_PROTO_NAME "\" successful"); + rbuf_free(buf); + http_parse_ctx_destroy(&ctx); + return 0; +err_cleanup: + rbuf_free(buf); + http_parse_ctx_destroy(&ctx); + return 1; +} + static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) { struct timeval tv = { @@ -600,12 +765,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p // reset our capabilities to default s->capabilities = stream_our_capabilities(host, true); -#ifdef ENABLE_RRDPUSH_COMPRESSION - // If we don't want compression, remove it from our capabilities - if(!(s->flags & SENDER_FLAG_COMPRESSION)) - s->capabilities &= ~STREAM_CAP_COMPRESSION; -#endif // ENABLE_RRDPUSH_COMPRESSION - /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the version negotiation resulted in a high enough version. */ @@ -660,7 +819,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p "&NETDATA_SYSTEM_TOTAL_RAM=%s" "&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s" "&NETDATA_PROTOCOL_VERSION=%s" - " HTTP/1.1\r\n" + HTTP_1_1 HTTP_ENDL "User-Agent: %s/%s\r\n" "Accept: */*\r\n\r\n" , host->rrdpush_send_api_key @@ -715,6 +874,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p if(!rrdpush_sender_connect_ssl(s)) return false; + if (s->parent_using_h2o && rrdpush_http_upgrade_prelude(host, s)) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION); + rrdpush_sender_thread_close_socket(host); + host->destination->reason = STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE; + host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60; + return false; + } + ssize_t bytes, len = (ssize_t)strlen(http); bytes = send_timeout( @@ -728,9 +901,19 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p timeout); if(bytes <= 0) { // timeout is 0 + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); rrdpush_sender_thread_close_socket(host); - netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to); + + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", + rrdhost_hostname(host), s->connected_to); + host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT; host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60; return false; @@ -747,41 +930,62 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p timeout); if(bytes <= 0) { // timeout is 0 + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); rrdpush_sender_thread_close_socket(host); - netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to); + + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM %s [send to %s]: remote netdata does not respond.", + rrdhost_hostname(host), s->connected_to); + host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT; host->destination->postpone_reconnection_until = now_realtime_sec() + 30; return false; } if(sock_setnonblock(s->rrdpush_sender_socket) < 0) - netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); + nd_log(NDLS_DAEMON, NDLP_WARNING, + "STREAM %s [send to %s]: cannot set non-blocking mode for socket.", + rrdhost_hostname(host), s->connected_to); if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) - netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); + nd_log(NDLS_DAEMON, NDLP_WARNING, + "STREAM %s [send to %s]: cannot enlarge the socket buffer.", + rrdhost_hostname(host), s->connected_to); http[bytes] = '\0'; - netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http); if(!rrdpush_sender_validate_response(host, s, http, bytes)) return false; -#ifdef ENABLE_RRDPUSH_COMPRESSION - if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) - rrdpush_compressor_reset(&s->compressor); - else - rrdpush_compressor_destroy(&s->compressor); -#endif // ENABLE_RRDPUSH_COMPRESSION + rrdpush_compression_initialize(s); log_sender_capabilities(s); - netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CONNECTED), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "STREAM %s: connected to %s...", + rrdhost_hostname(host), s->connected_to); return true; } -static bool attempt_to_connect(struct sender_state *state) -{ +static bool attempt_to_connect(struct sender_state *state) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_to_parent_msgid), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + state->send_attempts = 0; // reset the bytes we have sent for this session @@ -950,6 +1154,12 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { void execute_commands(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_EXECUTE); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &s->line), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline; *end = 0; while( start < end && (newline = strchr(start, '\n')) ) { @@ -963,27 +1173,22 @@ void execute_commands(struct sender_state *s) { continue; } - netdata_log_access("STREAM: %d from '%s' for host '%s': %s", - gettid(), s->connected_to, rrdhost_hostname(s->host), start); - - // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); - - char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS); - - const char *keyword = get_word(words, num_words, 0); + s->line.count++; + s->line.num_words = quoted_strings_splitter_pluginsd(start, s->line.words, PLUGINSD_MAX_WORDS); + const char *command = get_word(s->line.words, s->line.num_words, 0); - if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { + if(command && (strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + nd_log(NDLS_ACCESS, NDLP_INFO, NULL); - char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1); - char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2); - char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3); + char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(s->line.words, s->line.num_words, 1); + char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(s->line.words, s->line.num_words, 2); + char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(s->line.words, s->line.num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", rrdhost_hostname(s->host), s->connected_to, - keyword, + command, transaction?transaction:"(unset)", timeout_s?timeout_s:"(unset)", function?function:"(unset)"); @@ -1021,9 +1226,12 @@ void execute_commands(struct sender_state *s) { memset(&s->function_payload, 0, sizeof(struct function_payload_state)); } } - else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { + else if (command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { + nd_log(NDLS_ACCESS, NDLP_INFO, NULL); + if (s->receiving_function_payload) { - netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword); + netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", + rrdhost_hostname(s->host), s->connected_to, command); s->receiving_function_payload = false; buffer_free(s->function_payload.payload); s->function_payload.payload = NULL; @@ -1031,14 +1239,14 @@ void execute_commands(struct sender_state *s) { // TODO send error response } - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); + char *transaction = get_word(s->line.words, s->line.num_words, 1); + char *timeout_s = get_word(s->line.words, s->line.num_words, 2); + char *function = get_word(s->line.words, s->line.num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", rrdhost_hostname(s->host), s->connected_to, - keyword, + command, transaction?transaction:"(unset)", timeout_s?timeout_s:"(unset)", function?function:"(unset)"); @@ -1047,30 +1255,32 @@ void execute_commands(struct sender_state *s) { s->receiving_function_payload = true; s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions); - s->function_payload.txid = strdupz(get_word(words, num_words, 1)); - s->function_payload.timeout = strdupz(get_word(words, num_words, 2)); - s->function_payload.fn_name = strdupz(get_word(words, num_words, 3)); + s->function_payload.txid = strdupz(get_word(s->line.words, s->line.num_words, 1)); + s->function_payload.timeout = strdupz(get_word(s->line.words, s->line.num_words, 2)); + s->function_payload.fn_name = strdupz(get_word(s->line.words, s->line.num_words, 3)); } - else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { + else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL); - char *transaction = get_word(words, num_words, 1); + char *transaction = get_word(s->line.words, s->line.num_words, 1); if(transaction && *transaction) rrd_function_cancel(transaction); } - else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { + else if (command && strcmp(command, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST); + nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL); - const char *chart_id = get_word(words, num_words, 1); - const char *start_streaming = get_word(words, num_words, 2); - const char *after = get_word(words, num_words, 3); - const char *before = get_word(words, num_words, 4); + const char *chart_id = get_word(s->line.words, s->line.num_words, 1); + const char *start_streaming = get_word(s->line.words, s->line.num_words, 2); + const char *after = get_word(s->line.words, s->line.num_words, 3); + const char *before = get_word(s->line.words, s->line.num_words, 4); if (!chart_id || !start_streaming || !after || !before) { netdata_log_error("STREAM %s [send to %s] %s command is incomplete" " (chart=%s, start_streaming=%s, after=%s, before=%s)", rrdhost_hostname(s->host), s->connected_to, - keyword, + command, chart_id ? chart_id : "(unset)", start_streaming ? start_streaming : "(unset)", after ? after : "(unset)", @@ -1085,12 +1295,14 @@ void execute_commands(struct sender_state *s) { } } else { - netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)"); + netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, s->line.words[0]?s->line.words[0]:"(unset)"); } + line_splitter_reset(&s->line); worker_is_busy(WORKER_SENDER_JOB_EXECUTE); start = newline + 1; } + if (start < end) { memmove(s->read_buffer, start, end-start); s->read_len = end - start; @@ -1245,6 +1457,14 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); rrdhost_clear_sender___while_having_sender_mutex(host); + +#ifdef NETDATA_LOG_STREAM_SENDER + if(host->sender->stream_log_fp) { + fclose(host->sender->stream_log_fp); + host->sender->stream_log_fp = NULL; + } +#endif + sender_unlock(host->sender); freez(s->pipe_buffer); @@ -1277,7 +1497,61 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) { #endif } +static bool stream_sender_log_capabilities(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state) + return false; + + stream_capabilities_to_string(wb, state->capabilities); + return true; +} + +static bool stream_sender_log_transport(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state) + return false; + +#ifdef ENABLE_HTTPS + buffer_strcat(wb, SSL_connection(&state->ssl) ? "https" : "http"); +#else + buffer_strcat(wb, "http"); +#endif + return true; +} + +static bool stream_sender_log_dst_ip(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state || state->rrdpush_sender_socket == -1) + return false; + + SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket); + buffer_strcat(wb, peers.peer.ip); + return true; +} + +static bool stream_sender_log_dst_port(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state || state->rrdpush_sender_socket == -1) + return false; + + SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket); + buffer_print_uint64(wb, peers.peer.port); + return true; +} + void *rrdpush_sender_thread(void *ptr) { + struct sender_state *s = ptr; + + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_STR(NDF_NIDL_NODE, s->host->hostname), + ND_LOG_FIELD_CB(NDF_DST_IP, stream_sender_log_dst_ip, s), + ND_LOG_FIELD_CB(NDF_DST_PORT, stream_sender_log_dst_port, s), + ND_LOG_FIELD_CB(NDF_DST_TRANSPORT, stream_sender_log_transport, s), + ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_sender_log_capabilities, s), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_register("STREAMSND"); worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect"); worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read"); @@ -1296,6 +1570,7 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION, "disconnect cant upgrade"); worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request"); worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function"); @@ -1303,10 +1578,11 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT); worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, "bytes compressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, "bytes uncompressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, "cumulative compression savings ratio", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE); - struct sender_state *s = ptr; - if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination || !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key || !*s->host->rrdpush_send_api_key) { @@ -1342,6 +1618,9 @@ void *rrdpush_sender_thread(void *ptr) { "initial clock resync iterations", remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING + s->parent_using_h2o = appconfig_get_boolean( + &stream_config, CONFIG_SECTION_STREAM, "parent using h2o", false); + // initialize rrdpush globals rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); @@ -1397,7 +1676,10 @@ void *rrdpush_sender_thread(void *ptr) { s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); - netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "STREAM %s [send to %s]: enabling metrics streaming...", + rrdhost_hostname(s->host), s->connected_to); continue; } @@ -1423,6 +1705,15 @@ void *rrdpush_sender_thread(void *ptr) { rrdpush_sender_pipe_clear_pending_data(s); rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false); } + + if(s->compressor.initialized) { + size_t bytes_uncompressed = s->compressor.sender_locked.total_uncompressed; + size_t bytes_compressed = s->compressor.sender_locked.total_compressed + s->compressor.sender_locked.total_compressions * sizeof(rrdpush_signature_t); + NETDATA_DOUBLE ratio = 100.0 - ((NETDATA_DOUBLE)bytes_compressed * 100.0 / (NETDATA_DOUBLE)bytes_uncompressed); + worker_set_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_uncompressed); + worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, (NETDATA_DOUBLE)bytes_compressed); + worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, ratio); + } sender_unlock(s); worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size); @@ -1459,7 +1750,7 @@ void *rrdpush_sender_thread(void *ptr) { } }; - int poll_rc = poll(fds, 2, 1000); + int poll_rc = poll(fds, 2, 50); // timeout in milliseconds netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...", fds[Collector].revents, fds[Socket].revents, outstanding); |