summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c581
1 files changed, 436 insertions, 145 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 71f87503..09b67e96 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1,31 +1,37 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
-
-#define WORKER_SENDER_JOB_CONNECT 0
-#define WORKER_SENDER_JOB_PIPE_READ 1
-#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
-#define WORKER_SENDER_JOB_EXECUTE 3
-#define WORKER_SENDER_JOB_SOCKET_SEND 4
-#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
-#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
-#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
-#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
-#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
-#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
-#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
-#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
-#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
-#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
-#define WORKER_SENDER_JOB_BUFFER_RATIO 15
-#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
-#define WORKER_SENDER_JOB_BYTES_SENT 17
-#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
-#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
-#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20
-
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
+#include "common.h"
+#include "aclk/https_client.h"
+
+#define WORKER_SENDER_JOB_CONNECT 0
+#define WORKER_SENDER_JOB_PIPE_READ 1
+#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
+#define WORKER_SENDER_JOB_EXECUTE 3
+#define WORKER_SENDER_JOB_SOCKET_SEND 4
+#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
+#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
+#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
+#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
+#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
+#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
+#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
+#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
+#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
+#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
+#define WORKER_SENDER_JOB_BUFFER_RATIO 15
+#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
+#define WORKER_SENDER_JOB_BYTES_SENT 17
+#define WORKER_SENDER_JOB_BYTES_COMPRESSED 18
+#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19
+#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20
+#define WORKER_SENDER_JOB_REPLAY_REQUEST 21
+#define WORKER_SENDER_JOB_FUNCTION_REQUEST 22
+#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23
+#define WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION 24
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 25
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 25
#endif
extern struct config stream_config;
@@ -66,21 +72,6 @@ BUFFER *sender_start(struct sender_state *s) {
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-/*
-* In case of stream compression buffer overflow
-* Inform the user through the error log file and
-* deactivate compression by downgrading the stream protocol.
-*/
-static inline void deactivate_compression(struct sender_state *s) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
- netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it.");
- s->flags &= ~SENDER_FLAG_COMPRESSION;
- netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
- rrdpush_sender_thread_close_socket(s->host);
-}
-#endif
-
#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
// Collector thread finishing a transmission
@@ -102,13 +93,22 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
sender_lock(s);
-// FILE *fp = fopen("/tmp/stream.txt", "a");
-// fprintf(fp,
-// "\n--- SEND BEGIN: %s ----\n"
-// "%s"
-// "--- SEND END ----------------------------------------\n"
-// , rrdhost_hostname(s->host), src);
-// fclose(fp);
+#ifdef NETDATA_LOG_STREAM_SENDER
+ if(type == STREAM_TRAFFIC_TYPE_METADATA) {
+ if(!s->stream_log_fp) {
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "/tmp/stream-sender-%s.txt", s->host ? rrdhost_hostname(s->host) : "unknown");
+
+ s->stream_log_fp = fopen(filename, "w");
+ }
+
+ fprintf(s->stream_log_fp, "\n--- SEND MESSAGE START: %s ----\n"
+ "%s"
+ "--- SEND MESSAGE END ----------------------------------------\n"
+ , rrdhost_hostname(s->host), src
+ );
+ }
+#endif
if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
@@ -117,8 +117,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
}
-#ifdef ENABLE_RRDPUSH_COMPRESSION
- if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) {
+ if (s->compressor.initialized) {
while(src_len) {
size_t size_to_compress = src_len;
@@ -143,28 +142,45 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
}
}
- char *dst;
+ const char *dst;
size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if (!dst_len) {
netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
rrdhost_hostname(s->host), s->connected_to);
- rrdpush_compressor_reset(&s->compressor);
+ rrdpush_compression_initialize(s);
dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if(!dst_len) {
netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
rrdhost_hostname(s->host), s->connected_to);
- deactivate_compression(s);
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
+ rrdpush_compression_deactivate(s);
+ rrdpush_sender_thread_close_socket(s->host);
sender_unlock(s);
return;
}
}
- if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
+ rrdpush_signature_t signature = rrdpush_compress_encode_signature(dst_len);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ // check if reversing the signature provides the same length
+ size_t decoded_dst_len = rrdpush_decompress_decode_signature((const char *)&signature, sizeof(signature));
+ if(decoded_dst_len != dst_len)
+ fatal("RRDPUSH COMPRESSION: invalid signature, original payload %zu bytes, "
+ "compressed payload length %zu bytes, but signature says payload is %zu bytes",
+ size_to_compress, dst_len, decoded_dst_len);
+#endif
+
+ if(cbuffer_add_unsafe(s->buffer, (const char *)&signature, sizeof(signature)))
s->flags |= SENDER_FLAG_OVERFLOW;
- else
- s->sent_bytes_on_this_connection_per_type[type] += dst_len;
+ else {
+ if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
+ s->flags |= SENDER_FLAG_OVERFLOW;
+ else
+ s->sent_bytes_on_this_connection_per_type[type] += dst_len + sizeof(signature);
+ }
src = src + size_to_compress;
src_len -= size_to_compress;
@@ -174,12 +190,6 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
s->flags |= SENDER_FLAG_OVERFLOW;
else
s->sent_bytes_on_this_connection_per_type[type] += src_len;
-#else
- if(cbuffer_add_unsafe(s->buffer, src, src_len))
- s->flags |= SENDER_FLAG_OVERFLOW;
- else
- s->sent_bytes_on_this_connection_per_type[type] += src_len;
-#endif
replication_recalculate_buffer_used_ratio_unsafe(s);
@@ -191,7 +201,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
sender_unlock(s);
- if(signal_sender)
+ if(signal_sender && (!stream_has_capability(s, STREAM_CAP_INTERPOLATED) || type != STREAM_TRAFFIC_TYPE_DATA))
rrdpush_signal_sender_to_wake_up(s);
}
@@ -251,15 +261,17 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
RRDSET *st;
rrdset_foreach_read(st, host) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- st->upstream_resync_time_s = 0;
+ st->rrdpush.sender.resync_time_s = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- rrddim_clear_exposed(rd);
+ rrddim_metadata_exposed_upstream_clear(rd);
rrddim_foreach_done(rd);
+
+ rrdset_metadata_updated(st);
}
rrdset_foreach_done(st);
@@ -342,8 +354,7 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
rrdpush_sender_charts_and_replication_reset(host);
}
-void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
-{
+void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) {
se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz("");
se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz("");
se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz("");
@@ -351,128 +362,155 @@ void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz("");
}
-void rrdpush_clean_encoded(stream_encoded_t *se)
-{
- if (se->os_name)
+void rrdpush_clean_encoded(stream_encoded_t *se) {
+ if (se->os_name) {
freez(se->os_name);
+ se->os_name = NULL;
+ }
- if (se->os_id)
+ if (se->os_id) {
freez(se->os_id);
+ se->os_id = NULL;
+ }
- if (se->os_version)
+ if (se->os_version) {
freez(se->os_version);
+ se->os_version = NULL;
+ }
- if (se->kernel_name)
+ if (se->kernel_name) {
freez(se->kernel_name);
+ se->kernel_name = NULL;
+ }
- if (se->kernel_version)
+ if (se->kernel_version) {
freez(se->kernel_version);
+ se->kernel_version = NULL;
+ }
}
struct {
const char *response;
+ const char *status;
size_t length;
int32_t version;
bool dynamic;
const char *error;
int worker_job_id;
int postpone_reconnect_seconds;
- bool prevent_log;
+ ND_LOG_FIELD_PRIORITY priority;
} stream_responses[] = {
{
.response = START_STREAMING_PROMPT_VN,
.length = sizeof(START_STREAMING_PROMPT_VN) - 1,
+ .status = RRDPUSH_STATUS_CONNECTED,
.version = STREAM_HANDSHAKE_OK_V3, // and above
.dynamic = true, // dynamic = we will parse the version / capabilities
.error = NULL,
.worker_job_id = 0,
.postpone_reconnect_seconds = 0,
+ .priority = NDLP_INFO,
},
{
.response = START_STREAMING_PROMPT_V2,
.length = sizeof(START_STREAMING_PROMPT_V2) - 1,
+ .status = RRDPUSH_STATUS_CONNECTED,
.version = STREAM_HANDSHAKE_OK_V2,
.dynamic = false,
.error = NULL,
.worker_job_id = 0,
.postpone_reconnect_seconds = 0,
+ .priority = NDLP_INFO,
},
{
.response = START_STREAMING_PROMPT_V1,
.length = sizeof(START_STREAMING_PROMPT_V1) - 1,
+ .status = RRDPUSH_STATUS_CONNECTED,
.version = STREAM_HANDSHAKE_OK_V1,
.dynamic = false,
.error = NULL,
.worker_job_id = 0,
.postpone_reconnect_seconds = 0,
+ .priority = NDLP_INFO,
},
{
.response = START_STREAMING_ERROR_SAME_LOCALHOST,
.length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1,
+ .status = RRDPUSH_STATUS_LOCALHOST,
.version = STREAM_HANDSHAKE_ERROR_LOCALHOST,
.dynamic = false,
.error = "remote server rejected this stream, the host we are trying to stream is its localhost",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
- .prevent_log = true,
+ .priority = NDLP_DEBUG,
},
{
.response = START_STREAMING_ERROR_ALREADY_STREAMING,
.length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1,
+ .status = RRDPUSH_STATUS_ALREADY_CONNECTED,
.version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED,
.dynamic = false,
.error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minutes
- .prevent_log = true,
+ .priority = NDLP_DEBUG,
},
{
.response = START_STREAMING_ERROR_NOT_PERMITTED,
.length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1,
+ .status = RRDPUSH_STATUS_PERMISSION_DENIED,
.version = STREAM_HANDSHAKE_ERROR_DENIED,
.dynamic = false,
.error = "remote server denied access, probably we don't have the right API key?",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 1 * 60, // 1 minute
+ .priority = NDLP_ERR,
},
{
.response = START_STREAMING_ERROR_BUSY_TRY_LATER,
.length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1,
+ .status = RRDPUSH_STATUS_RATE_LIMIT,
.version = STREAM_HANDSHAKE_BUSY_TRY_LATER,
.dynamic = false,
.error = "remote server is currently busy, we should try later",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minutes
+ .priority = NDLP_NOTICE,
},
{
.response = START_STREAMING_ERROR_INTERNAL_ERROR,
.length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1,
+ .status = RRDPUSH_STATUS_INTERNAL_SERVER_ERROR,
.version = STREAM_HANDSHAKE_INTERNAL_ERROR,
.dynamic = false,
.error = "remote server is encountered an internal error, we should try later",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 5 * 60, // 5 minutes
+ .priority = NDLP_CRIT,
},
{
.response = START_STREAMING_ERROR_INITIALIZATION,
.length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1,
+ .status = RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS,
.version = STREAM_HANDSHAKE_INITIALIZATION,
.dynamic = false,
.error = "remote server is initializing, we should try later",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minute
+ .priority = NDLP_NOTICE,
},
// terminator
{
.response = NULL,
.length = 0,
+ .status = RRDPUSH_STATUS_BAD_HANDSHAKE,
.version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE,
.dynamic = false,
.error = "remote node response is not understood, is it Netdata?",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 1 * 60, // 1 minute
- .prevent_log = false,
+ .priority = NDLP_ERR,
}
};
@@ -502,8 +540,9 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
return true;
}
- bool prevent_log = stream_responses[i].prevent_log;
+ ND_LOG_FIELD_PRIORITY priority = stream_responses[i].priority;
const char *error = stream_responses[i].error;
+ const char *status = stream_responses[i].status;
int worker_job_id = stream_responses[i].worker_job_id;
int delay = stream_responses[i].postpone_reconnect_seconds;
@@ -512,19 +551,29 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
host->destination->reason = version;
host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
- char buf[LOG_DATE_LENGTH];
- log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- if(prevent_log)
- internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
- rrdhost_hostname(host), s->connected_to, error, delay, buf);
- else
- netdata_log_error("STREAM %s [send to %s]: %s - will retry in %d secs, at %s",
- rrdhost_hostname(host), s->connected_to, error, delay, buf);
+ char buf[RFC3339_MAX_LENGTH];
+ rfc3339_datetime_ut(buf, sizeof(buf), host->destination->postpone_reconnection_until * USEC_PER_SEC, 0, false);
+
+ nd_log(NDLS_DAEMON, priority,
+ "STREAM %s [send to %s]: %s - will retry in %d secs, at %s",
+ rrdhost_hostname(host), s->connected_to, error, delay, buf);
return false;
}
+unsigned char alpn_proto_list[] = {
+ 18, 'n', 'e', 't', 'd', 'a', 't', 'a', '_', 's', 't', 'r', 'e', 'a', 'm', '/', '2', '.', '0',
+ 8, 'h', 't', 't', 'p', '/', '1', '.', '1'
+};
+
+#define CONN_UPGRADE_VAL "upgrade"
+
static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
#ifdef ENABLE_HTTPS
RRDHOST *host = s->host;
@@ -535,10 +584,16 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
if(!ssl_required)
return true;
- if (netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) {
+ if (netdata_ssl_open_ext(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket, alpn_proto_list, sizeof(alpn_proto_list))) {
if(!netdata_ssl_connect(&host->sender->ssl)) {
// couldn't connect
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_SSL_ERROR),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
rrdpush_sender_thread_close_socket(host);
host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
@@ -550,6 +605,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
security_test_certificate(host->sender->ssl.conn)) {
// certificate is not valid
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
rrdpush_sender_thread_close_socket(host);
@@ -561,6 +622,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
return true;
}
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
netdata_log_error("SSL: failed to establish connection.");
return false;
@@ -570,6 +637,104 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
#endif
}
+static int rrdpush_http_upgrade_prelude(RRDHOST *host, struct sender_state *s) {
+
+ char http[HTTP_HEADER_SIZE + 1];
+ snprintfz(http, HTTP_HEADER_SIZE,
+ "GET " NETDATA_STREAM_URL HTTP_1_1 HTTP_ENDL
+ "Upgrade: " NETDATA_STREAM_PROTO_NAME HTTP_ENDL
+ "Connection: Upgrade"
+ HTTP_HDR_END);
+
+ ssize_t bytes = send_timeout(
+#ifdef ENABLE_HTTPS
+ &host->sender->ssl,
+#endif
+ s->rrdpush_sender_socket,
+ http,
+ strlen(http),
+ 0,
+ 1000);
+
+ bytes = recv_timeout(
+#ifdef ENABLE_HTTPS
+ &host->sender->ssl,
+#endif
+ s->rrdpush_sender_socket,
+ http,
+ HTTP_HEADER_SIZE,
+ 0,
+ 1000);
+
+ if (bytes <= 0) {
+ error_report("Error reading from remote");
+ return 1;
+ }
+
+ rbuf_t buf = rbuf_create(bytes);
+ rbuf_push(buf, http, bytes);
+
+ http_parse_ctx ctx;
+ http_parse_ctx_create(&ctx);
+ ctx.flags |= HTTP_PARSE_FLAG_DONT_WAIT_FOR_CONTENT;
+
+ int rc;
+// while((rc = parse_http_response(buf, &ctx)) == HTTP_PARSE_NEED_MORE_DATA);
+ rc = parse_http_response(buf, &ctx);
+
+ if (rc != HTTP_PARSE_SUCCESS) {
+ error_report("Failed to parse HTTP response sent. (%d)", rc);
+ goto err_cleanup;
+ }
+ if (ctx.http_code == HTTP_RESP_MOVED_PERM) {
+ const char *hdr = get_http_header_by_name(&ctx, "location");
+ if (hdr)
+ error_report("HTTP response is %d Moved Permanently (location: \"%s\") instead of expected %d Switching Protocols.", ctx.http_code, hdr, HTTP_RESP_SWITCH_PROTO);
+ else
+ error_report("HTTP response is %d instead of expected %d Switching Protocols.", ctx.http_code, HTTP_RESP_SWITCH_PROTO);
+ goto err_cleanup;
+ }
+ if (ctx.http_code == HTTP_RESP_NOT_FOUND) {
+ error_report("HTTP response is %d instead of expected %d Switching Protocols. Parent version too old.", ctx.http_code, HTTP_RESP_SWITCH_PROTO);
+ // TODO set some flag here that will signify parent is older version
+ // and to try connection without rrdpush_http_upgrade_prelude next time
+ goto err_cleanup;
+ }
+ if (ctx.http_code != HTTP_RESP_SWITCH_PROTO) {
+ error_report("HTTP response is %d instead of expected %d Switching Protocols", ctx.http_code, HTTP_RESP_SWITCH_PROTO);
+ goto err_cleanup;
+ }
+
+ const char *hdr = get_http_header_by_name(&ctx, "connection");
+ if (!hdr) {
+ error_report("Missing \"connection\" header in reply");
+ goto err_cleanup;
+ }
+ if (strncmp(hdr, CONN_UPGRADE_VAL, strlen(CONN_UPGRADE_VAL))) {
+ error_report("Expected \"connection: " CONN_UPGRADE_VAL "\"");
+ goto err_cleanup;
+ }
+
+ hdr = get_http_header_by_name(&ctx, "upgrade");
+ if (!hdr) {
+ error_report("Missing \"upgrade\" header in reply");
+ goto err_cleanup;
+ }
+ if (strncmp(hdr, NETDATA_STREAM_PROTO_NAME, strlen(NETDATA_STREAM_PROTO_NAME))) {
+ error_report("Expected \"upgrade: " NETDATA_STREAM_PROTO_NAME "\"");
+ goto err_cleanup;
+ }
+
+ netdata_log_debug(D_STREAM, "Stream sender upgrade to \"" NETDATA_STREAM_PROTO_NAME "\" successful");
+ rbuf_free(buf);
+ http_parse_ctx_destroy(&ctx);
+ return 0;
+err_cleanup:
+ rbuf_free(buf);
+ http_parse_ctx_destroy(&ctx);
+ return 1;
+}
+
static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
struct timeval tv = {
@@ -600,12 +765,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
// reset our capabilities to default
s->capabilities = stream_our_capabilities(host, true);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
- // If we don't want compression, remove it from our capabilities
- if(!(s->flags & SENDER_FLAG_COMPRESSION))
- s->capabilities &= ~STREAM_CAP_COMPRESSION;
-#endif // ENABLE_RRDPUSH_COMPRESSION
-
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
*/
@@ -660,7 +819,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
"&NETDATA_SYSTEM_TOTAL_RAM=%s"
"&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s"
"&NETDATA_PROTOCOL_VERSION=%s"
- " HTTP/1.1\r\n"
+ HTTP_1_1 HTTP_ENDL
"User-Agent: %s/%s\r\n"
"Accept: */*\r\n\r\n"
, host->rrdpush_send_api_key
@@ -715,6 +874,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(!rrdpush_sender_connect_ssl(s))
return false;
+ if (s->parent_using_h2o && rrdpush_http_upgrade_prelude(host, s)) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION);
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
+ return false;
+ }
+
ssize_t bytes, len = (ssize_t)strlen(http);
bytes = send_timeout(
@@ -728,9 +901,19 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
timeout);
if(bytes <= 0) { // timeout is 0
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
- netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
+
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM %s [send to %s]: failed to send HTTP header to remote netdata.",
+ rrdhost_hostname(host), s->connected_to);
+
host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
return false;
@@ -747,41 +930,62 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
timeout);
if(bytes <= 0) { // timeout is 0
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
- netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
+
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM %s [send to %s]: remote netdata does not respond.",
+ rrdhost_hostname(host), s->connected_to);
+
host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
return false;
}
if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
- netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "STREAM %s [send to %s]: cannot set non-blocking mode for socket.",
+ rrdhost_hostname(host), s->connected_to);
if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
- netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "STREAM %s [send to %s]: cannot enlarge the socket buffer.",
+ rrdhost_hostname(host), s->connected_to);
http[bytes] = '\0';
- netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http);
if(!rrdpush_sender_validate_response(host, s, http, bytes))
return false;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
- if(stream_has_capability(s, STREAM_CAP_COMPRESSION))
- rrdpush_compressor_reset(&s->compressor);
- else
- rrdpush_compressor_destroy(&s->compressor);
-#endif // ENABLE_RRDPUSH_COMPRESSION
+ rrdpush_compression_initialize(s);
log_sender_capabilities(s);
- netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CONNECTED),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "STREAM %s: connected to %s...",
+ rrdhost_hostname(host), s->connected_to);
return true;
}
-static bool attempt_to_connect(struct sender_state *state)
-{
+static bool attempt_to_connect(struct sender_state *state) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_to_parent_msgid),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
state->send_attempts = 0;
// reset the bytes we have sent for this session
@@ -950,6 +1154,12 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
void execute_commands(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &s->line),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
*end = 0;
while( start < end && (newline = strchr(start, '\n')) ) {
@@ -963,27 +1173,22 @@ void execute_commands(struct sender_state *s) {
continue;
}
- netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
- gettid(), s->connected_to, rrdhost_hostname(s->host), start);
-
- // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
-
- char *words[PLUGINSD_MAX_WORDS] = { NULL };
- size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS);
-
- const char *keyword = get_word(words, num_words, 0);
+ s->line.count++;
+ s->line.num_words = quoted_strings_splitter_pluginsd(start, s->line.words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(s->line.words, s->line.num_words, 0);
- if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
+ if(command && (strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
- char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1);
- char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2);
- char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3);
+ char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(s->line.words, s->line.num_words, 1);
+ char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(s->line.words, s->line.num_words, 2);
+ char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(s->line.words, s->line.num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
rrdhost_hostname(s->host), s->connected_to,
- keyword,
+ command,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
@@ -1021,9 +1226,12 @@ void execute_commands(struct sender_state *s) {
memset(&s->function_payload, 0, sizeof(struct function_payload_state));
}
}
- else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
+ else if (command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
+ nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
+
if (s->receiving_function_payload) {
- netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword);
+ netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload",
+ rrdhost_hostname(s->host), s->connected_to, command);
s->receiving_function_payload = false;
buffer_free(s->function_payload.payload);
s->function_payload.payload = NULL;
@@ -1031,14 +1239,14 @@ void execute_commands(struct sender_state *s) {
// TODO send error response
}
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
+ char *transaction = get_word(s->line.words, s->line.num_words, 1);
+ char *timeout_s = get_word(s->line.words, s->line.num_words, 2);
+ char *function = get_word(s->line.words, s->line.num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
rrdhost_hostname(s->host), s->connected_to,
- keyword,
+ command,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
@@ -1047,30 +1255,32 @@ void execute_commands(struct sender_state *s) {
s->receiving_function_payload = true;
s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions);
- s->function_payload.txid = strdupz(get_word(words, num_words, 1));
- s->function_payload.timeout = strdupz(get_word(words, num_words, 2));
- s->function_payload.fn_name = strdupz(get_word(words, num_words, 3));
+ s->function_payload.txid = strdupz(get_word(s->line.words, s->line.num_words, 1));
+ s->function_payload.timeout = strdupz(get_word(s->line.words, s->line.num_words, 2));
+ s->function_payload.fn_name = strdupz(get_word(s->line.words, s->line.num_words, 3));
}
- else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
+ else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL);
- char *transaction = get_word(words, num_words, 1);
+ char *transaction = get_word(s->line.words, s->line.num_words, 1);
if(transaction && *transaction)
rrd_function_cancel(transaction);
}
- else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
+ else if (command && strcmp(command, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL);
- const char *chart_id = get_word(words, num_words, 1);
- const char *start_streaming = get_word(words, num_words, 2);
- const char *after = get_word(words, num_words, 3);
- const char *before = get_word(words, num_words, 4);
+ const char *chart_id = get_word(s->line.words, s->line.num_words, 1);
+ const char *start_streaming = get_word(s->line.words, s->line.num_words, 2);
+ const char *after = get_word(s->line.words, s->line.num_words, 3);
+ const char *before = get_word(s->line.words, s->line.num_words, 4);
if (!chart_id || !start_streaming || !after || !before) {
netdata_log_error("STREAM %s [send to %s] %s command is incomplete"
" (chart=%s, start_streaming=%s, after=%s, before=%s)",
rrdhost_hostname(s->host), s->connected_to,
- keyword,
+ command,
chart_id ? chart_id : "(unset)",
start_streaming ? start_streaming : "(unset)",
after ? after : "(unset)",
@@ -1085,12 +1295,14 @@ void execute_commands(struct sender_state *s) {
}
}
else {
- netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
+ netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, s->line.words[0]?s->line.words[0]:"(unset)");
}
+ line_splitter_reset(&s->line);
worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
start = newline + 1;
}
+
if (start < end) {
memmove(s->read_buffer, start, end-start);
s->read_len = end - start;
@@ -1245,6 +1457,14 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
rrdhost_clear_sender___while_having_sender_mutex(host);
+
+#ifdef NETDATA_LOG_STREAM_SENDER
+ if(host->sender->stream_log_fp) {
+ fclose(host->sender->stream_log_fp);
+ host->sender->stream_log_fp = NULL;
+ }
+#endif
+
sender_unlock(host->sender);
freez(s->pipe_buffer);
@@ -1277,7 +1497,61 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) {
#endif
}
+static bool stream_sender_log_capabilities(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state)
+ return false;
+
+ stream_capabilities_to_string(wb, state->capabilities);
+ return true;
+}
+
+static bool stream_sender_log_transport(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state)
+ return false;
+
+#ifdef ENABLE_HTTPS
+ buffer_strcat(wb, SSL_connection(&state->ssl) ? "https" : "http");
+#else
+ buffer_strcat(wb, "http");
+#endif
+ return true;
+}
+
+static bool stream_sender_log_dst_ip(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state || state->rrdpush_sender_socket == -1)
+ return false;
+
+ SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket);
+ buffer_strcat(wb, peers.peer.ip);
+ return true;
+}
+
+static bool stream_sender_log_dst_port(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state || state->rrdpush_sender_socket == -1)
+ return false;
+
+ SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket);
+ buffer_print_uint64(wb, peers.peer.port);
+ return true;
+}
+
void *rrdpush_sender_thread(void *ptr) {
+ struct sender_state *s = ptr;
+
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_STR(NDF_NIDL_NODE, s->host->hostname),
+ ND_LOG_FIELD_CB(NDF_DST_IP, stream_sender_log_dst_ip, s),
+ ND_LOG_FIELD_CB(NDF_DST_PORT, stream_sender_log_dst_port, s),
+ ND_LOG_FIELD_CB(NDF_DST_TRANSPORT, stream_sender_log_transport, s),
+ ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_sender_log_capabilities, s),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_register("STREAMSND");
worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read");
@@ -1296,6 +1570,7 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION, "disconnect cant upgrade");
worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function");
@@ -1303,10 +1578,11 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, "bytes compressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, "bytes uncompressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, "cumulative compression savings ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
- struct sender_state *s = ptr;
-
if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
!*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
!*s->host->rrdpush_send_api_key) {
@@ -1342,6 +1618,9 @@ void *rrdpush_sender_thread(void *ptr) {
"initial clock resync iterations",
remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
+ s->parent_using_h2o = appconfig_get_boolean(
+ &stream_config, CONFIG_SECTION_STREAM, "parent using h2o", false);
+
// initialize rrdpush globals
rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
@@ -1397,7 +1676,10 @@ void *rrdpush_sender_thread(void *ptr) {
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
- netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "STREAM %s [send to %s]: enabling metrics streaming...",
+ rrdhost_hostname(s->host), s->connected_to);
continue;
}
@@ -1423,6 +1705,15 @@ void *rrdpush_sender_thread(void *ptr) {
rrdpush_sender_pipe_clear_pending_data(s);
rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
}
+
+ if(s->compressor.initialized) {
+ size_t bytes_uncompressed = s->compressor.sender_locked.total_uncompressed;
+ size_t bytes_compressed = s->compressor.sender_locked.total_compressed + s->compressor.sender_locked.total_compressions * sizeof(rrdpush_signature_t);
+ NETDATA_DOUBLE ratio = 100.0 - ((NETDATA_DOUBLE)bytes_compressed * 100.0 / (NETDATA_DOUBLE)bytes_uncompressed);
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_uncompressed);
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, (NETDATA_DOUBLE)bytes_compressed);
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, ratio);
+ }
sender_unlock(s);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
@@ -1459,7 +1750,7 @@ void *rrdpush_sender_thread(void *ptr) {
}
};
- int poll_rc = poll(fds, 2, 1000);
+ int poll_rc = poll(fds, 2, 50); // timeout in milliseconds
netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
fds[Collector].revents, fds[Socket].revents, outstanding);